diff --git a/Makefile b/Makefile
index ac4184a..feb5e54 100644
--- a/Makefile
+++ b/Makefile
@@ -60,6 +60,12 @@ ingest-starknet:
 		--first-block 600000 \
 		--chunk-size 256
 
+ingest-starknet-to-json:
+	@python3 -m sqa.starknet.writer json/starknet \
+		-e ${STARKNET_NODE} \
+		--first-block 600000 \
+		--chunk-size 256 \
+		--raw
 
 router:
 	@python3 tests/fake_router.py
diff --git a/sqa/starknet/writer/main.py b/sqa/starknet/writer/main.py
index 700be47..de0aa13 100644
--- a/sqa/starknet/writer/main.py
+++ b/sqa/starknet/writer/main.py
@@ -4,7 +4,7 @@ import threading
 from functools import cache
 from queue import Queue
-from typing import AsyncIterator, Iterable
+from typing import AsyncIterator, Iterable, Generator
 
 from sqa.eth.ingest.main import EndpointAction
 from sqa.starknet.writer.ingest import IngestStarknet
 
@@ -20,7 +20,7 @@ async def rpc_ingest(rpc: RpcClient, first_block: int, last_block: int | None = None) -> AsyncIterator[list[WriterBlock]]:
-    LOG.info(f'ingesting data via RPC')
+    LOG.info('Ingesting data via RPC')
 
     # TODO: ensure starknet finality for finality_confirmation arg
     ingest = IngestStarknet(
@@ -36,12 +36,12 @@ async def rpc_ingest(rpc: RpcClient, first_block: int, last_block: int | None =
         ingest.close()
 
 
-def _to_sync_gen(gen):
+def _to_sync_gen(gen: AsyncIterator) -> Generator:
     q = Queue(maxsize=5)
 
     async def consume_gen():
         try:
-            async for it in gen():
+            async for it in gen:
                 q.put(it)
             q.put(None)
         except Exception as ex:
@@ -83,6 +83,13 @@ def _arguments(self):
             help='rpc api url of an ethereum node to fetch data from'
         )
 
+        program.add_argument(
+            '-s', '--src',
+            type=str,
+            metavar='URL',
+            help='URL of the data ingestion service'
+        )
+
         program.add_argument(
             '--batch-limit',
             dest='batch_limit',
@@ -127,10 +134,26 @@ def _arguments(self):
             help='port to use for built-in prometheus metrics server'
         )
 
+        program.add_argument(
+            '--raw',
+            action='store_true',
+            help='use raw .jsonl.gz format'
+        )
+
+        program.add_argument(
+            '--raw-src',
+            action='store_true',
+            help='archive with raw, pre-fetched .jsonl.gz data'
+        )
+
         return program.parse_args()
 
     def _ingest(self) -> Iterable[list[WriterBlock]]:  # type: ignore # NOTE: incopatible block type with dict type
         args = self._arguments()
+
+        if args.raw_src:  # use raw ingester from sqa.writer
+            yield from super()._ingest()  # type: ignore # NOTE: block is block
+
         endpoints = [RpcEndpoint(**e) for e in args.endpoints]
         if endpoints:
             rpc = RpcClient(
@@ -142,11 +165,20 @@ def _ingest(self) -> Iterable[list[WriterBlock]]:  # type: ignore # NOTE: incop
         assert rpc, 'no endpoints were specified'
 
-        return _to_sync_gen(lambda: rpc_ingest(rpc, args.first_block, args.last_block))
+        yield from _to_sync_gen(rpc_ingest(rpc, self._sink().get_next_block(), args.last_block))
 
     def create_writer(self) -> Writer:
         return ParquetWriter()
 
+    def main(self):
+        args = self._arguments()
+        if args.raw:
+            self.init_support_services()
+            sink = self._sink()
+            sink._raw_writer(self._ingest())
+        else:
+            super().main()
+
 
 def main(module_name: str) -> None:
     _CLI(module_name).main()
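Side note on the `_to_sync_gen` change above: the helper now receives an already-constructed async iterator instead of a zero-argument factory, which is what lets `_ingest` switch from `return _to_sync_gen(lambda: ...)` to `yield from _to_sync_gen(rpc_ingest(...))`. Below is a minimal, self-contained sketch of the queue-plus-background-thread bridge this pattern relies on; the names and the toy async source are illustrative, not the project's actual code:

```python
import asyncio
import threading
from queue import Queue
from typing import AsyncIterator, Generator


def to_sync_gen(agen: AsyncIterator) -> Generator:
    # A bounded queue provides back-pressure between the asyncio producer
    # thread and the synchronous consumer.
    q: Queue = Queue(maxsize=5)

    async def consume() -> None:
        try:
            async for item in agen:   # iterate the iterator itself, not a factory
                q.put(item)
            q.put(None)               # sentinel: normal completion
        except Exception as ex:
            q.put(ex)                 # hand the error over to the consuming thread

    threading.Thread(target=lambda: asyncio.run(consume()), daemon=True).start()

    while True:
        item = q.get()
        if item is None:
            return
        if isinstance(item, Exception):
            raise item
        yield item


async def numbers() -> AsyncIterator[int]:
    for i in range(3):
        yield i


if __name__ == '__main__':
    print(list(to_sync_gen(numbers())))  # -> [0, 1, 2]
```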
diff --git a/sqa/writer/__init__.py b/sqa/writer/__init__.py
index 328889d..ec637b6 100644
--- a/sqa/writer/__init__.py
+++ b/sqa/writer/__init__.py
@@ -1,10 +1,15 @@
+import json
 import logging
 import math
+import os
+import tempfile
 import time
 from functools import cached_property
 from typing import Iterable, Protocol, Any
 
-from sqa.fs import create_fs, Fs
+import pyarrow
+
+from sqa.fs import LocalFs, create_fs, Fs
 from sqa.layout import ChunkWriter
 from sqa.util.counters import Progress
 
@@ -166,14 +171,55 @@ def flush():
                     self._report()
                     last_report = current_time
 
-        if self._writer.buffered_bytes() > 0 and last_block == write_range[1]:
-            flush()
+        # NOTE: In case no chunks were made, we still need to save the data we received
+        flush()
 
         self._writer.end()
 
         if self._progress.has_news():
             self._report()
 
+    def _raw_writer(self, strides: Iterable[list[Block]]) -> None:
+        # heavily copied from sqa.eth.ingest.main._raw_writer
+        while True:
+            written = 0
+            first_block = math.inf
+            tmp = tempfile.NamedTemporaryFile(delete=False)
+            try:
+                with tmp, pyarrow.CompressedOutputStream(tmp, 'gzip') as out:
+                    for bb in strides:
+                        first_block = min(first_block, bb[0]['number'])
+                        last_block = bb[-1]['number']
+                        last_hash = bb[-1]['hash']
+
+                        for block in bb:
+                            line = json.dumps(block).encode('utf-8')
+                            out.write(line)
+                            out.write(b'\n')
+                            written += len(line) + 1
+
+                        if written > self._chunk_size * 1024 * 1024:
+                            break
+
+                if written > 0:
+                    chunk = self._chunk_writer.next_chunk(first_block, last_block, _short_hash(last_hash))
+                    dest = f'{chunk.path()}/blocks.jsonl.gz'
+                    if isinstance(self._fs, LocalFs):
+                        loc = self._fs.abs(dest)
+                        os.makedirs(os.path.dirname(loc), exist_ok=True)
+                        os.rename(tmp.name, loc)
+                    else:
+                        # from _upload_temp_file function
+                        try:
+                            self._fs.upload(tmp.name, dest)
+                        finally:
+                            os.remove(tmp.name)
+                else:
+                    return
+            except:
+                os.remove(tmp.name)
+                raise
+
 
 def _short_hash(value: str) -> str:
     if value.startswith('0x'):
diff --git a/sqa/writer/cli.py b/sqa/writer/cli.py
index 85e804f..6ba45bf 100644
--- a/sqa/writer/cli.py
+++ b/sqa/writer/cli.py
@@ -142,7 +142,7 @@ def _ingest(self) -> Iterable[list[Block]]:
             if pack:
                 yield pack
 
-    def main(self) -> None:
+    def init_support_services(self):
         self._start_prometheus_metrics()
 
         if os.getenv('SENTRY_DSN'):
@@ -151,5 +151,7 @@ def main(self) -> None:
             traces_sample_rate=1.0
         )
 
+    def main(self) -> None:
+        self.init_support_services()
         sink = self._sink()
         sink.write(self._ingest())
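For context on the raw mode introduced here: `_raw_writer` rolls blocks into one gzip-compressed JSON-lines file per chunk (`blocks.jsonl.gz`), starting a new chunk once roughly `chunk_size` MiB of uncompressed JSON has been buffered. Since `pyarrow.CompressedOutputStream(..., 'gzip')` emits a standard gzip stream, a chunk written this way can be read back with the standard library alone. A small sketch, with a hypothetical chunk path (the real layout is produced by `ChunkWriter.next_chunk()`):

```python
import gzip
import json
from typing import Iterator


def read_raw_chunk(path: str) -> Iterator[dict]:
    # One block per line, exactly as json.dumps() wrote it.
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        for line in f:
            yield json.loads(line)


if __name__ == '__main__':
    # Hypothetical path: the actual chunk directory name encodes the block
    # range plus a short hash of the last block in the chunk.
    path = 'json/starknet/0600000-0600255-deadbeef/blocks.jsonl.gz'
    for block in read_raw_chunk(path):
        print(block['number'], block['hash'])
```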