Skip to content

IndexerStreams

WebSocket methods exposed under Indexer.streams.

new

Source code in pkg/src/dydx/indexer/streams/core.py
@classmethod
def new(cls, url: str = INDEXER_WS_URL, *, validate: bool = True):
  """Build an instance backed by a fresh `StreamsClient` for `url`.

  Args:
    url: WebSocket URL handed to `StreamsClient`.
    validate: Stored on the instance as `default_validate`.
  """
  ws_client = StreamsClient(url=url)
  return cls(client=ws_client, default_validate=validate)

subaccounts

Source code in pkg/src/dydx/indexer/streams/subaccounts.py
async def subaccounts(
  self, address: str, *, subaccount: int,
  validate: bool | None = None, batched: bool = True,
) -> tuple[InitialMessage, AsyncIterable[UpdateMessage]]:
  """Subscribe to the `v4_subaccounts` channel for a single subaccount.

  Args:
    address: Address part of the subscription id.
    subaccount: Subaccount number; the subscription id is `{address}/{subaccount}`.
    validate: Per-call override passed to `self.validate` to decide whether
      payloads are validated or merely cast.
    batched: Forwarded to `client.subscribe`. When true, the `contents` of each
      stream message is treated as a list of updates; otherwise as one update.

  Returns:
    A pair of the initial message contents and an async generator yielding
    updates one at a time.

  Raises:
    Any exception raised by `validate_initial_message` (before returning) or by
    `validate_update` (while iterating) when validation is enabled.
  """
  res, stream = await self.client.subscribe('v4_subaccounts', id=f'{address}/{subaccount}', batched=batched)
  # Resolved once so the initial message and every update follow the same setting.
  should_validate = self.validate(validate)

  async def gen():
    async for msg in stream:
      contents = msg['contents']
      updates = contents if batched else [contents]
      for update in updates:
        # Validate at most once, and only when enabled; a failure propagates to the consumer.
        yield validate_update(update) if should_validate else cast(UpdateMessage, update)

  initial = res['contents']
  data = validate_initial_message(initial) if should_validate else cast(InitialMessage, initial)
  return data, gen()