feat: SQS System Attributes and Custom Attributes Propagation #3095
Signed-off-by: Shrivardhan Rao <[email protected]>
Codecov Report
❌ Patch coverage is

@@            Coverage Diff             @@
##             main    #3095      +/-   ##
==========================================
+ Coverage   79.73%   79.75%   +0.01%
==========================================
  Files         291      291
  Lines       65143    65376     +233
==========================================
+ Hits        51941    52138     +197
- Misses      12649    12690      +41
+ Partials      553      548       -5
…aproj/numaflow into dev-nightly-feat/sqs-headers
In `pkg/sources/http/http.go`, HTTP headers from incoming requests are added to the message headers. This means any custom or standard HTTP header is available throughout the pipeline.

- **SQS Source Example**
  When configured with `attributeNames`, SQS system attributes (e.g., `SentTimestamp`, `MessageGroupId`, `MessageDeduplicationId`) are copied into the Numaflow message headers. User-defined message attributes (configured via `messageAttributeNames`) are propagated as metadata under the `sqs` namespace.
I see the following statement above.
Message headers are immutable and cannot be manipulated through the SDKs.
UDFs can read message headers to enrich processing logic, but they cannot modify or add new headers.
With this PR, it looks like UDFs can now add headers. Do we need to update the doc to be consistent?
This PR enables adding custom key/value pairs to the metadata, namespaced under `sqs`. Headers are still immutable.

### Setting Headers via Metadata

User-defined functions can set SQS headers by writing to the `sqs` metadata namespace. These values are merged into the message headers at the sink, allowing UDFs to control FIFO ordering or set delay seconds.
Is this done via the SDK? Can you give an example of how to set it?
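As a rough illustration of the flow described in the PR summary (a UDF writes under the `sqs` namespace, and the sink overlays those values on the read-only headers), here is a minimal std-only sketch. The `UserMetadata` alias and the functions `set_sqs_attribute` and `merge_headers` are hypothetical stand-ins, not the Numaflow SDK or numaflow-core API; the actual SDK call for writing metadata is not shown in this thread.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for namespaced user metadata: namespace -> key -> raw bytes.
type UserMetadata = HashMap<String, HashMap<String, Vec<u8>>>;

/// Namespace name used for SQS attributes (taken from the PR description).
const SQS_NAMESPACE: &str = "sqs";

/// What a UDF would conceptually do: write an attribute under the "sqs" namespace.
fn set_sqs_attribute(meta: &mut UserMetadata, key: &str, value: &str) {
    meta.entry(SQS_NAMESPACE.to_string())
        .or_default()
        .insert(key.to_string(), value.as_bytes().to_vec());
}

/// What the sink would conceptually do: start from the immutable headers and
/// let values from the "sqs" metadata namespace set or override entries.
fn merge_headers(
    headers: &HashMap<String, String>,
    meta: &UserMetadata,
) -> HashMap<String, String> {
    let mut merged = headers.clone();
    if let Some(sqs_meta) = meta.get(SQS_NAMESPACE) {
        for (k, v) in sqs_meta {
            // Only valid UTF-8 values can become string headers.
            if let Ok(s) = String::from_utf8(v.clone()) {
                merged.insert(k.clone(), s);
            }
        }
    }
    merged
}
```

In this sketch the original headers map is never mutated; the override happens only in the merged copy handed to the sink, which is consistent with headers staying immutable for UDFs.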
    if let Some(metadata) = &msg.metadata {
        if let Some(sqs_meta) = metadata.user_metadata.get("sqs") {
            for (k, v) in &sqs_meta.key_value {
                if let Ok(val_str) = String::from_utf8(v.to_vec()) {
Invalid UTF-8 will be silently ignored. Should we `match` and log a warning for the `Err` case?
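One possible shape for that suggestion, as a std-only sketch: `match` on the conversion result and report the offending key instead of dropping it. The function name `decode_sqs_metadata` is hypothetical, and `eprintln!` stands in for whatever logging macro the crate actually uses.

```rust
use std::collections::HashMap;

/// Converts raw metadata values to strings. Keys whose values are not valid
/// UTF-8 are logged and returned separately instead of being dropped silently.
fn decode_sqs_metadata(
    raw: &HashMap<String, Vec<u8>>,
) -> (HashMap<String, String>, Vec<String>) {
    let mut decoded = HashMap::new();
    let mut skipped = Vec::new();
    for (k, v) in raw {
        match String::from_utf8(v.clone()) {
            Ok(s) => {
                decoded.insert(k.clone(), s);
            }
            Err(e) => {
                // Stand-in for a real warning log; assumption, not the crate's logger.
                eprintln!("skipping sqs metadata key {k:?}: invalid UTF-8 ({e})");
                skipped.push(k.clone());
            }
        }
    }
    (decoded, skipped)
}
```

Returning the skipped keys is a design choice for the sketch only; it makes the behavior observable in tests without capturing log output.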
    })?;
    .message_body(String::from_utf8_lossy(&message.message_body).to_string());

    if let Some(delay) = message.headers.get("DelaySeconds") {
Invalid `DelaySeconds` values are silently ignored; please `match` and handle the error case. Do we have value validation on the paved path end? I imagine `DelaySeconds` cannot be higher than a certain threshold.
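A minimal sketch of what explicit handling could look like. SQS accepts `DelaySeconds` values from 0 to 900 (15 minutes), so the sketch rejects anything outside that range. The function name `parse_delay_seconds`, the `String` error type, and the choice to return an error rather than clamp are all assumptions for illustration, not what the PR implements.

```rust
use std::collections::HashMap;

/// Upper bound SQS allows for DelaySeconds (15 minutes).
const MAX_DELAY_SECONDS: i32 = 900;

/// Reads DelaySeconds from the headers.
/// Returns Ok(None) when the header is absent, Ok(Some(n)) when it is a valid
/// delay, and Err with a description when it is unparsable or out of range.
fn parse_delay_seconds(headers: &HashMap<String, String>) -> Result<Option<i32>, String> {
    let raw = match headers.get("DelaySeconds") {
        Some(raw) => raw,
        None => return Ok(None),
    };
    let delay: i32 = raw
        .trim()
        .parse()
        .map_err(|e| format!("invalid DelaySeconds {raw:?}: {e}"))?;
    if !(0..=MAX_DELAY_SECONDS).contains(&delay) {
        return Err(format!(
            "DelaySeconds {delay} is outside the allowed range 0..={MAX_DELAY_SECONDS}"
        ));
    }
    Ok(Some(delay))
}
```

Whether an invalid value should fail the message or only produce a warning is a policy decision for the sink; the sketch surfaces the error and leaves that choice to the caller.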
            sqs_attrs.insert(k.clone(), val.clone().into_bytes());
        }
    }
    custom_attributes.insert("sqs".to_string(), sqs_attrs);
Can we make `sqs` a const in the sqs crate and use that const in numaflow-core?
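A sketch of how that could look, using a local module as a stand-in for the sqs crate. The constant name `METADATA_NAMESPACE` and the function `to_user_metadata` are hypothetical; only the `"sqs"` value and the bytes conversion come from the diff above.

```rust
use std::collections::HashMap;

/// Stand-in for the sqs crate, which would own the namespace name.
mod sqs {
    /// Hypothetical constant name; the value comes from the PR.
    pub const METADATA_NAMESPACE: &str = "sqs";
}

/// Source-side mapping as numaflow-core might perform it: user-defined SQS
/// message attributes are stored as bytes under the shared namespace.
fn to_user_metadata(
    custom_attributes: &HashMap<String, String>,
) -> HashMap<String, HashMap<String, Vec<u8>>> {
    let mut sqs_attrs = HashMap::new();
    for (k, val) in custom_attributes {
        sqs_attrs.insert(k.clone(), val.clone().into_bytes());
    }
    let mut user_metadata = HashMap::new();
    // The namespace is referenced through the constant, not a string literal.
    user_metadata.insert(sqs::METADATA_NAMESPACE.to_string(), sqs_attrs);
    user_metadata
}
```

With a single exported constant, the source, the sink, and any tests would all refer to the same name, so a typo in one place becomes a compile error rather than a silently missing namespace.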

    /// Extracts a user-friendly error message from AWS SDK errors.
    /// Returns format: "ErrorCode: Error message" or falls back to string representation.
    pub fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String
    - pub fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String
    + pub(crate) fn extract_aws_error<E, R>(err: &aws_sdk_sqs::error::SdkError<E, R>) -> String
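The formatting rule in the doc comment ("ErrorCode: Error message", else fall back to the string representation) can be sketched without the AWS SDK. The `ErrorMeta` trait, `format_error`, and `FakeSqsError` below are hypothetical stand-ins so the example compiles on its own; they are not the `aws_sdk_sqs` types, and the real `extract_aws_error` body is not shown in this thread.

```rust
use std::fmt;

/// Hypothetical minimal view of an SDK error: optional code and message,
/// plus a Display implementation used as the fallback.
trait ErrorMeta: fmt::Display {
    fn code(&self) -> Option<&str>;
    fn message(&self) -> Option<&str>;
}

/// Formats as "ErrorCode: Error message" when both parts are available,
/// otherwise falls back to the error's string representation.
fn format_error<E: ErrorMeta>(err: &E) -> String {
    match (err.code(), err.message()) {
        (Some(code), Some(msg)) => format!("{code}: {msg}"),
        _ => err.to_string(),
    }
}

/// Illustrative error type used only to exercise the sketch.
struct FakeSqsError {
    code: Option<&'static str>,
    message: Option<&'static str>,
}

impl fmt::Display for FakeSqsError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "unhandled error")
    }
}

impl ErrorMeta for FakeSqsError {
    fn code(&self) -> Option<&str> {
        self.code
    }
    fn message(&self) -> Option<&str> {
        self.message
    }
}
```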
Add this to `mkdocs.yml`.
    - #[error("Failed with SQS error - {0}")]
    - Sqs(#[from] aws_sdk_sqs::Error),
    + #[error("{0}")]
    + Sqs(String),
Why change this?
Co-authored-by: Vigith Maurice <[email protected]>
Summary

Enables propagation of SQS attributes through Numaflow pipelines, supporting FIFO queue semantics (`MessageGroupId`, `MessageDeduplicationId`) and user-defined message attributes.

Changes

SQS Source: Attribute extraction
Renamed `SqsMessage` fields to `system_attributes` (SQS system attrs like `SentTimestamp`, `MessageGroupId`) and `custom_attributes` (user-defined message attributes). System attributes become message headers; custom attributes are stored in `metadata.user_metadata["sqs"]`.

SQS Sink: FIFO queue support
Added `headers` field to `SqsSinkMessage`. The sink reads `MessageGroupId`, `MessageDeduplicationId`, and `DelaySeconds` from headers and applies them to the SQS `SendMessageBatch` request.

numaflow-core integration
Source maps `system_attributes` → `Message.headers` and `custom_attributes` → `Message.metadata`. Sink merges headers with `metadata.user_metadata["sqs"]`, allowing UDFs/transformers to set or override FIFO attributes.

Improved error messages
Changed `Error::Sqs` to store extracted error details instead of opaque AWS SDK error. Errors now display as `"ErrorCode: Error message"` instead of `"unhandled error (CODE)"`.

Documentation
Added sink documentation (`docs/user-guide/sinks/sqs.md`), updated source docs with attribute configuration examples, and added SQS example to message headers concept page.