x-pack/filebeat/input/awss3: add V2 input behind feature flag#51598
x-pack/filebeat/input/awss3: add V2 input behind feature flag#51598efd6 wants to merge 11 commits into
Conversation
…g and input skeleton Add a runtime feature flag (features.aws_s3_v2) that gates the V2 aws-s3 input implementation. When enabled, s3InputManager.Create delegates to the new inputV2 type instead of the legacy SQS/poller paths. The inputV2 skeleton satisfies the v2.Input interface but currently only blocks on context cancellation. Subsequent phases will wire object processing, discovery, and flow control. Assisted-By: Cursor
Add objectProcessorV2 which downloads, decodes, and emits events from S3 objects. It preserves the exact event field paths and @metadata._id formula from the legacy processor but decouples finalization (backup/delete) so it can be triggered post-ACK by the caller. Supports the full codec/content-type routing (ValueDecoder, Decoder, JSON, NDJSON, line-oriented text), file_selectors matching, expand_event_list_from_field, and transient download error wrapping. Assisted-By: Cursor
Introduce stateRegistryV2 which wraps the existing state registry behind a single creation path: capacity=0 gives unbounded (normal) state, capacity>0 gives lexicographical ordering with tail tracking. Both modes read persisted state in either legacy format on load. MarkProcessed and MarkFailed provide the V2 entry points for recording object disposition after ACK. Assisted-By: Cursor
🤖 GitHub commentsJust comment with:
|
🔍 Preview links for changed docs |
✅ Elastic Docs Style Checker (Vale)No issues found on modified lines! The Vale linter checks documentation changes against the Elastic Docs style guide. To use Vale locally or report issues, refer to Elastic style guide for Vale. |
|
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services) |
|
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
|
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations) |
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Enterprise Run ID: 📒 Files selected for processing (3)
✅ Files skipped from review due to trivial changes (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughThis PR adds a feature-flagged V2 Possibly related PRs
🚥 Pre-merge checks | ✅ 2✅ Passed checks (2 passed)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@x-pack/filebeat/input/awss3/v2_flow.go`:
- Around line 87-93: The concurrencyController.Acquire method is still relying
only on the semaphore capacity, so it ignores the current limit set by
OnBackpressure and can keep admitting work after a scale-down. Update Acquire to
enforce the controller’s active level when deciding whether to acquire a slot,
using the concurrencyController’s level state together with the existing
semaphore logic. Keep the fix localized to concurrencyController.Acquire and the
related level/backpressure handling so the current AIMD limit actually controls
admission.
In `@x-pack/filebeat/input/awss3/v2_input_test.go`:
- Line 112: The test is creating a logger with the disallowed helper, which
triggers the forbidigo lint rule in CI. Update the awss3 input test to use
logptest.NewTestingLogger(t, inputName) instead of logp.NewLogger(inputName)
when calling Plugin(...).Manager.Create, so the test uses the approved testing
logger helper and passes lint.
In `@x-pack/filebeat/input/awss3/v2_input.go`:
- Around line 280-289: The resolveSQSRegion helper in inputV2 is missing the
initialized SDK region fallback, which breaks cases where runSQS() receives
awsCfg with a valid Region from shared config/profile/env. Update
resolveSQSRegion(awsCfg awssdk.Config) to consult awsCfg.Region before returning
an empty string, while preserving the existing precedence of config.RegionName,
getRegionFromQueueURL(), and config.AWSConfig.DefaultRegion.
In `@x-pack/filebeat/input/awss3/v2_polling.go`:
- Around line 96-127: The polling flow in poll and worker can deadlock because
listObjects starts sending to the unbuffered workChan even when
createPipelineClient fails in a worker. Change the startup path so worker
initialization failures are reported back to poll before listing begins, and use
that signal to stop or cancel listObjects instead of letting it block. Keep the
fix localized around poll, worker, createPipelineClient, and listObjects by
adding failure propagation/cancellation and ensuring no object production starts
unless workers are ready.
In `@x-pack/filebeat/input/awss3/v2_sqs.go`:
- Around line 87-120: Keep the SQS keepalive context alive until the result is
fully done, not just until the method returns. In `v2_sqs.go`, remove the
immediate `defer keepCancel()` around the `keepalive` goroutine and make
`sqsResultV2.Done()` (or the equivalent finalization path) call `cancelKeep`
after pipeline ACKs complete so visibility is extended for the full processing
lifecycle. Use the existing `keepCancel`, `keepWg`, and `sqsResultV2` flow to
ensure the keepalive goroutine is stopped only after the message is truly
finished.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Enterprise
Run ID: 9c301acc-8d25-4b8a-aebb-54e4f8dbc31b
📒 Files selected for processing (19)
changelog/fragments/1782710570-51438-awss3.yamldocs/reference/filebeat/filebeat-input-aws-s3.mdlibbeat/features/features.gox-pack/filebeat/input/awss3/input.gox-pack/filebeat/input/awss3/input_integration_test.gox-pack/filebeat/input/awss3/v2_benchmark_test.gox-pack/filebeat/input/awss3/v2_flow.gox-pack/filebeat/input/awss3/v2_flow_test.gox-pack/filebeat/input/awss3/v2_input.gox-pack/filebeat/input/awss3/v2_input_integration_test.gox-pack/filebeat/input/awss3/v2_input_test.gox-pack/filebeat/input/awss3/v2_polling.gox-pack/filebeat/input/awss3/v2_polling_test.gox-pack/filebeat/input/awss3/v2_processor.gox-pack/filebeat/input/awss3/v2_processor_test.gox-pack/filebeat/input/awss3/v2_sqs.gox-pack/filebeat/input/awss3/v2_sqs_test.gox-pack/filebeat/input/awss3/v2_state.gox-pack/filebeat/input/awss3/v2_state_test.go
Implement the SQS receive loop and notification parsing for the V2 input. sqsDiscoveryV2 provides ReceiveLoop (long-poll dispatch), ProcessMessage (keepalive, parse, process each S3 record via objectProcessorV2), and Done (message disposition: delete on success or non-retryable error, return on retryable error). Notification format auto-detection covers S3 v2.2, SNS-wrapped, EventBridge, and custom goja scripts — matching the legacy behaviour. Assisted-By: Cursor
Implement pollingDiscoveryV2 which runs the periodic ListObjectsV2 loop for the V2 input. It paginates with circuit-breaker and equal-jitter backoff, applies filters via pollingStrategy and filterProvider, dispatches unprocessed objects to a worker pool, and runs backup/delete via objectProcessorV2.Finalize in ACK callbacks. Assisted-By: Cursor
Implement an AIMD concurrency controller that dynamically scales workers between 1 and number_of_workers based on pipeline backpressure. Config values become upper bounds rather than fixed pool sizes. The controller registers monitoring metrics (concurrency_level gauge, scale_ups/scale_downs counters) and logs at Info on each scaling decision. The cooldown between adjustments prevents oscillation. Assisted-By: Cursor
Connect the V2 input's Run method to all previously-built components: AWS config initialization, SQS/S3 API client creation, object processor, state registry, adaptive flow controller, and metrics. The SQS path sets up a receive loop with a concurrent worker pool gated by a semaphore, with backpressure feedback to the concurrency controller. The polling path resolves the bucket region, creates the state registry, and runs the poll loop. Region resolution follows the same priority as legacy: explicit config > queue URL detection > default_region. Assisted-By: Cursor
Add end-to-end integration tests that exercise the V2 input against localstack (SQS and S3 polling modes). Include a unit test verifying the feature flag routes Plugin.Create to the V2 implementation. Fix pre-existing build failures in input_integration_test.go caused by Plugin signature changes and a removed config field. Assisted-By: Cursor
|
|
||
| If an error occurs during the processing of the S3 object, the processing will be stopped, and the SQS message will be returned to the queue for reprocessing. | ||
|
|
||
| ::::{note} |
There was a problem hiding this comment.
| ::::{note} | |
| ::::{note} | |
| :applies_to: stack: ga 9.5+ | |
Update the aws-s3 input docs with a note about the V2 implementation and how to use it. Also note that it will become the default and the legacy code will be deprecated.
Add a benchmark for the v2 SQS path mirroring the existing legacy benchmark (input_benchmark_test.go). Uses the same mock SQS/S3/pipeline infrastructure to measure sustained object processing throughput across worker counts from 1 to 1024. Results show equivalent throughput at moderate concurrency and significantly better stability at high worker counts where legacy collapses due to goroutine contention. Assisted-By: Cursor
cmacknz
left a comment
There was a problem hiding this comment.
I read this and also had Claude go through, there are a couple of issues but the structure looks good. The concurrency controller seems simple to understand which is very nice.
| if err != nil { | ||
| p.log.Errorf("failed to create pipeline client: %v", err) | ||
| p.status.UpdateStatus(status.Degraded, fmt.Sprintf("Pipeline client setup failed: %s", err)) | ||
| cancel() |
There was a problem hiding this comment.
This cancel is shared across the entire poll, so if this fails for one worker, you can potentially also abort listObjects. Claude believes that listObjects treats context cancellation (via the AWS SDK) as a partially successfully run and not an error and you could call p.registry.CleanUp as a result which is probably not what you want.
| } | ||
|
|
||
| // Current returns the current allowed concurrency level. | ||
| func (cc *concurrencyController) Current() int { |
There was a problem hiding this comment.
This looks like it is only called from tests which doesn't seem right.
| defer metrics.endSQSWorker(id) | ||
|
|
||
| acks := newAWSACKHandler() | ||
| client, err := createPipelineClient(pipeline, acks) |
There was a problem hiding this comment.
Would it be better to pool pipeline clients instead of creating a new one per message?
| } | ||
| p.metrics.s3ObjectsAckedTotal.Inc() | ||
| if st.Stored { | ||
| if err := p.processor.Finalize(ctx, p.s3, evt); err != nil { |
There was a problem hiding this comment.
The wg.Wait() in poll() doesn't wait for the ack goroutine, so it looks like there is a possibility that the context is cancelled by the time you get here because pollCancel fired. This would at least cause the status to be degraded for a reason that users can't do anything about.
| // ProcessMessage parses an SQS message, processes each S3 object notification, | ||
| // and returns a result that must be finalized via Done() after all events are | ||
| // ACKed. | ||
| func (d *sqsDiscoveryV2) ProcessMessage(ctx context.Context, msg *types.Message, pub func(beat.Event)) sqsResultV2 { |
There was a problem hiding this comment.
s3EventsCreatedTotal seems like it isn't incremented anywhere in here while it is on the polling path.
Proposed commit message
Note
Best reviewed commit-wise and with reference to #51438.
Checklist
stresstest.shscript to run them under stress conditions and race detector to verify their stability../changelog/fragmentsusing the changelog tool.Disruptive User Impact
How to test this PR locally
Related issues
Use cases
Screenshots
Logs