Skip to content

x-pack/filebeat/input/awss3: add V2 input behind feature flag#51598

Open
efd6 wants to merge 11 commits into
elastic:mainfrom
efd6:51438-awss3
Open

x-pack/filebeat/input/awss3: add V2 input behind feature flag#51598
efd6 wants to merge 11 commits into
elastic:mainfrom
efd6:51438-awss3

Conversation

@efd6

@efd6 efd6 commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Proposed commit message

x-pack/filebeat/input/awss3: add V2 input behind feature flag

Rewrite the aws-s3 input orchestration layer as a drop-in replacement
gated behind features.aws_s3_v2 (default false). The v2 path fixes
silent data loss on SQS auth expiry, backup/delete in polling mode,
empty cloud.provider, and non-AWS region validation. It also replaces
the fixed worker pool with an adaptive AIMD concurrency controller
bounded by number_of_workers.

Both paths share existing infrastructure (API wrappers, notification
parsing, state registries, metrics, config types, scripting); only
the orchestration is rewritten. All external contracts are preserved:
config keys, event fields, @metadata._id format, persisted state.

Benchmarks show equivalent throughput at moderate concurrency and
significantly better stability at high worker counts where the legacy
implementation collapses.

Note

Best reviewed commit-wise and with reference to #51438.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability.
  • I have added an entry in ./changelog/fragments using the changelog tool.

Disruptive User Impact

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

efd6 added 3 commits June 29, 2026 14:41
…g and input skeleton

Add a runtime feature flag (features.aws_s3_v2) that gates the V2
aws-s3 input implementation. When enabled, s3InputManager.Create
delegates to the new inputV2 type instead of the legacy SQS/poller
paths.

The inputV2 skeleton satisfies the v2.Input interface but currently
only blocks on context cancellation. Subsequent phases will wire
object processing, discovery, and flow control.

Assisted-By: Cursor
Add objectProcessorV2 which downloads, decodes, and emits events from
S3 objects. It preserves the exact event field paths and @metadata._id
formula from the legacy processor but decouples finalization
(backup/delete) so it can be triggered post-ACK by the caller.

Supports the full codec/content-type routing (ValueDecoder, Decoder,
JSON, NDJSON, line-oriented text), file_selectors matching,
expand_event_list_from_field, and transient download error wrapping.

Assisted-By: Cursor
Introduce stateRegistryV2 which wraps the existing state registry
behind a single creation path: capacity=0 gives unbounded (normal)
state, capacity>0 gives lexicographical ordering with tail tracking.
Both modes read persisted state in either legacy format on load.

MarkProcessed and MarkFailed provide the V2 entry points for
recording object disposition after ACK.

Assisted-By: Cursor
@efd6 efd6 self-assigned this Jun 29, 2026
@efd6 efd6 added enhancement Filebeat Filebeat Team:obs-ds-hosted-services Label for the Observability Hosted Services team backport-skip Skip notification from the automated backport with mergify Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:Security-Service Integrations Security Service Integrations Team labels Jun 29, 2026
@botelastic botelastic Bot added needs_team Indicates that the issue/PR needs a Team:* label and removed needs_team Indicates that the issue/PR needs a Team:* label labels Jun 29, 2026
@github-actions

Copy link
Copy Markdown
Contributor

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)
  • /test : Run the Buildkite pipeline.

@github-actions

github-actions Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

🔍 Preview links for changed docs

@github-actions

Copy link
Copy Markdown
Contributor

✅ Elastic Docs Style Checker (Vale)

No issues found on modified lines!


The Vale linter checks documentation changes against the Elastic Docs style guide. To use Vale locally or report issues, refer to Elastic style guide for Vale.

@efd6 efd6 marked this pull request as ready for review June 29, 2026 07:58
@efd6 efd6 requested review from a team as code owners June 29, 2026 07:58
@efd6 efd6 requested review from andrzej-stencel and mauri870 June 29, 2026 07:58
@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services)

@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@infra-vault-gh-plugin-prod

Copy link
Copy Markdown

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

@coderabbitai

coderabbitai Bot commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: ce40b7b6-2046-4a14-820f-b4e89feb3347

📥 Commits

Reviewing files that changed from the base of the PR and between eeb2042 and 0d3b077.

📒 Files selected for processing (3)
  • changelog/fragments/1782710570-51438-awss3.yaml
  • docs/reference/filebeat/filebeat-input-aws-s3.md
  • x-pack/filebeat/input/awss3/v2_benchmark_test.go
✅ Files skipped from review due to trivial changes (2)
  • changelog/fragments/1782710570-51438-awss3.yaml
  • docs/reference/filebeat/filebeat-input-aws-s3.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • x-pack/filebeat/input/awss3/v2_benchmark_test.go

📝 Walkthrough

Walkthrough

This PR adds a feature-flagged V2 aws-s3 input for Filebeat. It introduces a new libbeat feature flag, routes plugin creation to inputV2 when enabled, and adds separate SQS and polling execution paths. The V2 implementation includes adaptive concurrency control, S3 object processing, state persistence, new tests, a benchmark, and documentation/changelog updates.

Possibly related PRs

  • elastic/beats#51439: Shares the same aws-s3 V2 code path and covers closely related contracts around event fields, notification parsing, state handling, and feature-flag routing.
🚥 Pre-merge checks | ✅ 2
✅ Passed checks (2 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • 🛠️ Update Documentation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@x-pack/filebeat/input/awss3/v2_flow.go`:
- Around line 87-93: The concurrencyController.Acquire method is still relying
only on the semaphore capacity, so it ignores the current limit set by
OnBackpressure and can keep admitting work after a scale-down. Update Acquire to
enforce the controller’s active level when deciding whether to acquire a slot,
using the concurrencyController’s level state together with the existing
semaphore logic. Keep the fix localized to concurrencyController.Acquire and the
related level/backpressure handling so the current AIMD limit actually controls
admission.

In `@x-pack/filebeat/input/awss3/v2_input_test.go`:
- Line 112: The test is creating a logger with the disallowed helper, which
triggers the forbidigo lint rule in CI. Update the awss3 input test to use
logptest.NewTestingLogger(t, inputName) instead of logp.NewLogger(inputName)
when calling Plugin(...).Manager.Create, so the test uses the approved testing
logger helper and passes lint.

In `@x-pack/filebeat/input/awss3/v2_input.go`:
- Around line 280-289: The resolveSQSRegion helper in inputV2 is missing the
initialized SDK region fallback, which breaks cases where runSQS() receives
awsCfg with a valid Region from shared config/profile/env. Update
resolveSQSRegion(awsCfg awssdk.Config) to consult awsCfg.Region before returning
an empty string, while preserving the existing precedence of config.RegionName,
getRegionFromQueueURL(), and config.AWSConfig.DefaultRegion.

In `@x-pack/filebeat/input/awss3/v2_polling.go`:
- Around line 96-127: The polling flow in poll and worker can deadlock because
listObjects starts sending to the unbuffered workChan even when
createPipelineClient fails in a worker. Change the startup path so worker
initialization failures are reported back to poll before listing begins, and use
that signal to stop or cancel listObjects instead of letting it block. Keep the
fix localized around poll, worker, createPipelineClient, and listObjects by
adding failure propagation/cancellation and ensuring no object production starts
unless workers are ready.

In `@x-pack/filebeat/input/awss3/v2_sqs.go`:
- Around line 87-120: Keep the SQS keepalive context alive until the result is
fully done, not just until the method returns. In `v2_sqs.go`, remove the
immediate `defer keepCancel()` around the `keepalive` goroutine and make
`sqsResultV2.Done()` (or the equivalent finalization path) call `cancelKeep`
after pipeline ACKs complete so visibility is extended for the full processing
lifecycle. Use the existing `keepCancel`, `keepWg`, and `sqsResultV2` flow to
ensure the keepalive goroutine is stopped only after the message is truly
finished.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Enterprise

Run ID: 9c301acc-8d25-4b8a-aebb-54e4f8dbc31b

📥 Commits

Reviewing files that changed from the base of the PR and between 01ce4ab and 5743e6a.

📒 Files selected for processing (19)
  • changelog/fragments/1782710570-51438-awss3.yaml
  • docs/reference/filebeat/filebeat-input-aws-s3.md
  • libbeat/features/features.go
  • x-pack/filebeat/input/awss3/input.go
  • x-pack/filebeat/input/awss3/input_integration_test.go
  • x-pack/filebeat/input/awss3/v2_benchmark_test.go
  • x-pack/filebeat/input/awss3/v2_flow.go
  • x-pack/filebeat/input/awss3/v2_flow_test.go
  • x-pack/filebeat/input/awss3/v2_input.go
  • x-pack/filebeat/input/awss3/v2_input_integration_test.go
  • x-pack/filebeat/input/awss3/v2_input_test.go
  • x-pack/filebeat/input/awss3/v2_polling.go
  • x-pack/filebeat/input/awss3/v2_polling_test.go
  • x-pack/filebeat/input/awss3/v2_processor.go
  • x-pack/filebeat/input/awss3/v2_processor_test.go
  • x-pack/filebeat/input/awss3/v2_sqs.go
  • x-pack/filebeat/input/awss3/v2_sqs_test.go
  • x-pack/filebeat/input/awss3/v2_state.go
  • x-pack/filebeat/input/awss3/v2_state_test.go

Comment thread x-pack/filebeat/input/awss3/v2_flow.go Outdated
Comment thread x-pack/filebeat/input/awss3/v2_input_test.go Outdated
Comment thread x-pack/filebeat/input/awss3/v2_input.go
Comment thread x-pack/filebeat/input/awss3/v2_polling.go
Comment thread x-pack/filebeat/input/awss3/v2_sqs.go
@efd6 efd6 marked this pull request as draft June 29, 2026 08:36
efd6 added 2 commits June 29, 2026 20:04
Implement the SQS receive loop and notification parsing for the V2
input. sqsDiscoveryV2 provides ReceiveLoop (long-poll dispatch),
ProcessMessage (keepalive, parse, process each S3 record via
objectProcessorV2), and Done (message disposition: delete on success
or non-retryable error, return on retryable error).

Notification format auto-detection covers S3 v2.2, SNS-wrapped,
EventBridge, and custom goja scripts — matching the legacy behaviour.

Assisted-By: Cursor
Implement pollingDiscoveryV2 which runs the periodic ListObjectsV2
loop for the V2 input. It paginates with circuit-breaker and
equal-jitter backoff, applies filters via pollingStrategy and
filterProvider, dispatches unprocessed objects to a worker pool, and
runs backup/delete via objectProcessorV2.Finalize in ACK callbacks.

Assisted-By: Cursor
efd6 added 3 commits June 29, 2026 20:20
Implement an AIMD concurrency controller that dynamically scales
workers between 1 and number_of_workers based on pipeline
backpressure. Config values become upper bounds rather than fixed
pool sizes.

The controller registers monitoring metrics (concurrency_level gauge,
scale_ups/scale_downs counters) and logs at Info on each scaling
decision. The cooldown between adjustments prevents oscillation.

Assisted-By: Cursor
Connect the V2 input's Run method to all previously-built components:
AWS config initialization, SQS/S3 API client creation, object
processor, state registry, adaptive flow controller, and metrics.

The SQS path sets up a receive loop with a concurrent worker pool
gated by a semaphore, with backpressure feedback to the concurrency
controller. The polling path resolves the bucket region, creates the
state registry, and runs the poll loop.

Region resolution follows the same priority as legacy: explicit
config > queue URL detection > default_region.

Assisted-By: Cursor
Add end-to-end integration tests that exercise the V2 input against
localstack (SQS and S3 polling modes). Include a unit test verifying
the feature flag routes Plugin.Create to the V2 implementation.

Fix pre-existing build failures in input_integration_test.go caused
by Plugin signature changes and a removed config field.

Assisted-By: Cursor

If an error occurs during the processing of the S3 object, the processing will be stopped, and the SQS message will be returned to the queue for reprocessing.

::::{note}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
::::{note}
::::{note}
:applies_to: stack: ga 9.5+

efd6 added 3 commits July 3, 2026 08:24
Update the aws-s3 input docs with a note about the V2 implementation and how
to use it. Also note that it will become the default and the legacy code will
be deprecated.
Add a benchmark for the v2 SQS path mirroring the existing legacy
benchmark (input_benchmark_test.go). Uses the same mock SQS/S3/pipeline
infrastructure to measure sustained object processing throughput across
worker counts from 1 to 1024.

Results show equivalent throughput at moderate concurrency and
significantly better stability at high worker counts where legacy
collapses due to goroutine contention.

Assisted-By: Cursor

@cmacknz cmacknz left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this and also had Claude go through, there are a couple of issues but the structure looks good. The concurrency controller seems simple to understand which is very nice.

if err != nil {
p.log.Errorf("failed to create pipeline client: %v", err)
p.status.UpdateStatus(status.Degraded, fmt.Sprintf("Pipeline client setup failed: %s", err))
cancel()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cancel is shared across the entire poll, so if this fails for one worker, you can potentially also abort listObjects. Claude believes that listObjects treats context cancellation (via the AWS SDK) as a partially successfully run and not an error and you could call p.registry.CleanUp as a result which is probably not what you want.

}

// Current returns the current allowed concurrency level.
func (cc *concurrencyController) Current() int {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like it is only called from tests which doesn't seem right.

defer metrics.endSQSWorker(id)

acks := newAWSACKHandler()
client, err := createPipelineClient(pipeline, acks)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to pool pipeline clients instead of creating a new one per message?

}
p.metrics.s3ObjectsAckedTotal.Inc()
if st.Stored {
if err := p.processor.Finalize(ctx, p.s3, evt); err != nil {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wg.Wait() in poll() doesn't wait for the ack goroutine, so it looks like there is a possibility that the context is cancelled by the time you get here because pollCancel fired. This would at least cause the status to be degraded for a reason that users can't do anything about.

// ProcessMessage parses an SQS message, processes each S3 object notification,
// and returns a result that must be finalized via Done() after all events are
// ACKed.
func (d *sqsDiscoveryV2) ProcessMessage(ctx context.Context, msg *types.Message, pub func(beat.Event)) sqsResultV2 {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3EventsCreatedTotal seems like it isn't incremented anywhere in here while it is on the polling path.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-skip Skip notification from the automated backport with mergify enhancement Filebeat Filebeat Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team Team:obs-ds-hosted-services Label for the Observability Hosted Services team Team:Security-Service Integrations Security Service Integrations Team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants