Skip to content

Sync worker reliability#694

Closed
neekolas wants to merge 2 commits intomainfrom
03-27-sync_worker_reliability
Closed

Sync worker reliability#694
neekolas wants to merge 2 commits intomainfrom
03-27-sync_worker_reliability

Conversation

@neekolas
Copy link
Copy Markdown
Contributor

@neekolas neekolas commented Apr 2, 2025

tl;dr

  • Moves a lot of the business logic into the originatorStream
  • Adds a write queue to separate validating messages and writing to the DB
  • Better handles retryable vs non-retryable errors. Anything that happens in the write queue is always retryable, anything that happens when processing a message from the stream is not.
  • Refactors some of our test helpers to make mocking the SyncWorker easier

Summary by CodeRabbit

  • New Features

    • Introduced a new package for mock generation, enhancing testing capabilities.
    • Added a new field for target rate per minute in the fee structure, improving fee calculation functionalities.
    • Created functions for node and mock registry creation, facilitating easier testing setups.
    • Added comprehensive unit tests for the sync worker to validate envelope processing and error handling.
  • Bug Fixes

    • Enhanced processing reliability by implementing an exponential backoff retry mechanism for envelope handling, improving error management and system stability.
  • Development Environment

    • Enabled language server support and specified formatting preferences for Go code in the development environment settings.

Copy link
Copy Markdown
Contributor Author

neekolas commented Apr 2, 2025


How to use the Graphite Merge Queue

Add either label to this PR to merge it via the merge queue:

  • Queue - adds this PR to the back of the merge queue
  • Hotfix - for urgent hot fixes, skip the queue and merge this PR next

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@neekolas neekolas marked this pull request as ready for review April 2, 2025 22:14
@neekolas neekolas requested a review from a team as a code owner April 2, 2025 22:14
@macroscopeapp
Copy link
Copy Markdown
Contributor

macroscopeapp Bot commented Apr 2, 2025

Extract originatorStream component from syncWorker to improve sync worker reliability

  • Extracts originatorStream functionality into a dedicated component in originatorStream.go with methods for stream handling, envelope validation, fee calculation, and storage
  • Refactors syncWorker in syncWorker.go to delegate stream handling to the new component
  • Adds unit tests for the originatorStream component in originatorStream_test.go
  • Introduces registry test utilities in registry.go for creating test nodes and mock registries
  • Adds mock generation configuration for ReplicationApi_SubscribeEnvelopesClient interface

📍Where to Start

Start with the newOriginatorStream constructor and listen method in originatorStream.go, which contains the core stream handling logic extracted from the sync worker.


Macroscope summarized a7d3c8f.

Comment thread pkg/testutils/registry/registry.go Outdated
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from d4ba441 to 58ffc34 Compare April 2, 2025 22:18
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 2, 2025

Warning

Rate limit exceeded

@neekolas has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 11 minutes and 52 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between b1b14b2 and a7d3c8f.

📒 Files selected for processing (8)
  • .mockery.yaml (1 hunks)
  • pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go (1 hunks)
  • pkg/server/server_test.go (3 hunks)
  • pkg/sync/originatorStream.go (1 hunks)
  • pkg/sync/originatorStream_test.go (1 hunks)
  • pkg/sync/syncWorker.go (2 hunks)
  • pkg/testutils/fees/rates.go (1 hunks)
  • pkg/testutils/registry/registry.go (1 hunks)
## Walkthrough
This pull request introduces several updates across the codebase. A new interface entry is added in the mock configuration and its corresponding mock implementation is provided for the Replication API. Test code is refactored to use helper functions for node and registry creation. In addition, the envelope processing in the sync worker is enhanced with an exponential backoff retry mechanism and updated error handling. New tests for the sync worker have been added, and test utilities for fee rates and registry operations have been expanded.

## Changes

| File(s) | Change Summary |
|---------|----------------|
| `.mockery.yaml`<br>`pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go` | Added a new package entry for the Replication API interface and implemented its corresponding mock with all required methods and helper functions. |
| `pkg/server/server_test.go` | Refactored test setup by replacing manual node initialization with utility functions (`CreateNode` and `CreateMockRegistry`) for cleaner registry mocking. |
| `pkg/sync/syncWorker.go`<br>`pkg/sync/syncWorker_test.go` | Introduced an exponential backoff retry mechanism in envelope processing, refined error handling (distinguishing transient from permanent errors), and added corresponding unit tests for various error scenarios. |
| `pkg/testutils/fees/rates.go` | Added a new `TargetRatePerMinute` field to test fee rates and introduced the `NewTestFeeCalculator` function to facilitate fee calculator creation based on these rates. |
| `pkg/testutils/registry/registry.go` | Added new helper functions (`CreateNode` and `CreateMockRegistry`) to simplify the creation of node instances and mock registries for testing purposes. |
| `.vscode/settings.json` | Moved the `"go.useLanguageServer": true` setting to a different position in the file without changing its value or adding new settings. |

## Suggested reviewers
- mkysel
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
pkg/testutils/registry/registry.go (1)

13-24: Consider adding unit tests for CreateNode.

While this helper function looks straightforward, it would be beneficial to have dedicated tests ensuring that each field is properly initialized and that edge cases (like negative ports, invalid keys, etc.) are handled correctly.

pkg/sync/syncWorker.go (1)

368-370: Consider adding a randomization factor and/or upper bound for backoff.

A 50ms initial interval might be too aggressive in some environments. Including jitter or a maximum wait time could help avoid repeated hammering if the error persists.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d39afa3 and 58ffc34.

📒 Files selected for processing (7)
  • .mockery.yaml (1 hunks)
  • pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go (1 hunks)
  • pkg/server/server_test.go (3 hunks)
  • pkg/sync/syncWorker.go (11 hunks)
  • pkg/sync/syncWorker_test.go (1 hunks)
  • pkg/testutils/fees/rates.go (1 hunks)
  • pkg/testutils/registry/registry.go (1 hunks)
🧰 Additional context used
🧬 Code Definitions (3)
pkg/sync/syncWorker.go (1)
pkg/envelopes/originator.go (2)
  • OriginatorEnvelope (13-16)
  • NewOriginatorEnvelope (18-32)
pkg/sync/syncWorker_test.go (3)
pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go (1)
  • NewMockReplicationApi_SubscribeEnvelopesClient (374-384)
pkg/testutils/fees/rates.go (1)
  • NewTestFeeCalculator (19-21)
pkg/testutils/registry/registry.go (1)
  • CreateNode (13-24)
pkg/server/server_test.go (2)
pkg/testutils/registry/registry.go (2)
  • CreateNode (13-24)
  • CreateMockRegistry (26-47)
pkg/registry/node.go (1)
  • Node (16-25)
🪛 GitHub Check: Lint-Go
pkg/sync/syncWorker_test.go

[failure] 4-4:
File is not properly formatted (gofumpt)

🪛 GitHub Actions: Lint
pkg/sync/syncWorker_test.go

[error] 1-1: File is not properly formatted (gofumpt).

🔇 Additional comments (16)
.mockery.yaml (1)

7-9: LGTM: New interface added for mock generation.

The addition of the ReplicationApi_SubscribeEnvelopesClient interface to the mockery configuration follows the existing pattern and enables testing of the sync worker functionality.

pkg/testutils/fees/rates.go (2)

9-12: LGTM: Added target rate parameter to test rates.

The addition of the TargetRatePerMinute field to the test rates configuration is properly implemented and the formatting changes improve readability.


19-21: LGTM: New test utility function for fee calculator.

This helper function simplifies the creation of fee calculators in tests by leveraging the existing test rates fetcher.

pkg/server/server_test.go (3)

23-23: LGTM: Added import for registry test utilities.

Adding the registry test utilities import prepares for the refactoring in this file.


86-92: Excellent refactoring to improve test readability and maintainability.

Using the helper functions CreateNode and CreateMockRegistry from the registry test utilities package reduces code duplication and improves maintainability. This makes the test code more concise while maintaining the same functionality.


191-192: LGTM: Consistent use of registry test utilities.

The refactoring is applied consistently throughout the file, improving overall code quality.

pkg/sync/syncWorker_test.go (6)

24-42: LGTM: Well-implemented mock for envelope subscription.

This helper function creates a mock that simulates a subscription with a single page of envelopes, which is useful for testing envelope processing in the sync worker.


44-57: LGTM: Good setup for minimal sync worker testing.

The function provides a clean way to create a minimally configured sync worker for testing purposes, with proper cleanup of resources.


59-67: LGTM: Useful utility for testing error handling.

Creating a broken DB connection allows for testing how the sync worker handles database failures.


69-87: LGTM: Good test for successful envelope processing.

This test properly verifies that the sync worker can successfully process envelopes and returns the expected retry error with correct timing.


89-111: Verify test expectations align with test name.

The test name TestSyncWorkerPermanentError suggests testing for a permanent error, but the test is asserting a RetryAfterError. Consider either renaming the test to reflect the actual expectation or updating the test to check for a permanent error if that's the intended behavior.


113-130: LGTM: Good test for retryable error handling.

This test correctly verifies that database connection issues result in retryable errors rather than permanent ones, ensuring the sync worker will attempt to retry operations when appropriate.

pkg/testutils/registry/registry.go (1)

34-42: Avoid reusing the same pointer for each loop iteration.

This is the same concern raised in a previous review: using &node inside the loop reuses the same address for every iteration. Create a copy of the node within the loop (e.g. nodeCopy := node) and return its address instead, so each mock call gets a unique pointer reference.

pkg/sync/syncWorker.go (2)

399-417: Verify if restarting the entire stream on error is desired.

Currently, a single non-permanent envelope error—exhausted after 5 retries—returns from the method, restarting the whole streaming process. This might lead to duplicated attempts or overhead if most envelopes are valid. Please confirm if this behavior is intentional.


595-595: Confirm that RecoverSigner() errors are always permanent.

Returning a permanent error here means we’ll never retry envelopes that fail signer recovery. If there’s a chance of transient issues (e.g., partial data corruption in transit), consider adjusting the error handling.

pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go (1)

1-385: Skipping review of auto-generated mock code.

Comment thread pkg/sync/syncWorker_test.go
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from 58ffc34 to 200b1fc Compare April 2, 2025 22:30
Comment thread .vscode/settings.json Outdated
Comment thread pkg/sync/syncWorker.go Outdated
Comment thread pkg/sync/syncWorker.go Outdated
@mkysel
Copy link
Copy Markdown
Collaborator

mkysel commented Apr 3, 2025

@neekolas can you give me a bit of context around this? What prompted this changeset. Did you see something broken? How does this fix the issue.

Copy link
Copy Markdown
Contributor Author

neekolas commented Apr 3, 2025

I was working on the misbehavior detection, and realized that any database error when saving the message will throw the message out permanently (bad on its own)...and can make it seem like there is a gap in sequence_id when in fact there isn't.

@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from 200b1fc to e50d718 Compare April 4, 2025 17:14
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
pkg/testutils/registry/registry.go (3)

13-22: Good implementation of the CreateNode function.

The function provides a clean way to create test nodes with sensible defaults. Consider enhancing flexibility by allowing optional customization of fields like InCanonicalNetwork and IsValidConfig through functional options or a config struct for more complex test scenarios.


24-45: Well-structured mock registry implementation with a few improvement opportunities.

The CreateMockRegistry function properly sets up mocks for registry testing. The use of Maybe() is good practice for flexible test scenarios.

Consider these improvements:

  1. Add documentation explaining when and how to properly clean up channel resources
  2. Consider returning a cleanup function to close all channels in one place
 func CreateMockRegistry(t *testing.T, nodes []r.Node) *registryMocks.MockNodeRegistry {
     mockRegistry := registryMocks.NewMockNodeRegistry(t)
     mockRegistry.On("GetNodes").Maybe().Return(nodes, nil)
 
     nodesChan := make(chan []r.Node)
     mockRegistry.On("OnNewNodes").Maybe().
         Return((<-chan []r.Node)(nodesChan), r.CancelSubscription(func() {}))
 
+    // Store channels for potential cleanup
+    nodeChans := make([]chan r.Node, 0, len(nodes))
     for _, node := range nodes {
         nodeChan := make(chan r.Node)
+        nodeChans = append(nodeChans, nodeChan)
         mockRegistry.On("OnChangedNode", node.NodeID).
             Maybe().
             Return((<-chan r.Node)(nodeChan), r.CancelSubscription(func() {
                 close(nodeChan)
             }))
         mockRegistry.On("GetNode", node.NodeID).Maybe().Return(&node, nil)
     }
 
     mockRegistry.On("Stop").Maybe().Return(nil)
 
     return mockRegistry
 }

29-30: Consider making OnNewNodes logic match OnChangedNode.

The OnChangedNode has logic to close the channel when subscription is cancelled, but OnNewNodes doesn't have similar cleanup logic.

 mockRegistry.On("OnNewNodes").Maybe().
-    Return((<-chan []r.Node)(nodesChan), r.CancelSubscription(func() {}))
+    Return((<-chan []r.Node)(nodesChan), r.CancelSubscription(func() {
+        close(nodesChan)
+    }))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 200b1fc and e50d718.

📒 Files selected for processing (8)
  • .mockery.yaml (1 hunks)
  • .vscode/settings.json (1 hunks)
  • pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go (1 hunks)
  • pkg/server/server_test.go (3 hunks)
  • pkg/sync/syncWorker.go (11 hunks)
  • pkg/sync/syncWorker_test.go (1 hunks)
  • pkg/testutils/fees/rates.go (1 hunks)
  • pkg/testutils/registry/registry.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
  • .vscode/settings.json
  • pkg/testutils/fees/rates.go
  • .mockery.yaml
  • pkg/sync/syncWorker.go
  • pkg/sync/syncWorker_test.go
  • pkg/server/server_test.go
  • pkg/mocks/message_api/mock_ReplicationApi_SubscribeEnvelopesClient.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Test (Node)
🔇 Additional comments (1)
pkg/testutils/registry/registry.go (1)

39-39: Node pointer usage in loop is correct with Go 1.22+.

In Go 1.22+, each loop iteration now gets a fresh variable, making this code correct. The past review comment about creating a local copy is no longer relevant with newer Go versions.

@neekolas neekolas requested a review from mkysel April 4, 2025 20:31
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from e50d718 to d2a4e52 Compare May 13, 2025 13:56
Comment thread pkg/sync/syncWorker.go Outdated
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch 4 times, most recently from f07a179 to 181791a Compare May 13, 2025 14:20
Comment thread pkg/sync/syncWorker.go Outdated
Comment thread pkg/sync/syncWorker.go Outdated
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch 2 times, most recently from a56b14b to bb995f8 Compare May 13, 2025 20:55
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from bb995f8 to 620e611 Compare May 13, 2025 20:57
Comment thread pkg/sync/originatorStream.go
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch 2 times, most recently from eb85ec2 to 64b918a Compare May 13, 2025 21:03
Comment thread pkg/sync/originatorStream.go Outdated
@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from 64b918a to 62afd30 Compare May 13, 2025 21:08
@mkysel mkysel self-requested a review May 13, 2025 21:10
Copy link
Copy Markdown
Collaborator

@mkysel mkysel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other than the shutdown I have no objections

@neekolas neekolas force-pushed the 03-27-sync_worker_reliability branch from 62afd30 to a7d3c8f Compare May 13, 2025 21:12
@graphite-app
Copy link
Copy Markdown

graphite-app Bot commented May 13, 2025

Merge activity

  • May 13, 2:18 PM PDT: neekolas added this pull request to the Graphite merge queue.
  • May 13, 2:18 PM PDT: CI is running for this pull request on a draft pull request (#792) due to your merge queue CI optimization settings.
  • May 13, 2:19 PM PDT: Merged by the Graphite merge queue via draft PR: #792.

graphite-app Bot pushed a commit that referenced this pull request May 13, 2025
## tl;dr

- Moves a lot of the business logic into the `originatorStream`
- Adds a write queue to separate validating messages and writing to the DB
- Better handles retryable vs non-retryable errors. Anything that happens in the write queue is always retryable, anything that happens when processing a message from the stream is not.
- Refactors some of our test helpers to make mocking the SyncWorker easier

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->
## Summary by CodeRabbit

- **New Features**
  - Introduced a new package for mock generation, enhancing testing capabilities.
  - Added a new field for target rate per minute in the fee structure, improving fee calculation functionalities.
  - Created functions for node and mock registry creation, facilitating easier testing setups.
  - Added comprehensive unit tests for the sync worker to validate envelope processing and error handling.

- **Bug Fixes**
  - Enhanced processing reliability by implementing an exponential backoff retry mechanism for envelope handling, improving error management and system stability.

- **Development Environment**
  - Enabled language server support and specified formatting preferences for Go code in the development environment settings.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
@graphite-app graphite-app Bot closed this May 13, 2025
@graphite-app graphite-app Bot deleted the 03-27-sync_worker_reliability branch May 13, 2025 21:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants