Skip to content

IndexerStreams

WebSocket methods exposed under Indexer.streams.

new

Source code in pkg/src/dydx/indexer/streams/core.py
@classmethod
def new(cls, url: str = INDEXER_WS_URL, *, validate: bool = True):
  """
  Alternate constructor: build an instance around a freshly created `StreamsClient`.

  - `url`: WebSocket endpoint handed to `StreamsClient`.
  - `validate`: Stored on the instance as `default_validate`.
  """
  ws_client = StreamsClient(url=url)
  return cls(client=ws_client, default_validate=validate)

block_height

Subscribe to the indexer block height feed.

  • batched: Reduce incoming messages by batching contents.
  • validate: Whether to validate reply and update payloads against the generated schemas.

[Official API docs](https://docs.dydx.xyz/indexer-client/websockets#block-height)

Source code in pkg/src/dydx/indexer/streams/api/block_height.py
async def block_height(
  self, *, batched: bool = True, validate: bool | None = None
) -> Stream[Notification, Reply, Unsubscribed]:
  """
  Open a subscription on the `v4_block_height` channel.

  - `batched`: Sent to the server as the `batched` subscription option. When true, each
    message's `contents` is iterated as a list of updates; otherwise it is treated as a
    single update.
  - `validate`: Per-call override forwarded to `self.validate(...)`; a truthy result routes
    the reply and every update through the schema adapters, a falsy one passes them through raw.

  Returns a `Stream` made of the subscription reply, an async iterator of updates and the
  subscription's `unsubscribe` callback.

  > [Official API docs](https://docs.dydx.xyz/indexer-client/websockets#block-height)
  """
  subscription = await self.client.subscribe('v4_block_height', {'batched': batched})

  async def updates() -> AsyncIterable[Notification]:
    async for message in subscription:
      contents = message['contents']
      # Normalise to a list so both modes share the same loop.
      items: list[Notification] = contents if batched else [contents]
      for item in items:
        if self.validate(validate):
          yield notification_adapter.validate_python(item)
        else:
          yield item

  raw_reply = subscription.reply['contents']
  if self.validate(validate):
    reply: Reply = reply_adapter.validate_python(raw_reply)
  else:
    reply = raw_reply
  return Stream(reply, updates(), subscription.unsubscribe)

markets

Subscribe to the indexer markets feed.

  • batched: Reduce incoming messages by batching contents.
  • validate: Whether to validate reply and update payloads against the generated schemas.

[Official API docs](https://docs.dydx.xyz/indexer-client/websockets#markets)

Source code in pkg/src/dydx/indexer/streams/api/markets.py
async def markets(
  self, *, batched: bool = True, validate: bool | None = None,
) -> Stream[Notification, Reply, Unsubscribed]:
  """
  Open a subscription on the `v4_markets` channel.

  - `batched`: Sent to the server as the `batched` subscription option. When true, each
    message's `contents` is iterated as a list of updates; otherwise it is treated as a
    single update.
  - `validate`: Per-call override forwarded to `self.validate(...)`; a truthy result routes
    the reply and every update through the schema adapters, a falsy one passes them through raw.

  Returns a `Stream` made of the subscription reply, an async iterator of updates and the
  subscription's `unsubscribe` callback.

  > [Official API docs](https://docs.dydx.xyz/indexer-client/websockets#markets)
  """
  subscription = await self.client.subscribe('v4_markets', {'batched': batched})

  async def updates() -> AsyncIterable[Notification]:
    async for message in subscription:
      contents = message['contents']
      # Normalise to a list so both modes share the same loop.
      items: list[Notification] = contents if batched else [contents]
      for item in items:
        if self.validate(validate):
          yield notification_adapter.validate_python(item)
        else:
          yield item

  raw_reply = subscription.reply['contents']
  if self.validate(validate):
    reply: Reply = reply_adapter.validate_python(raw_reply)
  else:
    reply = raw_reply
  return Stream(reply, updates(), subscription.unsubscribe)

candles

Subscribe to candle updates using market and resolution.

Source code in pkg/src/dydx/indexer/streams/candles.py
async def candles(
  self,
  market: str,
  *,
  resolution: Resolution,
  validate: bool | None = None,
  batched: bool = True,
):
  """
  Subscribe to candle updates for one market at one resolution.

  Convenience wrapper: joins the two values into the `'{market}/{resolution}'` id and
  delegates to `self.raw_candles`, returning whatever it returns.

  - `market`: First half of the subscription id.
  - `resolution`: Second half of the subscription id.
  - `validate`: Forwarded unchanged to `raw_candles`.
  - `batched`: Forwarded unchanged to `raw_candles`.
  """
  candle_id = f'{market}/{resolution}'
  return await self.raw_candles(id=candle_id, batched=batched, validate=validate)

orders

Subscribe to the indexer order book feed for a market.

  • id: Market ticker.
  • batched: Reduce incoming messages by batching contents.
  • validate: Whether to validate reply and update payloads against the generated schemas.

[Official API docs](https://docs.dydx.xyz/indexer-client/websockets#orders)

Source code in pkg/src/dydx/indexer/streams/api/orders.py
async def orders(
  self, *, id: str, batched: bool = True, validate: bool | None = None,
) -> Stream[Notification, Reply, Unsubscribed]:
  """
  Open a subscription on the order book channel `v4_orderbook:{id}`.

  - `id`: Market ticker; becomes the suffix of the channel name.
  - `batched`: Sent to the server as the `batched` subscription option. When true, each
    message's `contents` is iterated as a list of updates; otherwise it is treated as a
    single update.
  - `validate`: Per-call override forwarded to `self.validate(...)`; a truthy result routes
    the reply and every update through the schema adapters, a falsy one passes them through raw.

  Returns a `Stream` made of the subscription reply, an async iterator of updates and the
  subscription's `unsubscribe` callback.

  > [Official API docs](https://docs.dydx.xyz/indexer-client/websockets#orders)
  """
  channel = f'v4_orderbook:{id}'
  subscription = await self.client.subscribe(channel, {'batched': batched})

  async def updates() -> AsyncIterable[Notification]:
    async for message in subscription:
      contents = message['contents']
      # Normalise to a list so both modes share the same loop.
      items: list[Notification] = contents if batched else [contents]
      for item in items:
        if self.validate(validate):
          yield notification_adapter.validate_python(item)
        else:
          yield item

  raw_reply = subscription.reply['contents']
  if self.validate(validate):
    reply: Reply = reply_adapter.validate_python(raw_reply)
  else:
    reply = raw_reply
  return Stream(reply, updates(), subscription.unsubscribe)

parent_subaccounts

Subscribe to the indexer parent subaccounts feed for an address and parent subaccount number.

  • address: Wallet address that owns the parent subaccount.
  • subaccount: Parent subaccount number.
  • batched: Reduce incoming messages by batching contents.
  • validate: Whether to validate reply and update payloads against the generated schemas.

[Official API docs](https://docs.dydx.xyz/indexer-client/websockets#parent-subaccounts)

Source code in pkg/src/dydx/indexer/streams/parent_subaccounts.py
async def parent_subaccounts(
  self, address: str, *,
  subaccount: int,
  validate: bool | None = None,
  batched: bool = True,
):
  """
  Subscribe to the indexer parent subaccounts feed for an address and parent subaccount number.

  - `address`: Wallet address that owns the parent subaccount.
  - `subaccount`: Parent subaccount number.
  - `batched`: Reduce incoming messages by batching contents.
  - `validate`: Whether to validate reply and update payloads against the generated schemas.

  > [Official API docs](https://docs.dydx.xyz/indexer-client/websockets#parent-subaccounts)
  """
  return await self.raw_parent_subaccounts(
    id=f'{address}/{subaccount}',
    batched=batched,
    validate=validate,
  )

subaccounts

Subscribe to subaccount updates using address and subaccount number.

Source code in pkg/src/dydx/indexer/streams/subaccounts.py
async def subaccounts(
  self, address: str, *,
  subaccount: int,
  validate: bool | None = None,
  batched: bool = True,
):
  """Subscribe to subaccount updates using address and subaccount number."""
  return await self.raw_subaccounts(
    id=f'{address}/{subaccount}',
    batched=batched,
    validate=validate,
  )

trades

Subscribe to the indexer trades feed for a market.

  • id: Market ticker.
  • batched: Reduce incoming messages by batching contents.
  • validate: Whether to validate reply and update payloads against the generated schemas.

[Official API docs](https://docs.dydx.xyz/indexer-client/websockets#trades)

Source code in pkg/src/dydx/indexer/streams/api/trades.py
async def trades(
  self, *, id: str, batched: bool = True, validate: bool | None = None,
) -> Stream[Notification, Reply, Unsubscribed]:
  """
  Open a subscription on the trades channel `v4_trades:{id}`.

  - `id`: Market ticker; becomes the suffix of the channel name.
  - `batched`: Sent to the server as the `batched` subscription option. When true, each
    message's `contents` is iterated as a list of updates; otherwise it is treated as a
    single update.
  - `validate`: Per-call override forwarded to `self.validate(...)`; a truthy result routes
    the reply and every update through the schema adapters, a falsy one passes them through raw.

  Returns a `Stream` made of the subscription reply, an async iterator of updates and the
  subscription's `unsubscribe` callback.

  > [Official API docs](https://docs.dydx.xyz/indexer-client/websockets#trades)
  """
  channel = f'v4_trades:{id}'
  subscription = await self.client.subscribe(channel, {'batched': batched})

  async def updates() -> AsyncIterable[Notification]:
    async for message in subscription:
      contents = message['contents']
      # Normalise to a list so both modes share the same loop.
      items: list[Notification] = contents if batched else [contents]
      for item in items:
        if self.validate(validate):
          yield notification_adapter.validate_python(item)
        else:
          yield item

  raw_reply = subscription.reply['contents']
  if self.validate(validate):
    reply: Reply = reply_adapter.validate_python(raw_reply)
  else:
    reply = raw_reply
  return Stream(reply, updates(), subscription.unsubscribe)