Skip to content

Commit 6eef58d

Browse files
Merge branch 'master' into historical-state-pathdb
2 parents 04b302a + ac20f34 commit 6eef58d

File tree

7 files changed

+91
-132
lines changed

7 files changed

+91
-132
lines changed

arbnode/transaction_streamer.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/ethereum/go-ethereum/core/rawdb"
2525
"github.com/ethereum/go-ethereum/ethdb"
2626
"github.com/ethereum/go-ethereum/log"
27+
"github.com/ethereum/go-ethereum/metrics"
2728
"github.com/ethereum/go-ethereum/params"
2829
"github.com/ethereum/go-ethereum/rlp"
2930

@@ -39,6 +40,10 @@ import (
3940
"github.com/offchainlabs/nitro/util/stopwaiter"
4041
)
4142

43+
var (
44+
messageTimer = metrics.NewRegisteredHistogram("arb/txstreamer/message/duration", nil, metrics.NewBoundedHistogramSample())
45+
)
46+
4247
// TransactionStreamer produces blocks from a node's L1 messages, storing the results in the blockchain and recording their positions
4348
// The streamer is notified when there's new batches to process
4449
type TransactionStreamer struct {
@@ -1349,6 +1354,7 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
13491354
return false
13501355
}
13511356
defer s.reorgMutex.RUnlock()
1357+
start := time.Now()
13521358

13531359
prevHeadMsgIdx := s.prevHeadMsgIdx
13541360
consensusHeadMsgIdx, err := s.GetHeadMessageIndex()
@@ -1417,6 +1423,8 @@ func (s *TransactionStreamer) ExecuteNextMsg(ctx context.Context) bool {
14171423
}
14181424
s.broadcastMessages([]arbostypes.MessageWithMetadataAndBlockInfo{msgWithBlockInfo}, msgIdxToExecute)
14191425

1426+
messageTimer.Update(time.Since(start).Nanoseconds())
1427+
14201428
return msgIdxToExecute+1 <= consensusHeadMsgIdx
14211429
}
14221430

daprovider/daclient/daclient.go

Lines changed: 28 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -101,16 +101,13 @@ type SupportedHeaderBytesResult struct {
101101
}
102102

103103
func (c *Client) GetSupportedHeaderBytes() containers.PromiseInterface[SupportedHeaderBytesResult] {
104-
promise, ctx := containers.NewPromiseWithContext[SupportedHeaderBytesResult](context.Background())
105-
go func() {
104+
return containers.DoPromise(context.Background(), func(ctx context.Context) (SupportedHeaderBytesResult, error) {
106105
var result server_api.SupportedHeaderBytesResult
107106
if err := c.CallContext(ctx, &result, "daprovider_getSupportedHeaderBytes"); err != nil {
108-
promise.ProduceError(fmt.Errorf("error returned from daprovider_getSupportedHeaderBytes rpc method: %w", err))
109-
} else {
110-
promise.Produce(SupportedHeaderBytesResult{HeaderBytes: result.HeaderBytes})
107+
return SupportedHeaderBytesResult{}, fmt.Errorf("error returned from daprovider_getSupportedHeaderBytes rpc method: %w", err)
111108
}
112-
}()
113-
return promise
109+
return SupportedHeaderBytesResult{HeaderBytes: result.HeaderBytes}, nil
110+
})
114111
}
115112

116113
// RecoverPayload fetches the underlying payload from the DA provider
@@ -119,16 +116,14 @@ func (c *Client) RecoverPayload(
119116
batchBlockHash common.Hash,
120117
sequencerMsg []byte,
121118
) containers.PromiseInterface[daprovider.PayloadResult] {
122-
promise, ctx := containers.NewPromiseWithContext[daprovider.PayloadResult](context.Background())
123-
go func() {
119+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PayloadResult, error) {
124120
var result daprovider.PayloadResult
125-
if err := c.CallContext(ctx, &result, "daprovider_recoverPayload", hexutil.Uint64(batchNum), batchBlockHash, hexutil.Bytes(sequencerMsg)); err != nil {
126-
promise.ProduceError(fmt.Errorf("error returned from daprovider_recoverPayload rpc method, err: %w", err))
127-
} else {
128-
promise.Produce(result)
121+
err := c.CallContext(ctx, &result, "daprovider_recoverPayload", hexutil.Uint64(batchNum), batchBlockHash, hexutil.Bytes(sequencerMsg))
122+
if err != nil {
123+
err = fmt.Errorf("error returned from daprovider_recoverPayload rpc method, err: %w", err)
129124
}
130-
}()
131-
return promise
125+
return result, err
126+
})
132127
}
133128

134129
// CollectPreimages collects preimages from the DA provider
@@ -137,32 +132,27 @@ func (c *Client) CollectPreimages(
137132
batchBlockHash common.Hash,
138133
sequencerMsg []byte,
139134
) containers.PromiseInterface[daprovider.PreimagesResult] {
140-
promise, ctx := containers.NewPromiseWithContext[daprovider.PreimagesResult](context.Background())
141-
go func() {
135+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PreimagesResult, error) {
142136
var result daprovider.PreimagesResult
143-
if err := c.CallContext(ctx, &result, "daprovider_collectPreimages", hexutil.Uint64(batchNum), batchBlockHash, hexutil.Bytes(sequencerMsg)); err != nil {
144-
promise.ProduceError(fmt.Errorf("error returned from daprovider_collectPreimages rpc method, err: %w", err))
145-
} else {
146-
promise.Produce(result)
137+
err := c.CallContext(ctx, &result, "daprovider_collectPreimages", hexutil.Uint64(batchNum), batchBlockHash, hexutil.Bytes(sequencerMsg))
138+
if err != nil {
139+
err = fmt.Errorf("error returned from daprovider_collectPreimages rpc method, err: %w", err)
147140
}
148-
}()
149-
return promise
141+
return result, err
142+
})
150143
}
151144

152145
func (c *Client) Store(
153146
message []byte,
154147
timeout uint64,
155148
) containers.PromiseInterface[[]byte] {
156-
promise, ctx := containers.NewPromiseWithContext[[]byte](context.Background())
157-
go func() {
149+
return containers.DoPromise(context.Background(), func(ctx context.Context) ([]byte, error) {
158150
storeResult, err := c.store(ctx, message, timeout)
159151
if err != nil {
160-
promise.ProduceError(err)
161-
} else {
162-
promise.Produce(storeResult.SerializedDACert)
152+
return nil, err
163153
}
164-
}()
165-
return promise
154+
return storeResult.SerializedDACert, nil
155+
})
166156
}
167157

168158
func (c *Client) store(ctx context.Context, message []byte, timeout uint64) (*server_api.StoreResult, error) {
@@ -192,29 +182,23 @@ func (c *Client) GenerateReadPreimageProof(
192182
offset uint64,
193183
certificate []byte,
194184
) containers.PromiseInterface[daprovider.PreimageProofResult] {
195-
promise, ctx := containers.NewPromiseWithContext[daprovider.PreimageProofResult](context.Background())
196-
go func() {
185+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PreimageProofResult, error) {
197186
var generateProofResult server_api.GenerateReadPreimageProofResult
198187
if err := c.CallContext(ctx, &generateProofResult, "daprovider_generateReadPreimageProof", certHash, hexutil.Uint64(offset), hexutil.Bytes(certificate)); err != nil {
199-
promise.ProduceError(fmt.Errorf("error returned from daprovider_generateProof rpc method, err: %w", err))
200-
} else {
201-
promise.Produce(daprovider.PreimageProofResult{Proof: generateProofResult.Proof})
188+
return daprovider.PreimageProofResult{}, fmt.Errorf("error returned from daprovider_generateProof rpc method, err: %w", err)
202189
}
203-
}()
204-
return promise
190+
return daprovider.PreimageProofResult{Proof: generateProofResult.Proof}, nil
191+
})
205192
}
206193

207194
func (c *Client) GenerateCertificateValidityProof(
208195
certificate []byte,
209196
) containers.PromiseInterface[daprovider.ValidityProofResult] {
210-
promise, ctx := containers.NewPromiseWithContext[daprovider.ValidityProofResult](context.Background())
211-
go func() {
197+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.ValidityProofResult, error) {
212198
var generateCertificateValidityProofResult server_api.GenerateCertificateValidityProofResult
213199
if err := c.CallContext(ctx, &generateCertificateValidityProofResult, "daprovider_generateCertificateValidityProof", hexutil.Bytes(certificate)); err != nil {
214-
promise.ProduceError(fmt.Errorf("error returned from daprovider_generateCertificateValidityProof rpc method, err: %w", err))
215-
} else {
216-
promise.Produce(daprovider.ValidityProofResult{Proof: generateCertificateValidityProofResult.Proof})
200+
return daprovider.ValidityProofResult{}, fmt.Errorf("error returned from daprovider_generateCertificateValidityProof rpc method, err: %w", err)
217201
}
218-
}()
219-
return promise
202+
return daprovider.ValidityProofResult{Proof: generateCertificateValidityProofResult.Proof}, nil
203+
})
220204
}

daprovider/das/dasutil/dasutil.go

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,10 @@ func (d *readerForDAS) RecoverPayload(
7474
batchBlockHash common.Hash,
7575
sequencerMsg []byte,
7676
) containers.PromiseInterface[daprovider.PayloadResult] {
77-
promise, ctx := containers.NewPromiseWithContext[daprovider.PayloadResult](context.Background())
78-
go func() {
77+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PayloadResult, error) {
7978
payload, _, err := d.recoverInternal(ctx, batchNum, sequencerMsg, true, false)
80-
if err != nil {
81-
promise.ProduceError(err)
82-
} else {
83-
promise.Produce(daprovider.PayloadResult{Payload: payload})
84-
}
85-
}()
86-
return promise
79+
return daprovider.PayloadResult{Payload: payload}, err
80+
})
8781
}
8882

8983
// CollectPreimages collects preimages from the DA provider
@@ -92,16 +86,10 @@ func (d *readerForDAS) CollectPreimages(
9286
batchBlockHash common.Hash,
9387
sequencerMsg []byte,
9488
) containers.PromiseInterface[daprovider.PreimagesResult] {
95-
promise, ctx := containers.NewPromiseWithContext[daprovider.PreimagesResult](context.Background())
96-
go func() {
89+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PreimagesResult, error) {
9790
_, preimages, err := d.recoverInternal(ctx, batchNum, sequencerMsg, false, true)
98-
if err != nil {
99-
promise.ProduceError(err)
100-
} else {
101-
promise.Produce(daprovider.PreimagesResult{Preimages: preimages})
102-
}
103-
}()
104-
return promise
91+
return daprovider.PreimagesResult{Preimages: preimages}, err
92+
})
10593
}
10694

10795
// NewWriterForDAS is generally meant to be only used by nitro.
@@ -115,16 +103,13 @@ type writerForDAS struct {
115103
}
116104

117105
func (d *writerForDAS) Store(message []byte, timeout uint64) containers.PromiseInterface[[]byte] {
118-
promise, ctx := containers.NewPromiseWithContext[[]byte](context.Background())
119-
go func() {
106+
return containers.DoPromise(context.Background(), func(ctx context.Context) ([]byte, error) {
120107
cert, err := d.dasWriter.Store(ctx, message, timeout)
121108
if err != nil {
122-
promise.ProduceError(err)
123-
} else {
124-
promise.Produce(Serialize(cert))
109+
return nil, err
125110
}
126-
}()
127-
return promise
111+
return Serialize(cert), nil
112+
})
128113
}
129114

130115
var (

daprovider/reader.go

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ type readerForBlobReader struct {
6969
// recoverInternal is the shared implementation for both RecoverPayload and CollectPreimages
7070
func (b *readerForBlobReader) recoverInternal(
7171
ctx context.Context,
72-
batchNum uint64,
7372
batchBlockHash common.Hash,
7473
sequencerMsg []byte,
7574
needPayload bool,
@@ -118,16 +117,10 @@ func (b *readerForBlobReader) RecoverPayload(
118117
batchBlockHash common.Hash,
119118
sequencerMsg []byte,
120119
) containers.PromiseInterface[PayloadResult] {
121-
promise, ctx := containers.NewPromiseWithContext[PayloadResult](context.Background())
122-
go func() {
123-
payload, _, err := b.recoverInternal(ctx, batchNum, batchBlockHash, sequencerMsg, true, false)
124-
if err != nil {
125-
promise.ProduceError(err)
126-
} else {
127-
promise.Produce(PayloadResult{Payload: payload})
128-
}
129-
}()
130-
return promise
120+
return containers.DoPromise(context.Background(), func(ctx context.Context) (PayloadResult, error) {
121+
payload, _, err := b.recoverInternal(ctx, batchBlockHash, sequencerMsg, true, false)
122+
return PayloadResult{Payload: payload}, err
123+
})
131124
}
132125

133126
// CollectPreimages collects preimages from the DA provider
@@ -136,14 +129,8 @@ func (b *readerForBlobReader) CollectPreimages(
136129
batchBlockHash common.Hash,
137130
sequencerMsg []byte,
138131
) containers.PromiseInterface[PreimagesResult] {
139-
promise, ctx := containers.NewPromiseWithContext[PreimagesResult](context.Background())
140-
go func() {
141-
_, preimages, err := b.recoverInternal(ctx, batchNum, batchBlockHash, sequencerMsg, false, true)
142-
if err != nil {
143-
promise.ProduceError(err)
144-
} else {
145-
promise.Produce(PreimagesResult{Preimages: preimages})
146-
}
147-
}()
148-
return promise
132+
return containers.DoPromise(context.Background(), func(ctx context.Context) (PreimagesResult, error) {
133+
_, preimages, err := b.recoverInternal(ctx, batchBlockHash, sequencerMsg, false, true)
134+
return PreimagesResult{Preimages: preimages}, err
135+
})
149136
}

daprovider/referenceda/reference_reader.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func (r *Reader) recoverInternal(
9797

9898
// Record preimages if needed
9999
var preimages daprovider.PreimagesMap
100-
if needPreimages && payload != nil {
100+
if needPreimages {
101101
preimages = make(daprovider.PreimagesMap)
102102
preimageRecorder := daprovider.RecordPreimagesTo(preimages)
103103

@@ -122,16 +122,10 @@ func (r *Reader) RecoverPayload(
122122
batchBlockHash common.Hash,
123123
sequencerMsg []byte,
124124
) containers.PromiseInterface[daprovider.PayloadResult] {
125-
promise, ctx := containers.NewPromiseWithContext[daprovider.PayloadResult](context.Background())
126-
go func() {
125+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PayloadResult, error) {
127126
payload, _, err := r.recoverInternal(ctx, batchNum, batchBlockHash, sequencerMsg, true, false)
128-
if err != nil {
129-
promise.ProduceError(err)
130-
} else {
131-
promise.Produce(daprovider.PayloadResult{Payload: payload})
132-
}
133-
}()
134-
return promise
127+
return daprovider.PayloadResult{Payload: payload}, err
128+
})
135129
}
136130

137131
// CollectPreimages collects preimages from the DA provider
@@ -140,14 +134,8 @@ func (r *Reader) CollectPreimages(
140134
batchBlockHash common.Hash,
141135
sequencerMsg []byte,
142136
) containers.PromiseInterface[daprovider.PreimagesResult] {
143-
promise, ctx := containers.NewPromiseWithContext[daprovider.PreimagesResult](context.Background())
144-
go func() {
137+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PreimagesResult, error) {
145138
_, preimages, err := r.recoverInternal(ctx, batchNum, batchBlockHash, sequencerMsg, false, true)
146-
if err != nil {
147-
promise.ProduceError(err)
148-
} else {
149-
promise.Produce(daprovider.PreimagesResult{Preimages: preimages})
150-
}
151-
}()
152-
return promise
139+
return daprovider.PreimagesResult{Preimages: preimages}, err
140+
})
153141
}

daprovider/referenceda/reference_validator.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,10 @@ func (v *Validator) generateReadPreimageProofInternal(ctx context.Context, certH
6767
// The proof enhancer will prepend the standardized header [certKeccak256, offset, certSize, certificate]
6868
// So we only need to return the custom data: [Version(1), PreimageSize(8), PreimageData]
6969
func (v *Validator) GenerateReadPreimageProof(certHash common.Hash, offset uint64, certificate []byte) containers.PromiseInterface[daprovider.PreimageProofResult] {
70-
promise, ctx := containers.NewPromiseWithContext[daprovider.PreimageProofResult](context.Background())
71-
go func() {
70+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.PreimageProofResult, error) {
7271
proof, err := v.generateReadPreimageProofInternal(ctx, certHash, offset, certificate)
73-
if err != nil {
74-
promise.ProduceError(err)
75-
} else {
76-
promise.Produce(daprovider.PreimageProofResult{Proof: proof})
77-
}
78-
}()
79-
return promise
72+
return daprovider.PreimageProofResult{Proof: proof}, err
73+
})
8074
}
8175

8276
// GenerateCertificateValidityProof creates a certificate validity proof for ReferenceDA
@@ -136,14 +130,8 @@ func (v *Validator) generateCertificateValidityProofInternal(ctx context.Context
136130
// Invalid certificates (wrong format, untrusted signer) return claimedValid=0.
137131
// Only transient errors (like RPC failures) return an error.
138132
func (v *Validator) GenerateCertificateValidityProof(certificate []byte) containers.PromiseInterface[daprovider.ValidityProofResult] {
139-
promise, ctx := containers.NewPromiseWithContext[daprovider.ValidityProofResult](context.Background())
140-
go func() {
133+
return containers.DoPromise(context.Background(), func(ctx context.Context) (daprovider.ValidityProofResult, error) {
141134
proof, err := v.generateCertificateValidityProofInternal(ctx, certificate)
142-
if err != nil {
143-
promise.ProduceError(err)
144-
} else {
145-
promise.Produce(daprovider.ValidityProofResult{Proof: proof})
146-
}
147-
}()
148-
return promise
135+
return daprovider.ValidityProofResult{Proof: proof}, err
136+
})
149137
}

util/containers/promise.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,29 @@ func (p *Promise[R]) Produce(value R) {
9898
}
9999
}
100100

101+
func (p *Promise[R]) ProduceResultSafe(value R, err error) error {
102+
if err == nil {
103+
return p.ProduceSafe(value)
104+
}
105+
return p.ProduceErrorSafe(err)
106+
}
107+
108+
func (p *Promise[R]) ProduceResult(value R, err error) {
109+
errSafe := p.ProduceResultSafe(value, err)
110+
if errSafe != nil {
111+
panic(errSafe)
112+
}
113+
}
114+
115+
func DoPromise[R any](parentCtx context.Context, promiseProducer func(context.Context) (R, error)) PromiseInterface[R] {
116+
promise, ctx := NewPromiseWithContext[R](parentCtx)
117+
go func() {
118+
value, err := promiseProducer(ctx)
119+
promise.ProduceResult(value, err)
120+
}()
121+
return promise
122+
}
123+
101124
// cancel might be called multiple times while no value or error is produced
102125
// cancel will be called by Await if it's context is done
103126
func NewPromise[R any](cancel func()) Promise[R] {
@@ -115,10 +138,6 @@ func NewPromiseWithContext[R any](parentCtx context.Context) (*Promise[R], conte
115138

116139
func NewReadyPromise[R any](val R, err error) PromiseInterface[R] {
117140
promise := NewPromise[R](nil)
118-
if err == nil {
119-
promise.Produce(val)
120-
} else {
121-
promise.ProduceError(err)
122-
}
141+
promise.ProduceResult(val, err)
123142
return &promise
124143
}

0 commit comments

Comments
 (0)