agent0.chainsync.db.hyperdrive.interface
========================================

.. py:module:: agent0.chainsync.db.hyperdrive.interface

.. autoapi-nested-parse::

   Utilities for hyperdrive-related postgres interactions.

   .. !! processed by numpydoc !!

Functions
---------

.. autoapisummary::

   agent0.chainsync.db.hyperdrive.interface.add_hyperdrive_addr_to_name
   agent0.chainsync.db.hyperdrive.interface.get_hyperdrive_addr_to_name
   agent0.chainsync.db.hyperdrive.interface.add_trade_events
   agent0.chainsync.db.hyperdrive.interface.get_latest_block_number_from_trade_event
   agent0.chainsync.db.hyperdrive.interface.get_latest_block_number_from_positions_snapshot_table
   agent0.chainsync.db.hyperdrive.interface.get_trade_events
   agent0.chainsync.db.hyperdrive.interface.get_current_positions
   agent0.chainsync.db.hyperdrive.interface.get_pool_config
   agent0.chainsync.db.hyperdrive.interface.add_pool_config
   agent0.chainsync.db.hyperdrive.interface.add_pool_infos
   agent0.chainsync.db.hyperdrive.interface.add_checkpoint_info
   agent0.chainsync.db.hyperdrive.interface.get_latest_block_number_from_pool_info_table
   agent0.chainsync.db.hyperdrive.interface.get_pool_info
   agent0.chainsync.db.hyperdrive.interface.get_latest_block_number_from_checkpoint_info_table
   agent0.chainsync.db.hyperdrive.interface.get_checkpoint_info
   agent0.chainsync.db.hyperdrive.interface.get_all_traders
   agent0.chainsync.db.hyperdrive.interface.get_position_snapshot
   agent0.chainsync.db.hyperdrive.interface.get_total_pnl_over_time
   agent0.chainsync.db.hyperdrive.interface.get_positions_over_time
   agent0.chainsync.db.hyperdrive.interface.get_realized_value_over_time

Module Contents
---------------

.. py:function:: add_hyperdrive_addr_to_name(name: str, hyperdrive_address: str, session: sqlalchemy.orm.Session, force_update: bool = False) -> None

   Add a hyperdrive address to name mapping to postgres during agent initialization.

   :param name: The logical name to attach to the hyperdrive address.
   :type name: str
   :param hyperdrive_address: A hyperdrive address to map to the name.
   :type hyperdrive_address: str
   :param session: The initialized session object.
   :type session: Session
   :param force_update: If True and an existing mapping is found, overwrite it.
   :type force_update: bool

   .. !! processed by numpydoc !!

.. py:function:: get_hyperdrive_addr_to_name(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None) -> pandas.DataFrame

   Get all hyperdrive address to name mappings and return them as a pandas DataFrame.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive address to filter the results on. Returns all if None.
   :type hyperdrive_address: str | None, optional

   :returns: A DataFrame that consists of the queried address to name mapping data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: add_trade_events(transfer_events: list[agent0.chainsync.db.hyperdrive.schema.DBTradeEvent], session: sqlalchemy.orm.Session) -> None

   Add trade events to the trade events table.

   This function is only used for injecting rows into the db.
   The actual ingestion happens via `trade_events_to_db` using dataframes.

   :param transfer_events: A list of DBTradeEvent objects to insert into postgres.
   :type transfer_events: list[DBTradeEvent]
   :param session: The initialized session object.
   :type session: Session

   .. !! processed by numpydoc !!

.. py:function:: get_latest_block_number_from_trade_event(session: sqlalchemy.orm.Session, hyperdrive_address: str | None, wallet_address: str | None) -> int

   Get the latest block number based on the hyperdrive events table in the db.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive address to filter the results on. Can be None to return the latest block number regardless of pool.
   :type hyperdrive_address: str | None
   :param wallet_address: The wallet address to filter the results on.
      Can be None to return the latest block number regardless of wallet.
   :type wallet_address: str | None

   :returns: The latest block number in the hyperdrive_events table.
   :rtype: int

   .. !! processed by numpydoc !!

.. py:function:: get_latest_block_number_from_positions_snapshot_table(session: sqlalchemy.orm.Session, wallet_addr: str | None, hyperdrive_address: str | None) -> int

   Get the latest block number based on the positions snapshot table in the db.

   :param session: The initialized session object.
   :type session: Session
   :param wallet_addr: The wallet address to filter the results on. Can be None to return the latest block number regardless of wallet.
   :type wallet_addr: str | None
   :param hyperdrive_address: The hyperdrive address to filter the results on. Can be None to return the latest block number regardless of pool.
   :type hyperdrive_address: str | None

   :returns: The latest block number in the positions snapshot table.
   :rtype: int

   .. !! processed by numpydoc !!

.. py:function:: get_trade_events(session: sqlalchemy.orm.Session, wallet_address: str | list[str] | None = None, hyperdrive_address: str | list[str] | None = None, all_token_deltas: bool = True, sort_ascending: bool = True, query_limit: int | None = None, coerce_float=False) -> pandas.DataFrame

   Get all trade events and return them as a pandas DataFrame.

   :param session: The initialized db session object.
   :type session: Session
   :param wallet_address: The wallet address(es) to filter the results on. Returns all if None.
   :type wallet_address: str | list[str] | None, optional
   :param hyperdrive_address: The hyperdrive address(es) to filter the results on. Returns all if None.
   :type hyperdrive_address: str | list[str] | None, optional
   :param all_token_deltas: When removing liquidity that results in withdrawal shares, the events table returns two entries for this transaction to keep track of token deltas (one for lp tokens and one for withdrawal shares).
      If this flag is True, all entries in the table are returned, which is useful for calculating token positions. If False, the duplicate withdrawal share entry is dropped (useful for returning a ticker). Defaults to True.
   :type all_token_deltas: bool, optional
   :param sort_ascending: If True, will sort events in ascending block order. Otherwise, will sort in descending order. Defaults to True.
   :type sort_ascending: bool, optional
   :param query_limit: The number of rows to return. Defaults to returning all rows.
   :type query_limit: int | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried trade events data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_current_positions(session: sqlalchemy.orm.Session, wallet_addr: str | None = None, hyperdrive_address: str | None = None, query_block: int | None = None, show_closed_positions: bool = False, coerce_float=False) -> pandas.DataFrame

   Get all positions for a given wallet address.

   :param session: The initialized db session object.
   :type session: Session
   :param wallet_addr: The wallet address to filter the results on. Returns all if None.
   :type wallet_addr: str | None, optional
   :param hyperdrive_address: The hyperdrive address to filter the results on. Returns all if None.
   :type hyperdrive_address: str | None, optional
   :param query_block: The block to get positions for. query_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type query_block: int | None, optional
   :param show_closed_positions: Whether to show closed positions (i.e., positions with zero balance). Defaults to False. When False, only currently open positions are returned. When True, closed positions are returned as well, which is useful for calculating the overall pnl of all positions.
   :type show_closed_positions: bool, optional
   :param coerce_float: If True, will coerce all numeric columns to float. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried positions data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_pool_config(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None, coerce_float=False) -> pandas.DataFrame

   Get all pool configs and return them as a pandas DataFrame.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive address to filter the results on. Returns all if None.
   :type hyperdrive_address: str | None, optional
   :param coerce_float: If True, will coerce all numeric columns to float. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried pool config data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: add_pool_config(pool_config: agent0.chainsync.db.hyperdrive.schema.DBPoolConfig, session: sqlalchemy.orm.Session) -> None

   Add a pool config to the pool config table if it does not exist. Verify the pool config if it does exist.

   :param pool_config: A DBPoolConfig object to insert into postgres.
   :type pool_config: DBPoolConfig
   :param session: The initialized session object.
   :type session: Session

   .. !! processed by numpydoc !!

.. py:function:: add_pool_infos(pool_infos: list[agent0.chainsync.db.hyperdrive.schema.DBPoolInfo], session: sqlalchemy.orm.Session) -> None

   Add pool infos to the poolinfo table.

   :param pool_infos: A list of DBPoolInfo objects to insert into postgres.
   :type pool_infos: list[DBPoolInfo]
   :param session: The initialized session object.
   :type session: Session

   .. !! processed by numpydoc !!

.. py:function:: add_checkpoint_info(checkpoint_info: agent0.chainsync.db.hyperdrive.schema.DBCheckpointInfo, session: sqlalchemy.orm.Session) -> None

   Add checkpoint info to the checkpointinfo table if it doesn't exist.
   This function is only used for injecting rows into the db.
   The actual ingestion happens via `checkpoint_events_to_db` using dataframes.

   :param checkpoint_info: A DBCheckpointInfo object to insert into postgres.
   :type checkpoint_info: DBCheckpointInfo
   :param session: The initialized session object.
   :type session: Session

   .. !! processed by numpydoc !!

.. py:function:: get_latest_block_number_from_pool_info_table(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None) -> int

   Get the latest block number based on the pool info table in the db.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive address to filter the results on. Can be None to return the latest block number regardless of pool.
   :type hyperdrive_address: str | None

   :returns: The latest block number in the poolinfo table.
   :rtype: int

   .. !! processed by numpydoc !!

.. py:function:: get_pool_info(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None, start_block: int | None = None, end_block: int | None = None, coerce_float=False) -> pandas.DataFrame

   Get all pool infos and return them as a pandas DataFrame.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive address to filter the query on. Returns all if None.
   :type hyperdrive_address: str | None, optional
   :param start_block: The starting block to filter the query on. start_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type start_block: int | None, optional
   :param end_block: The ending block to filter the query on. end_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type end_block: int | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried pool info data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_latest_block_number_from_checkpoint_info_table(session: sqlalchemy.orm.Session, hyperdrive_address: str | None) -> int

   Get the latest block number based on the checkpoint info table in the db.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive pool address to filter the query on.
   :type hyperdrive_address: str | None

   :returns: The latest block number in the checkpoint info table.
   :rtype: int

   .. !! processed by numpydoc !!

.. py:function:: get_checkpoint_info(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None, checkpoint_time: int | None = None, coerce_float=False) -> pandas.DataFrame

   Get all info associated with a given checkpoint.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive pool address to filter the query on. Defaults to returning all checkpoint infos.
   :type hyperdrive_address: str | None, optional
   :param checkpoint_time: The checkpoint time to filter the query on. Defaults to returning all checkpoint infos.
   :type checkpoint_time: int | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried checkpoint info.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_all_traders(session: sqlalchemy.orm.Session, hyperdrive_address: str | None = None) -> pandas.Series

   Get all traders from the TradeEvent table.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive pool address to filter the query on. Defaults to returning all traders.
   :type hyperdrive_address: str | None, optional

   :returns: A Series of addresses that have made a trade.
   :rtype: pd.Series

   .. !! processed by numpydoc !!

.. py:function:: get_position_snapshot(session: sqlalchemy.orm.Session, hyperdrive_address: str | list[str] | None = None, latest_entry: bool = False, start_block: int | None = None, end_block: int | None = None, wallet_address: list[str] | str | None = None, coerce_float=False) -> pandas.DataFrame

   Get all position snapshot data and return it as a pandas DataFrame.

   :param session: The initialized session object.
   :type session: Session
   :param hyperdrive_address: The hyperdrive pool address(es) to filter the query on. Defaults to returning all position snapshots.
   :type hyperdrive_address: str | list[str] | None, optional
   :param latest_entry: If True, will return the latest entry for every pool. Defaults to False.
   :type latest_entry: bool, optional
   :param start_block: The starting block to filter the query on. start_block integers match Python slicing notation, e.g., list[:3], list[:-3]. Defaults to the first entry. Not used if `latest_entry` is True.
   :type start_block: int | None, optional
   :param end_block: The ending block to filter the query on. end_block integers match Python slicing notation, e.g., list[:3], list[:-3]. Defaults to the last entry. Not used if `latest_entry` is True.
   :type end_block: int | None, optional
   :param wallet_address: The wallet address(es) to filter the query on. Returns all if None.
   :type wallet_address: list[str] | str | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the queried position snapshot data.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_total_pnl_over_time(session: sqlalchemy.orm.Session, start_block: int | None = None, end_block: int | None = None, wallet_address: list[str] | None = None, coerce_float=False) -> pandas.DataFrame

   Aggregate pnl over time across all positions a wallet has.

   :param session: The initialized session object.
   :type session: Session
   :param start_block: The starting block to filter the query on. start_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type start_block: int | None, optional
   :param end_block: The ending block to filter the query on. end_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type end_block: int | None, optional
   :param wallet_address: The wallet addresses to filter the query on. Returns all if None.
   :type wallet_address: list[str] | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the aggregated pnl over time.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_positions_over_time(session: sqlalchemy.orm.Session, start_block: int | None = None, end_block: int | None = None, wallet_address: list[str] | None = None, coerce_float=False) -> pandas.DataFrame

   Aggregate over token types over all position types.

   :param session: The initialized session object.
   :type session: Session
   :param start_block: The starting block to filter the query on. start_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type start_block: int | None, optional
   :param end_block: The ending block to filter the query on. end_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type end_block: int | None, optional
   :param wallet_address: The wallet addresses to filter the query on. Returns all if None.
   :type wallet_address: list[str] | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the aggregated positions over time.
   :rtype: DataFrame

   .. !! processed by numpydoc !!

.. py:function:: get_realized_value_over_time(session: sqlalchemy.orm.Session, start_block: int | None = None, end_block: int | None = None, wallet_address: list[str] | None = None, coerce_float=False) -> pandas.DataFrame

   Aggregate over realized value over all position types.

   :param session: The initialized session object.
   :type session: Session
   :param start_block: The starting block to filter the query on. start_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type start_block: int | None, optional
   :param end_block: The ending block to filter the query on. end_block integers match Python slicing notation, e.g., list[:3], list[:-3].
   :type end_block: int | None, optional
   :param wallet_address: The wallet addresses to filter the query on. Returns all if None.
   :type wallet_address: list[str] | None, optional
   :param coerce_float: If True, will return floats in the dataframe. Otherwise, will return fixed point Decimal. Defaults to False.
   :type coerce_float: bool, optional

   :returns: A DataFrame that consists of the aggregated realized value over time.
   :rtype: DataFrame

   .. !! processed by numpydoc !!
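
Several functions in this module describe `start_block`, `end_block`, and `query_block` as following Python slicing notation. The sketch below illustrates one plausible reading of that convention, in which a negative value counts back from the latest block in the table. The helpers `resolve_block` and `filter_blocks` and the sample block numbers are hypothetical and are not part of this module; the actual resolution logic in the library may differ.

```python
from __future__ import annotations


def resolve_block(block: int | None, latest_block: int) -> int | None:
    """Map a slicing-style block argument to an absolute block number.

    Hypothetical helper: non-negative values and None pass through unchanged,
    negative values are taken relative to the latest block, as in list[-3:].
    """
    if block is None or block >= 0:
        return block
    return latest_block + block + 1


def filter_blocks(
    block_numbers: list[int],
    start_block: int | None = None,
    end_block: int | None = None,
) -> list[int]:
    """Keep block numbers in [start_block, end_block), slicing style."""
    if not block_numbers:
        return []
    latest = max(block_numbers)
    start = resolve_block(start_block, latest)
    end = resolve_block(end_block, latest)
    return [
        b
        for b in block_numbers
        if (start is None or b >= start) and (end is None or b < end)
    ]


# Sample data (assumption): ten consecutive blocks numbered 0 through 9.
blocks = list(range(10))
first_three = filter_blocks(blocks, end_block=3)      # same as blocks[:3]
all_but_last = filter_blocks(blocks, end_block=-3)    # same as blocks[:-3]
last_three = filter_blocks(blocks, start_block=-3)    # same as blocks[-3:]
```

With consecutive block numbers starting at zero, the filtered results coincide with ordinary list slices, which is the intuition the parameter descriptions appeal to.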