[Access] Properly handle existing tx error messages during ingestion#8478
[Access] Properly handle existing tx error messages during ingestion#8478peterargue wants to merge 5 commits intomasterfrom
Conversation
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
📝 WalkthroughWalkthroughAdds idempotent handling for storing transaction error messages (treating ErrAlreadyExists as success), changes FetchErrorMessagesByENs to accept execution nodes, introduces infinite exponential-backoff retry with retryable/non-retryable classification for processing finalized blocks, and adds prefix-based existence checks plus tests. Changes
Sequence DiagramsequenceDiagram
participant Engine as tx_error_messages_engine
participant EN as Execution_Node
participant Storage as Storage
participant Backoff as Retry_Loop
Engine->>EN: GetTransactionErrorMessagesByBlockID(blockID)
EN-->>Engine: (result) / (error)
Engine->>Engine: isRetryableError(err)?
alt retryable
Engine->>Backoff: schedule exponential backoff + jitter
loop until success
Backoff->>EN: Retry GetTransactionErrorMessagesByBlockID
EN-->>Backoff: (result) / (error)
Backoff->>Engine: log attempt (WARN every 20)
end
else non-retryable
Engine->>Engine: ctx.Throw(err) -> escalate and stop
end
Engine->>Storage: InsertAndIndexTransactionResultErrorMessages(messages)
Storage-->>Engine: ErrAlreadyExists
Engine->>Engine: treat as success (idempotent) / return nil
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go`:
- Around line 219-224: The current logic in tx_error_messages_engine.go treats
any gRPC status.FromError(err) as retryable; change it to inspect the gRPC code
and only return true for transient codes (codes.Unavailable,
codes.ResourceExhausted, codes.Aborted) and optionally codes.DeadlineExceeded if
following requester.go backoff rules, while returning false for permanent codes
(codes.InvalidArgument, codes.PermissionDenied, codes.NotFound,
codes.Unauthenticated). Replace the unconditional status.FromError(err) check
with code := status.Code(err) and compare against the allowed retry set, keeping
the existing special-case check for commonrpc.ErrNoENsFoundForExecutionResult
unchanged.
- Around line 144-147: The current error path escalates context cancellation
from processErrorMessagesForBlock via ctx.Throw; change it to treat
context.Canceled and context.DeadlineExceeded as normal shutdown signals by
detecting them (errors.Is(err, context.Canceled) || errors.Is(err,
context.DeadlineExceeded) or checking ctx.Err()) and returning without calling
ctx.Throw, while preserving ctx.Throw for real errors; update the block that
calls processErrorMessagesForBlock (the code that currently does
ctx.Throw(fmt.Errorf("failed to process transaction result error messages for
block: %w", err))) to only ctx.Throw on non-cancellation errors and simply
return (or log at debug) on cancellation so the component's lifecycle can shut
down gracefully.
In `@storage/operation/reads.go`:
- Around line 133-137: The PrefixExists function must reject empty prefixes to
avoid false positives; add an explicit check at the start of PrefixExists
(before calling r.NewIter) that if len(prefix) == 0 it returns (false, nil) or a
context-appropriate error (choose false,nil if callers expect a boolean-only
existence result); update PrefixExists to perform this guard and document the
behavior so callers know empty prefixes are treated as non-existent.
ℹ️ Review info
Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e06d56f6-8e04-48b7-b74d-64a17bba2f1e
📒 Files selected for processing (8)
engine/access/ingestion/tx_error_messages/tx_error_messages_core.goengine/access/ingestion/tx_error_messages/tx_error_messages_core_test.goengine/access/ingestion/tx_error_messages/tx_error_messages_engine.goengine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.gostorage/operation/reads.gostorage/operation/reads_test.gostorage/operation/transaction_results.gostorage/operation/transaction_results_test.go
There was a problem hiding this comment.
♻️ Duplicate comments (2)
engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go (2)
11-11:⚠️ Potential issue | 🟠 Major
isRetryableErrorcurrently retries permanent gRPC failures indefinitelyLine 221 treats any gRPC status error as retryable. With the indefinite backoff loop, permanent codes (e.g.,
InvalidArgument,PermissionDenied,NotFound,Unauthenticated) can block progress forever.Suggested fix
import ( "context" "errors" "fmt" "time" "github.com/rs/zerolog" "github.com/sethvargo/go-retry" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ func isRetryableError(err error) bool { - if _, ok := status.FromError(err); ok { - return true + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.Unavailable, codes.ResourceExhausted, codes.Aborted: + return true + default: + return false + } } if errors.Is(err, commonrpc.ErrNoENsFoundForExecutionResult) { return true } return false }#!/bin/bash # Verify current gRPC retry classification patterns and compare with this engine. rg -n --type=go -C3 '\bstatus\.(FromError|Code)\s*\(' engine/access rg -n --type=go -C3 '\bcodes\.(Unavailable|ResourceExhausted|Aborted|DeadlineExceeded|InvalidArgument|PermissionDenied|NotFound|Unauthenticated)\b' engine/accessBased on learnings, "Treat all inputs as potentially byzantine and classify errors in a context-dependent manner; no code path is safe unless explicitly proven and documented".
Also applies to: 220-223
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go` at line 11, The isRetryableError function currently treats any gRPC status error as retryable; change it to only return true for transient gRPC codes (e.g., codes.Unavailable, codes.ResourceExhausted, codes.Aborted, codes.DeadlineExceeded and optionally codes.Internal if you consider retries safe) and return false for permanent codes (codes.InvalidArgument, codes.PermissionDenied, codes.NotFound, codes.Unauthenticated). Implement this by using status.FromError(err) / status.Code(err) inside isRetryableError and switch on the returned codes, ensuring non-gRPC errors are handled appropriately (return false or preserve existing behavior); update any related logic in the same file (around the isRetryableError usage at the 220-223 region) and add/update unit tests to cover both transient and permanent gRPC codes.
144-147:⚠️ Potential issue | 🟠 MajorHandle shutdown cancellation as non-fatal in this path
Line 145 currently throws irrecoverably for any error.
context.Canceledandcontext.DeadlineExceededare expected during component shutdown and should return quietly instead of callingctx.Throw.Suggested fix
err = e.processErrorMessagesForBlock(ctx, header.ID()) if err != nil { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return + } ctx.Throw(fmt.Errorf("failed to process transaction result error messages for block: %w", err)) return }As per coding guidelines, "Implement the
Componentinterface from/module/component/component.gofor all major processing components to ensure consistent lifecycle management and graceful shutdown patterns".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go` around lines 144 - 147, The current error path always calls ctx.Throw when err != nil; update it to treat shutdown cancellations as non-fatal by checking if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) and simply return (no ctx.Throw) in those cases; for all other errors keep the existing ctx.Throw(fmt.Errorf("failed to process transaction result error messages for block: %w", err)). Ensure you import the standard "errors" and "context" packages if not already used and apply this change around the block that invokes ctx.Throw (the code handling transaction result error messages where err is checked).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go`:
- Line 11: The isRetryableError function currently treats any gRPC status error
as retryable; change it to only return true for transient gRPC codes (e.g.,
codes.Unavailable, codes.ResourceExhausted, codes.Aborted,
codes.DeadlineExceeded and optionally codes.Internal if you consider retries
safe) and return false for permanent codes (codes.InvalidArgument,
codes.PermissionDenied, codes.NotFound, codes.Unauthenticated). Implement this
by using status.FromError(err) / status.Code(err) inside isRetryableError and
switch on the returned codes, ensuring non-gRPC errors are handled appropriately
(return false or preserve existing behavior); update any related logic in the
same file (around the isRetryableError usage at the 220-223 region) and
add/update unit tests to cover both transient and permanent gRPC codes.
- Around line 144-147: The current error path always calls ctx.Throw when err !=
nil; update it to treat shutdown cancellations as non-fatal by checking if
errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) and
simply return (no ctx.Throw) in those cases; for all other errors keep the
existing ctx.Throw(fmt.Errorf("failed to process transaction result error
messages for block: %w", err)). Ensure you import the standard "errors" and
"context" packages if not already used and apply this change around the block
that invokes ctx.Throw (the code handling transaction result error messages
where err is checked).
ℹ️ Review info
Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c70f73fc-33a4-4624-b1ec-0dea0887cbf4
📒 Files selected for processing (1)
engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go
There was a problem hiding this comment.
🧹 Nitpick comments (3)
engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go (1)
145-147: Useirrecoverablewrappers on non-retryable failure paths.These paths represent exception-grade failures; use
irrecoverable.NewExceptionfinstead offmt.Errorfbefore propagation.♻️ Proposed fix
if err != nil { if !errors.Is(err, context.Canceled) { - ctx.Throw(fmt.Errorf("failed to process transaction result error messages for block: %w", err)) + ctx.Throw(irrecoverable.NewExceptionf("failed to process transaction result error messages for block: %w", err)) } return } @@ if !isRetryableError(err) { - return fmt.Errorf("failed to fetch transaction result error messages: %w", err) + return irrecoverable.NewExceptionf("failed to fetch transaction result error messages: %w", err) }As per coding guidelines, "Use the
irrecoverablepackage for exception handling instead offmt.Errorf; always explicitly handle errors and never log and continue on a best-effort basis".Also applies to: 202-204
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go` around lines 145 - 147, The non-retryable failure path currently wraps errors using fmt.Errorf before calling ctx.Throw; replace fmt.Errorf with irrecoverable.NewExceptionf to create an irrecoverable exception (e.g., in the block checking !errors.Is(err, context.Canceled) where ctx.Throw(fmt.Errorf(...)) is used). Update the same pattern for the other occurrences noted (the similar ctx.Throw usage around lines 202-204) so all exception-grade failures use irrecoverable.NewExceptionf instead of fmt.Errorf and then propagate via ctx.Throw.storage/operation/reads.go (1)
138-141: Useirrecoverable.NewExceptionffor iterator-construction failures.The iterator creation failure is an exceptional storage-path error; please wrap it with
irrecoverablefor consistency with project error-classification conventions.♻️ Proposed fix
iter, err := r.NewIter(prefix, prefix, opt) if err != nil { - return false, fmt.Errorf("can not create iterator: %w", err) + return false, irrecoverable.NewExceptionf("can not create iterator: %w", err) }As per coding guidelines, "Use the
irrecoverablepackage for exception handling instead offmt.Errorf; always explicitly handle errors and never log and continue on a best-effort basis".🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@storage/operation/reads.go` around lines 138 - 141, The iterator construction error returned from r.NewIter(...) is an exceptional storage-path failure and must be wrapped with irrecoverable.NewExceptionf rather than fmt.Errorf; update the error return in the block handling iter, err := r.NewIter(prefix, prefix, opt) to return irrecoverable.NewExceptionf("can not create iterator: %w", err) (and add/import the irrecoverable package if missing) so the function uses the project's irrecoverable error classification consistently.engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go (1)
269-319: Make the cancellation test explicitly fail ifctx.Throwis invoked.Right now it asserts clean shutdown, but not explicitly the absence of throw calls. Capturing thrown errors makes the “NoThrow” guarantee strict.
♻️ Proposed refactor
func (s *TxErrorMessagesEngineSuite) TestOnFinalizedBlock_ContextCancelled_NoThrow() { - irrecoverableCtx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background()) + baseCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throwErrCh := make(chan error, 1) + irrecoverableCtx := irrecoverable.NewMockSignalerContextWithCallback(s.T(), baseCtx, func(err error) { + select { + case throwErrCh <- err: + default: + } + }) s.connFactory.On("GetExecutionAPIClient", mock.Anything).Return(s.execClient, &mockCloser{}, nil) @@ // Verify the engine shuts down cleanly. unittest.RequireCloseBefore(s.T(), eng.Done(), 2*time.Second, "expected engine to stop before timeout") + + select { + case err := <-throwErrCh: + s.T().Fatalf("expected no ctx.Throw on cancellation, got: %v", err) + default: + } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go` around lines 269 - 319, The test must explicitly fail if the irrecoverable context's Throw is invoked: add a mock expectation on the mock returned by irrecoverable.NewMockSignalerContextWithCancel (irrecoverableCtx) that any call to Throw triggers a test failure (call s.T().Fatalf or similar) so the test enforces the "no throw" guarantee; set this expectation before initEngine/processing begins (near where irrecoverableCtx, cancel are created) and keep the existing cancellation and RequireCloseBefore/eng.Done() checks.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.go`:
- Around line 269-319: The test must explicitly fail if the irrecoverable
context's Throw is invoked: add a mock expectation on the mock returned by
irrecoverable.NewMockSignalerContextWithCancel (irrecoverableCtx) that any call
to Throw triggers a test failure (call s.T().Fatalf or similar) so the test
enforces the "no throw" guarantee; set this expectation before
initEngine/processing begins (near where irrecoverableCtx, cancel are created)
and keep the existing cancellation and RequireCloseBefore/eng.Done() checks.
In `@engine/access/ingestion/tx_error_messages/tx_error_messages_engine.go`:
- Around line 145-147: The non-retryable failure path currently wraps errors
using fmt.Errorf before calling ctx.Throw; replace fmt.Errorf with
irrecoverable.NewExceptionf to create an irrecoverable exception (e.g., in the
block checking !errors.Is(err, context.Canceled) where
ctx.Throw(fmt.Errorf(...)) is used). Update the same pattern for the other
occurrences noted (the similar ctx.Throw usage around lines 202-204) so all
exception-grade failures use irrecoverable.NewExceptionf instead of fmt.Errorf
and then propagate via ctx.Throw.
In `@storage/operation/reads.go`:
- Around line 138-141: The iterator construction error returned from
r.NewIter(...) is an exceptional storage-path failure and must be wrapped with
irrecoverable.NewExceptionf rather than fmt.Errorf; update the error return in
the block handling iter, err := r.NewIter(prefix, prefix, opt) to return
irrecoverable.NewExceptionf("can not create iterator: %w", err) (and add/import
the irrecoverable package if missing) so the function uses the project's
irrecoverable error classification consistently.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cf247f1a-cd9f-4034-806c-38234a13be3f
📒 Files selected for processing (5)
engine/access/ingestion/tx_error_messages/tx_error_messages_engine.goengine/access/ingestion/tx_error_messages/tx_error_messages_engine_test.goengine/common/rpc/execution_node_identities_provider.gostorage/operation/reads.gostorage/operation/reads_test.go
✅ Files skipped from review due to trivial changes (1)
- engine/common/rpc/execution_node_identities_provider.go
Transaction error messages are stored in the db as soon as possible during ingestion. This can happen as part of the execution receipt processing. There is also a jobqueue that ensures the lowest ingested height progresses.
There was a regression where if a key exists in the DB, the
Exists()function would return false because of an incorrect db lookup, then the storage logic later didn't handle thestorage.ErrAlreadyExistserror.This PR fixes 3 issues:
storage.TransactionResultErrorMessages.Exists()returned false for existing keys due to an incorrect checktx_error_messages.TxErrorMessagesCore.FetchErrorMessagesByENs()returned an error when the data already existed in the dbtx_error_messages.Engineimproperly handle errors, swallowing the db storage errors.Summary by CodeRabbit
Bug Fixes
New Features
Tests