agent0.utils

General utility functions

Submodules

Functions

async_runner(→ list[R])

Helper function that runs a list of passed in functions asynchronously.

block_number_before_timestamp(→ eth_typing.BlockNumber)

Finds the closest block number that is before or at the given block time.

Package Contents

async agent0.utils.async_runner(funcs: Sequence[Callable[[], R]], return_exceptions: bool = False) list[R]

Helper function that runs a list of passed in functions asynchronously.

NOTE: use functools.partial() to pass in arguments to the functions.

WARNING: this assumes all functions passed in are thread safe, use at your own risk.

Parameters:
  • funcs (list[Callable[[], R]]) – List of functions to run asynchronously.

  • return_exceptions (bool) – If True, return exceptions from the functions. Otherwise, will throw exception if a thread fails.

Returns:

List of results.

Return type:

list[R]

agent0.utils.block_number_before_timestamp(web3: web3.Web3, block_timestamp: web3.types.Timestamp | int) eth_typing.BlockNumber

Finds the closest block number that is before or at the given block time.

Parameters:
  • web3 (Web3) – The web3 instance.

  • block_timestamp (BlockTime | int) – The block time to find the closest block to.

Returns:

The closest block number to the given block time.

Return type:

BlockNumber