IndexerStreams
WebSocket methods exposed under Indexer.streams.
new
Source code in pkg/src/dydx/indexer/streams/core.py
@classmethod
def new(cls, url: str = INDEXER_WS_URL, *, validate: bool = True):
    """Create an instance wired to a new ``StreamsClient`` for ``url``.

    Args:
        url: WebSocket endpoint handed to ``StreamsClient``.
        validate: Passed to the constructor as ``default_validate``.

    Returns:
        A new instance of ``cls``.
    """
    ws_client = StreamsClient(url=url)
    return cls(client=ws_client, default_validate=validate)
subaccounts
Source code in pkg/src/dydx/indexer/streams/subaccounts.py
async def subaccounts(
    self, address: str, *, subaccount: int,
    validate: bool | None = None, batched: bool = True,
) -> tuple[InitialMessage, AsyncIterable[UpdateMessage]]:
    """Subscribe to the ``v4_subaccounts`` channel for one subaccount.

    Args:
        address: Combined with ``subaccount`` into the subscription id
            ``"{address}/{subaccount}"``.
        subaccount: Subaccount number used in the subscription id.
        validate: Per-call validation flag, resolved via ``self.validate``.
        batched: Forwarded to ``client.subscribe``. When true, each
            message's ``contents`` is iterated as a list of updates;
            otherwise it is treated as a single update.

    Returns:
        ``(initial, updates)``: the ``contents`` of the subscription
        response, and an async generator yielding one update at a time.

    Raises:
        Exception: Whatever ``validate_initial_message`` or
            ``validate_update`` raises, when validation is enforced.
    """
    # Imported once per call instead of once per update inside the loop.
    import sys
    import traceback

    response, stream = await self.client.subscribe(
        'v4_subaccounts', id=f'{address}/{subaccount}', batched=batched,
    )

    async def gen():
        async for msg in stream:
            contents = msg['contents']
            updates: list[UpdateMessage] = contents if batched else [contents]
            for update in updates:
                enforce = self.validate(validate)
                result = cast(UpdateMessage, update)
                try:
                    # Validate exactly once; the result is reused below
                    # instead of calling validate_update a second time.
                    validated = validate_update(update)
                except Exception:
                    # Validation failures are always reported on stderr,
                    # even when validation is not enforced for this call.
                    print('Validation error:', traceback.format_exc(), file=sys.stderr)
                    if enforce:
                        raise
                else:
                    if enforce:
                        result = validated
                yield result

    initial_contents = response['contents']
    if self.validate(validate):
        initial = validate_initial_message(initial_contents)
    else:
        initial = cast(InitialMessage, initial_contents)
    return initial, gen()