agent0.chainsync.exec.analyze_data
==================================

.. py:module:: agent0.chainsync.exec.analyze_data

.. autoapi-nested-parse::

   Script to post-process on-chain Hyperdrive pool, config, and transaction data.

   ..
       !! processed by numpydoc !!


Functions
---------

.. autoapisummary::

   agent0.chainsync.exec.analyze_data.analyze_data
   agent0.chainsync.exec.analyze_data.get_latest_data_block


Module Contents
---------------

.. py:function:: analyze_data(start_block: int = 0, interfaces: list[agent0.ethpy.hyperdrive.HyperdriveReadInterface] | None = None, rpc_uri: str | None = None, hyperdrive_addresses: list[eth_typing.ChecksumAddress] | dict[str, eth_typing.ChecksumAddress] | None = None, db_session: sqlalchemy.orm.Session | None = None, postgres_config: agent0.chainsync.PostgresConfig | None = None, calc_pnl: bool = True, backfill: bool = False, backfill_sample_period: int | None = None, backfill_progress_bar: bool = False)

   Execute the data analysis pipeline.

   :param start_block: The starting block to run analysis on for backfilling.
   :type start_block: int, optional
   :param interfaces: A collection of Hyperdrive interface objects, each connected to a pool.
                      If not set, interfaces are initialized from `rpc_uri` and `hyperdrive_addresses`.
   :type interfaces: list[HyperdriveReadInterface] | None, optional
   :param rpc_uri: The URI for the web3 provider to initialize the interfaces with.
                   Not used if `interfaces` is provided.
   :type rpc_uri: str | None, optional
   :param hyperdrive_addresses: A collection of Hyperdrive addresses, each pointing to an initialized pool.
                                Can also be the output of `get_hyperdrive_addresses_from_registry`,
                                which is a dictionary mapping a logical name to a Hyperdrive address.
                                Not used if `interfaces` is provided.
   :type hyperdrive_addresses: list[ChecksumAddress] | dict[str, ChecksumAddress] | None, optional
   :param db_session: Session object for connecting to the db. If None, will initialize a new session based on .env.
   :type db_session: Session | None, optional
   :param postgres_config: PostgresConfig for connecting to the db. If None, will set from .env.
   :type postgres_config: PostgresConfig | None, optional
   :param calc_pnl: Whether to calculate pnl. Defaults to True.
   :type calc_pnl: bool, optional
   :param backfill: If True, will fill in missing pool info data every `backfill_sample_period` blocks.
                    Defaults to False.
   :type backfill: bool, optional
   :param backfill_sample_period: The sample frequency when backfilling. If None, will backfill every block.
   :type backfill_sample_period: int | None, optional
   :param backfill_progress_bar: If True, will show a progress bar when backfilling. Defaults to False.
   :type backfill_progress_bar: bool, optional

   ..
       !! processed by numpydoc !!

.. py:function:: get_latest_data_block(db_session: sqlalchemy.orm.Session) -> int

   Gets the latest block the data pipeline has written.

   Since there are multiple tables that analysis reads from, we query the latest block
   from all read tables and select the minimum block from the list.

   :param db_session: The initialized db session.
   :type db_session: Session

   :returns: The latest block number from the PoolInfo table.
   :rtype: int

   ..
       !! processed by numpydoc !!
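
The two ideas described above (taking the minimum of the per-table latest blocks, and sampling blocks every `backfill_sample_period` during backfill) can be illustrated with a minimal, self-contained sketch. This is not the agent0 implementation: the helper names `latest_common_block` and `backfill_blocks`, the table names in the usage note, and the choice to include the end block are all assumptions made for illustration.

```python
from typing import Iterator, Mapping, Optional


def latest_common_block(latest_by_table: Mapping[str, int]) -> int:
    """Return the newest block that every table has reached.

    Hypothetical helper: `latest_by_table` maps a table name to the latest
    block written to that table. Taking the minimum means analysis only runs
    on blocks that are present in every table it reads from.
    """
    if not latest_by_table:
        raise ValueError("expected at least one table")
    return min(latest_by_table.values())


def backfill_blocks(
    start_block: int,
    latest_block: int,
    sample_period: Optional[int] = None,
) -> Iterator[int]:
    """Yield the block numbers a backfill would visit.

    Hypothetical helper: a `sample_period` of None means every block,
    mirroring the documented behavior of `backfill_sample_period`.
    Including `latest_block` in the range is an assumption.
    """
    step = 1 if sample_period is None else sample_period
    if step <= 0:
        raise ValueError("sample_period must be a positive integer")
    yield from range(start_block, latest_block + 1, step)
```

With two hypothetical tables at blocks 120 and 118, `latest_common_block({"pool_info": 120, "trade_events": 118})` returns 118, and `list(backfill_blocks(0, 10, 5))` visits blocks 0, 5, and 10.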