@cosmic-chichu cosmic-chichu commented Dec 8, 2025

Summary

Enables propagation of SQS attributes through Numaflow pipelines, supporting FIFO queue semantics (MessageGroupId, MessageDeduplicationId) and user-defined message attributes.

Changes

  • SQS Source: Attribute extraction
    Renamed SqsMessage fields to system_attributes (SQS system attrs like SentTimestamp, MessageGroupId) and custom_attributes (user-defined message attributes). System attributes become message headers; custom attributes are stored in metadata.user_metadata["sqs"].

  • SQS Sink: FIFO queue support
    Added headers field to SqsSinkMessage. The sink reads MessageGroupId, MessageDeduplicationId, and DelaySeconds from headers and applies them to the SQS SendMessageBatch request.

  • numaflow-core integration
    Source maps system_attributes to Message.headers and custom_attributes to Message.metadata. Sink merges headers with metadata.user_metadata["sqs"], allowing UDFs/transformers to set or override FIFO attributes.

  • Improved error messages
    Changed Error::Sqs to store the extracted error details instead of the opaque AWS SDK error. Errors now display as "ErrorCode: Error message" instead of "unhandled error (CODE)".

  • Documentation
    Added sink documentation (docs/user-guide/sinks/sqs.md), updated the source docs with attribute configuration examples, and added an SQS example to the message headers concept page.
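
To make the source-side mapping concrete, here is a minimal sketch under stated assumptions. `SqsMessage` and its two field names come from the description above; the `Message` struct, the byte-valued metadata layout, and the `to_pipeline_message` function are hypothetical stand-ins, not the actual numaflow-core types.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the renamed `SqsMessage` fields (assumed to be string maps).
pub struct SqsMessage {
    pub system_attributes: HashMap<String, String>,
    pub custom_attributes: HashMap<String, String>,
}

/// Hypothetical, simplified pipeline message: headers plus namespaced user metadata.
#[derive(Debug)]
pub struct Message {
    pub headers: HashMap<String, String>,
    /// namespace -> key -> raw bytes (layout assumed for illustration)
    pub user_metadata: HashMap<String, HashMap<String, Vec<u8>>>,
}

/// System attributes become headers; custom attributes go under the "sqs" namespace.
pub fn to_pipeline_message(src: SqsMessage) -> Message {
    let sqs_attrs: HashMap<String, Vec<u8>> = src
        .custom_attributes
        .into_iter()
        .map(|(k, v)| (k, v.into_bytes()))
        .collect();

    let mut user_metadata = HashMap::new();
    user_metadata.insert("sqs".to_string(), sqs_attrs);

    Message {
        headers: src.system_attributes,
        user_metadata,
    }
}
```

In this sketch a system attribute such as `MessageGroupId` ends up readable as a header, while a user-defined attribute is reachable only through the `sqs` metadata namespace.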

Signed-off-by: Shrivardhan Rao <[email protected]>
codecov bot commented Dec 8, 2025

Codecov Report

❌ Patch coverage is 97.38806% with 7 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.75%. Comparing base (7f28dc7) to head (70fd91e).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| rust/extns/numaflow-sqs/src/sink.rs | 92.68% | 3 Missing ⚠️ |
| rust/extns/numaflow-sqs/src/lib.rs | 88.88% | 2 Missing ⚠️ |
| rust/extns/numaflow-sqs/src/source.rs | 90.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3095      +/-   ##
==========================================
+ Coverage   79.73%   79.75%   +0.01%     
==========================================
  Files         291      291              
  Lines       65143    65376     +233     
==========================================
+ Hits        51941    52138     +197     
- Misses      12649    12690      +41     
+ Partials      553      548       -5     

@cosmic-chichu cosmic-chichu marked this pull request as ready for review January 7, 2026 00:39
@cosmic-chichu cosmic-chichu changed the title from "feat: add control headers for sqs" to "feat: SQS System Attributes and Custom Attributes Propagation" Jan 7, 2026
In `pkg/sources/http/http.go`, HTTP headers from incoming requests are added to the message headers. This means any custom or standard HTTP header is available throughout the pipeline.

- **SQS Source Example**
When configured with `attributeNames`, SQS system attributes (e.g., `SentTimestamp`, `MessageGroupId`, `MessageDeduplicationId`) are copied into the Numaflow message headers. User-defined message attributes (configured via `messageAttributeNames`) are propagated as metadata under the `sqs` namespace.
Member

I see the following statements above:

> Message headers are immutable and cannot be manipulated through the SDKs.

> UDFs can read message headers to enrich processing logic, but they cannot modify or add new headers.

With this PR, it looks like UDFs can now add headers. Do we need to update the docs to be consistent?

Contributor Author

This PR enables adding custom key/value pairs to the metadata, namespaced under `sqs`. Headers are still immutable.


### Setting Headers via Metadata

User-defined functions can set SQS headers by writing to the `sqs` metadata namespace. These values are merged into the message headers at the sink, allowing UDFs to control FIFO ordering or set delay seconds.
Member

Is this done via the SDK? Can you give an example of how to set it?
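
The thread does not show the SDK call a UDF would use, so the following is only a sketch of the sink-side merge that the doc excerpt describes: values under the `sqs` metadata namespace are layered over the existing headers, so metadata wins on conflict. The function name `merge_sqs_headers` and the plain `HashMap` types are assumptions for illustration, not the numaflow-core API.

```rust
use std::collections::HashMap;

/// Namespace key taken from the PR description.
const SQS_NAMESPACE: &str = "sqs";

/// Returns the headers the sink would act on: the original headers,
/// overridden by any valid UTF-8 values found under the "sqs" namespace.
pub fn merge_sqs_headers(
    headers: &HashMap<String, String>,
    user_metadata: &HashMap<String, HashMap<String, Vec<u8>>>,
) -> HashMap<String, String> {
    let mut merged = headers.clone();
    if let Some(sqs_meta) = user_metadata.get(SQS_NAMESPACE) {
        for (key, raw) in sqs_meta {
            // Non-UTF-8 values are skipped here, mirroring the `if let Ok(..)` in the diff.
            if let Ok(value) = String::from_utf8(raw.clone()) {
                merged.insert(key.clone(), value);
            }
        }
    }
    merged
}
```

Under this sketch, a UDF that writes `MessageGroupId` into the `sqs` namespace would change the group used at the sink without touching the immutable headers themselves.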

if let Some(metadata) = &msg.metadata {
    if let Some(sqs_meta) = metadata.user_metadata.get("sqs") {
        for (k, v) in &sqs_meta.key_value {
            if let Ok(val_str) = String::from_utf8(v.to_vec()) {
Member

Invalid UTF-8 will be silently ignored. Should we match on the result and log a warning in the Err case?
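
One way to act on this suggestion is sketched below. The helper name `decode_sqs_metadata` is hypothetical, and `eprintln!` stands in for whatever logging macro the crate actually uses.

```rust
/// Decodes one metadata value, warning instead of silently dropping invalid UTF-8.
/// Returns `None` when the value cannot be used as a header string.
pub fn decode_sqs_metadata(key: &str, raw: &[u8]) -> Option<String> {
    match std::str::from_utf8(raw) {
        Ok(value) => Some(value.to_owned()),
        Err(err) => {
            // Placeholder for a real warning log call.
            eprintln!("skipping sqs metadata key {key:?}: invalid UTF-8 ({err})");
            None
        }
    }
}
```

The behaviour for valid input is unchanged; the only difference is that a dropped key now leaves a trace.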

})?;
.message_body(String::from_utf8_lossy(&message.message_body).to_string());

if let Some(delay) = message.headers.get("DelaySeconds") {
Member

Invalid DelaySeconds values are silently ignored; please match and handle the error case. Do we have value validation on the paved path end? I imagine DelaySeconds cannot exceed a certain threshold.
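
For reference, SQS accepts a per-message `DelaySeconds` between 0 and 900 (15 minutes). A minimal sketch of parsing and range-checking the header value follows; the function name `parse_delay_seconds` and the `String` error type are assumptions, not what the PR implements.

```rust
/// Upper bound SQS allows for a per-message delay, in seconds.
const MAX_DELAY_SECONDS: i32 = 900;

/// Parses a `DelaySeconds` header value and rejects anything outside 0..=900.
pub fn parse_delay_seconds(raw: &str) -> Result<i32, String> {
    let value = raw
        .trim()
        .parse::<i32>()
        .map_err(|err| format!("DelaySeconds {raw:?} is not an integer: {err}"))?;

    if (0..=MAX_DELAY_SECONDS).contains(&value) {
        Ok(value)
    } else {
        Err(format!(
            "DelaySeconds {value} is out of range (expected 0..={MAX_DELAY_SECONDS})"
        ))
    }
}
```

Whether the sink should fail the message or fall back to no delay on `Err` is a design decision this sketch leaves open.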

sqs_attrs.insert(k.clone(), val.clone().into_bytes());
}
}
custom_attributes.insert("sqs".to_string(), sqs_attrs);
Member

Can we make `sqs` a const in the SQS crate and use that const in numaflow-core?
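
A sketch of what this could look like, with both crates collapsed into one file for illustration. The constant name `SQS_METADATA_NAMESPACE`, the `numaflow_sqs` module standing in for the crate, and the `store_sqs_attrs` call site are all hypothetical.

```rust
use std::collections::HashMap;

/// Stand-in for the SQS extension crate.
pub mod numaflow_sqs {
    /// Single source of truth for the metadata namespace key.
    pub const SQS_METADATA_NAMESPACE: &str = "sqs";
}

/// Stand-in for a call site in numaflow-core that previously used the literal "sqs".
pub fn store_sqs_attrs(
    user_metadata: &mut HashMap<String, HashMap<String, Vec<u8>>>,
    sqs_attrs: HashMap<String, Vec<u8>>,
) {
    user_metadata.insert(numaflow_sqs::SQS_METADATA_NAMESPACE.to_string(), sqs_attrs);
}
```

Sharing the constant keeps the source and sink from drifting apart if the namespace is ever renamed.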


/// Extracts a user-friendly error message from AWS SDK errors.
/// Returns format: "ErrorCode: Error message" or falls back to string representation.
pub fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String
Member

Suggested change
- pub fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String
+ pub(crate) fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String

Member

Add this to mkdocs.yml.

Comment on lines -48 to +50
- #[error("Failed with SQS error - {0}")]
- Sqs(#[from] aws_sdk_sqs::Error),
+ #[error("{0}")]
+ Sqs(String),
Member

Why change this?
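
For context, the PR summary gives the motivation: errors should display as "ErrorCode: Error message" rather than "unhandled error (CODE)". The sketch below shows that idea without the AWS SDK or `thiserror`; the helper `format_error_parts` and its fallback behaviour are assumptions, and the real `extract_aws_error` takes an `SdkError` instead of plain strings.

```rust
use std::fmt;

/// Simplified version of the new variant: it stores the already-formatted text.
#[derive(Debug)]
pub enum Error {
    Sqs(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Equivalent in spirit to `#[error("{0}")]`.
            Error::Sqs(details) => write!(f, "{details}"),
        }
    }
}

impl std::error::Error for Error {}

/// Hypothetical helper: builds "ErrorCode: Error message" when both parts are
/// available and otherwise falls back to whatever is present.
pub fn format_error_parts(code: Option<&str>, message: Option<&str>, fallback: &str) -> String {
    match (code, message) {
        (Some(code), Some(message)) => format!("{code}: {message}"),
        (Some(code), None) => code.to_string(),
        (None, Some(message)) => message.to_string(),
        (None, None) => fallback.to_string(),
    }
}
```

Storing a `String` trades away the original error value (and the `#[from]` conversion) for a readable message, which is presumably the trade-off the reviewer is asking about.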
