
Commit f9f3195

cleanup: simplify the concurrency configuration
The previous code was confusing: RPC_CONCURRENCY configured the number of blocks we could ingest in parallel rather than the actual JSON-RPC concurrency against the RPC node. This change makes the variable configure the maximum JSON-RPC concurrency, in a backwards-compatible way (the flag and environment variable names are unchanged). The default value changed from 25 to 80 to account for this. The number of concurrent workers fetching blocks is now derived internally as RPC_CONCURRENCY / 4.
1 parent e443cc7 commit f9f3195
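As a rough sketch of the relationship the commit message describes (illustrative only: the variable names are not from the codebase, and the assumed 4 requests per block is a round figure — the real count is 3 for OpStack and variable for ArbitrumNova):

```go
package main

import "fmt"

func main() {
	// RPC_CONCURRENCY now bounds concurrent JSON-RPC requests to the node (new default: 80).
	rpcConcurrency := 80

	// The number of block-fetching workers is derived internally as RPC_CONCURRENCY / 4.
	blockWorkers := rpcConcurrency / 4 // 20

	// Assuming roughly 4 JSON-RPC requests in flight per block, the workers
	// together stay close to, but within, the configured JSON-RPC bound.
	const assumedRequestsPerBlock = 4
	fmt.Println("block workers:", blockWorkers)                                           // 20
	fmt.Println("approx. peak JSON-RPC requests:", blockWorkers*assumedRequestsPerBlock) // 80
}
```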

File tree

6 files changed: +67 -58 lines changed

6 files changed

+67
-58
lines changed

README.md

Lines changed: 10 additions & 2 deletions
@@ -47,8 +47,16 @@ Also, we mention some of the options here:
 The `log` flag (environment variable `LOG`) controls the log level. Use `--log debug`/`LOG=debug` to emit more logs than the default `info` level. To emit less logs, use `warn`, or `error` (least).
 
 ### Tuning RPC concurrency
-The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines)
-to run concurrently to perform RPC node requests. Default is 25.
+The flag `--rpc-concurrency` (environment variable `RPC_CONCURRENCY`) specifies the number of threads (goroutines) to run concurrently to perform RPC node requests. See `--help` for the up-to-date default value.
+
+### Tuning for throughput
+Throughput depends on the latency & request rate between RPC node <-> Node Indexer <-> DuneAPI and can be tuned via a combination of:
+1. RPC_CONCURRENCY, higher values feed more blocks into the node indexer to process
+1. MAX_BATCH_SIZE, higher values send more blocks per request to DuneAPI
+1. BLOCK_SUBMIT_INTERVAL, the interval at which blocks are submitted to DuneAPI
+See `--help` for up-to-date default values.
+
+
 
 ### RPC poll interval
 The flag `--rpc-poll-interval` (environment variable `RPC_POLL_INTERVAL`) specifies the duration to wait before checking
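The new "Tuning for throughput" section lists three knobs. A back-of-the-envelope sketch of how two of them bound throughput on the DuneAPI side (purely illustrative; the helper and variable names are not from the codebase, and real throughput also depends on RPC latency as the section notes):

```go
package main

import (
	"fmt"
	"time"
)

// duneSideCeiling is a hypothetical helper: if every submit interval ships a
// full batch, blocks per second cannot exceed maxBatchSize / blockSubmitInterval.
func duneSideCeiling(maxBatchSize int, blockSubmitInterval time.Duration) float64 {
	return float64(maxBatchSize) / blockSubmitInterval.Seconds()
}

func main() {
	// Using the defaults shown in config.go: MAX_BATCH_SIZE=128, BLOCK_SUBMIT_INTERVAL=500ms.
	fmt.Printf("DuneAPI-side ceiling: %.0f blocks/s\n", duneSideCeiling(128, 500*time.Millisecond)) // 256 blocks/s

	// The RPC side has its own ceiling: (RPC_CONCURRENCY / 4) block workers,
	// each limited by per-block RPC latency; whichever ceiling is lower wins.
}
```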

cmd/main.go

Lines changed: 14 additions & 12 deletions
@@ -95,7 +95,7 @@ func main() {
 		EVMStack: cfg.RPCStack,
 		// real max request concurrency to RPC node
 		// each block requires multiple RPC requests
-		TotalRPCConcurrency: cfg.BlockConcurrency * 4,
+		TotalRPCConcurrency: cfg.RPCConcurrency,
 	})
 	if err != nil {
 		stdlog.Fatal(err)
@@ -145,17 +145,19 @@ func main() {
 		duneClient,
 		duneClientDLQ,
 		ingester.Config{
-			MaxConcurrentRequests:    cfg.BlockConcurrency,
-			MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
-			MaxBatchSize:             cfg.MaxBatchSize,
-			ReportProgressInterval:   cfg.ReportProgressInterval,
-			PollInterval:             cfg.PollInterval,
-			PollDLQInterval:          cfg.PollDLQInterval,
-			Stack:                    cfg.RPCStack,
-			BlockchainName:           cfg.BlockchainName,
-			BlockSubmitInterval:      cfg.BlockSubmitInterval,
-			SkipFailedBlocks:         cfg.RPCNode.SkipFailedBlocks,
-			DLQOnly:                  cfg.DLQOnly,
+			// OpStack does 3 requests per block, ArbitrumNova is variable
+			// leave some room for other requests
+			MaxConcurrentBlocks:    cfg.RPCConcurrency / 4,
+			DLQMaxConcurrentBlocks: cfg.DLQBlockConcurrency,
+			MaxBatchSize:           cfg.MaxBatchSize,
+			ReportProgressInterval: cfg.ReportProgressInterval,
+			PollInterval:           cfg.PollInterval,
+			PollDLQInterval:        cfg.PollDLQInterval,
+			Stack:                  cfg.RPCStack,
+			BlockchainName:         cfg.BlockchainName,
+			BlockSubmitInterval:    cfg.BlockSubmitInterval,
+			SkipFailedBlocks:       cfg.RPCNode.SkipFailedBlocks,
+			DLQOnly:                cfg.DLQOnly,
 		},
 		progress,
 		dlqBlockNumbers,
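One consequence of the integer division above (not addressed in this commit): an RPC_CONCURRENCY below 4 yields MaxConcurrentBlocks == 0, which Run() rejects. A hypothetical guard, if one wanted to defend against that, could look like the following sketch; the helper name is illustrative, not from the repository:

```go
package main

import "fmt"

// derivedBlockWorkers mirrors cfg.RPCConcurrency / 4 but clamps to a minimum
// of one worker, so Run()'s "MaxConcurrentRequests must be > 0" check cannot
// trip when RPC_CONCURRENCY is set below 4.
func derivedBlockWorkers(rpcConcurrency int) int {
	workers := rpcConcurrency / 4
	if workers < 1 {
		workers = 1
	}
	return workers
}

func main() {
	fmt.Println(derivedBlockWorkers(80)) // 20, same as the commit's derivation
	fmt.Println(derivedBlockWorkers(3))  // 1 instead of 0
}
```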

config/config.go

Lines changed: 6 additions & 7 deletions
@@ -54,13 +54,12 @@ type Config struct {
 	DLQRetryInterval       time.Duration `long:"dlq-retry-interval" env:"DLQ_RETRY_INTERVAL" description:"Interval for linear backoff in DLQ " default:"1m"` // nolint:lll
 	ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
 	RPCNode                RPCClient
-	RPCStack               models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
-	// kept the old cmdline arg names and env variables for backwards compatibility
-	BlockConcurrency    int           `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of concurrent block requests to the RPC node" default:"25"` // nolint:lll
-	DLQBlockConcurrency int           `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
-	BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
-	LogLevel            string        `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
-	MaxBatchSize        int           `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
+	RPCStack            models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
+	RPCConcurrency      int             `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of maximum concurrent jsonRPC requests to the RPC node" default:"80"` // nolint:lll
+	DLQBlockConcurrency int             `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
+	BlockSubmitInterval time.Duration   `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
+	LogLevel            string          `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
+	MaxBatchSize        int             `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send per batch (max:256)" default:"128"` // nolint:lll
 }
 
 func (c Config) HasError() error {
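The struct tags (`long`, `env`, `default`) follow the conventions of github.com/jessevdk/go-flags; assuming that is the flag library in use, a minimal standalone sketch of how the renamed field resolves its value might look like this (trimmed to just the one field discussed in this commit; not the repository's actual config wiring):

```go
package main

import (
	"fmt"
	"os"

	flags "github.com/jessevdk/go-flags"
)

// miniConfig is a hypothetical, trimmed-down stand-in for config.Config.
type miniConfig struct {
	RPCConcurrency int `long:"rpc-concurrency" env:"RPC_CONCURRENCY" description:"Number of maximum concurrent jsonRPC requests to the RPC node" default:"80"`
}

func main() {
	var cfg miniConfig
	if _, err := flags.Parse(&cfg); err != nil {
		os.Exit(1)
	}
	// With go-flags, the command-line flag takes precedence, then the
	// RPC_CONCURRENCY environment variable, then the default of 80.
	fmt.Println("rpc-concurrency:", cfg.RPCConcurrency)
}
```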

ingester/ingester.go

Lines changed: 11 additions & 11 deletions
@@ -57,17 +57,17 @@ const (
 )
 
 type Config struct {
-	MaxConcurrentRequests    int
-	MaxConcurrentRequestsDLQ int
-	PollInterval             time.Duration
-	PollDLQInterval          time.Duration
-	ReportProgressInterval   time.Duration
-	Stack                    models.EVMStack
-	BlockchainName           string
-	BlockSubmitInterval      time.Duration
-	SkipFailedBlocks         bool
-	DLQOnly                  bool
-	MaxBatchSize             int
+	MaxConcurrentBlocks    int
+	DLQMaxConcurrentBlocks int
+	PollInterval           time.Duration
+	PollDLQInterval        time.Duration
+	ReportProgressInterval time.Duration
+	Stack                  models.EVMStack
+	BlockchainName         string
+	BlockSubmitInterval    time.Duration
+	SkipFailedBlocks       bool
+	DLQOnly                bool
+	MaxBatchSize           int
 }
 
 type ingester struct {

ingester/mainloop.go

Lines changed: 8 additions & 8 deletions
@@ -28,13 +28,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	registerIngesterMetrics(i)
 
 	if i.cfg.DLQOnly {
-		i.cfg.MaxConcurrentRequests = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0
+		i.cfg.MaxConcurrentBlocks = 0 // if running DLQ Only mode, ignore the MaxConcurrentRequests and set this to 0
 	} else {
-		if i.cfg.MaxConcurrentRequests <= 0 {
+		if i.cfg.MaxConcurrentBlocks <= 0 {
 			return errors.Errorf("MaxConcurrentRequests must be > 0")
 		}
 	}
-	if i.cfg.MaxConcurrentRequestsDLQ <= 0 {
+	if i.cfg.DLQMaxConcurrentBlocks <= 0 {
 		return errors.Errorf("MaxConcurrentRequestsDLQ must be > 0")
 	}
 
@@ -52,8 +52,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	blocks := make(chan models.RPCBlock, 2*maxBatchSize)
 	defer close(blocks)
 
-	// Start MaxBatchSize goroutines to consume blocks concurrently
-	for range i.cfg.MaxConcurrentRequests {
+	// Start MaxConcurrentBlocks goroutines to consume blocks concurrently
+	for range i.cfg.MaxConcurrentBlocks {
 		errGroup.Go(func() error {
 			return i.FetchBlockLoop(ctx, blockNumbers, blocks)
 		})
@@ -70,13 +70,13 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 	blockNumbersDLQ := make(chan dlq.Item[int64])
 	defer close(blockNumbersDLQ)
 
-	blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.MaxConcurrentRequestsDLQ+1)
+	blocksDLQ := make(chan dlq.Item[models.RPCBlock], i.cfg.DLQMaxConcurrentBlocks+1)
 	defer close(blocksDLQ)
 
 	errGroup.Go(func() error {
 		return i.SendBlocksDLQ(ctx, blocksDLQ)
 	})
-	for range i.cfg.MaxConcurrentRequestsDLQ {
+	for range i.cfg.DLQMaxConcurrentBlocks {
 		errGroup.Go(func() error {
 			return i.FetchBlockLoopDLQ(ctx, blockNumbersDLQ, blocksDLQ)
 		})
@@ -91,7 +91,7 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int
 		"runForever", maxCount <= 0,
 		"startBlockNumber", startBlockNumber,
 		"endBlockNumber", endBlockNumber,
-		"maxConcurrency", i.cfg.MaxConcurrentRequests,
+		"maxConcurrency", i.cfg.MaxConcurrentBlocks,
 	)
 
 	// Produce block numbers in the main goroutine
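The loop above fans out a fixed number of consumers over a shared channel using errgroup. A self-contained sketch of that pattern, with placeholder values standing in for the repository's block types and fetch loop (not the actual Run() implementation):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	errGroup, ctx := errgroup.WithContext(context.Background())

	const maxConcurrentBlocks = 4 // stands in for i.cfg.MaxConcurrentBlocks
	blockNumbers := make(chan int64)

	// Fan out: a fixed pool of workers all consume from the same channel,
	// mirroring the FetchBlockLoop goroutines started in Run().
	for range maxConcurrentBlocks {
		errGroup.Go(func() error {
			for n := range blockNumbers {
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
					fmt.Println("fetching block", n) // placeholder for the real RPC fetch
				}
			}
			return nil
		})
	}

	// Produce block numbers in the main goroutine, as Run() does.
	for n := int64(1); n <= 10; n++ {
		blockNumbers <- n
	}
	close(blockNumbers)

	if err := errGroup.Wait(); err != nil {
		fmt.Println("error:", err)
	}
}
```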

ingester/mainloop_test.go

Lines changed: 18 additions & 18 deletions
@@ -80,10 +80,10 @@ func TestRunUntilCancel(t *testing.T) {
 		duneapi,
 		duneapi,
 		ingester.Config{
-			BlockSubmitInterval:      time.Nanosecond,
-			MaxConcurrentRequests:    10,
-			MaxConcurrentRequestsDLQ: 2,
-			SkipFailedBlocks:         false,
+			BlockSubmitInterval:    time.Nanosecond,
+			MaxConcurrentBlocks:    10,
+			DLQMaxConcurrentBlocks: 2,
+			SkipFailedBlocks:       false,
 		},
 		nil, // progress
 		dlq.NewDLQ[int64](),
@@ -262,8 +262,8 @@ func TestRunBlocksOutOfOrder(t *testing.T) {
 		duneapi,
 		duneapi,
 		ingester.Config{
-			MaxConcurrentRequests:    20,
-			MaxConcurrentRequestsDLQ: 2, // fetch blocks in multiple goroutines
+			MaxConcurrentBlocks:    20,
+			DLQMaxConcurrentBlocks: 2, // fetch blocks in multiple goroutines
 			// big enough compared to the time spent in block by number to ensure batching. We panic
 			// in the mocked Dune client if we don't get a batch of blocks (more than one block).
 			BlockSubmitInterval: 50 * time.Millisecond,
@@ -315,10 +315,10 @@ func TestRunRPCNodeFails(t *testing.T) {
 		duneapi,
 		duneapi,
 		ingester.Config{
-			MaxConcurrentRequests:    10,
-			MaxConcurrentRequestsDLQ: 2,
-			BlockSubmitInterval:      time.Millisecond,
-			SkipFailedBlocks:         false,
+			MaxConcurrentBlocks:    10,
+			DLQMaxConcurrentBlocks: 2,
+			BlockSubmitInterval:    time.Millisecond,
+			SkipFailedBlocks:       false,
 		},
 		nil, // progress
 		dlq.NewDLQ[int64](),
@@ -338,7 +338,7 @@ func TestRunFailsIfNoConcurrentRequests(t *testing.T) {
 		nil,
 		nil,
 		ingester.Config{
-			MaxConcurrentRequests: 0,
+			MaxConcurrentBlocks: 0,
 		},
 		nil, // progress
 		dlq.NewDLQ[int64](),
@@ -357,8 +357,8 @@ func TestRunFailsIfNoConcurrentRequestsDLQ(t *testing.T) {
 		nil,
 		nil,
 		ingester.Config{
-			MaxConcurrentRequests:    10,
-			MaxConcurrentRequestsDLQ: 0,
+			MaxConcurrentBlocks:    10,
+			DLQMaxConcurrentBlocks: 0,
 		},
 		nil, // progress
 		dlq.NewDLQ[int64](),
@@ -478,11 +478,11 @@ func TestRunWithDLQ(t *testing.T) {
 		duneapi,
 		duneapi,
 		ingester.Config{
-			BlockSubmitInterval:      time.Nanosecond,
-			MaxConcurrentRequests:    10,
-			MaxConcurrentRequestsDLQ: 1,
-			DLQOnly:                  false,
-			SkipFailedBlocks:         true,
+			BlockSubmitInterval:    time.Nanosecond,
+			MaxConcurrentBlocks:    10,
+			DLQMaxConcurrentBlocks: 1,
+			DLQOnly:                false,
+			SkipFailedBlocks:       true,
 		},
 		nil, // progress
 		dlqBlockNumbers,
