Skip to content

Commit d80e245

Browse files
thejonespre-commit-ci[bot]auvipy
authored andcommitted
Fix #2286 : SQS - Enhance support for receiving message attributes. Allow string in msg body. (#2300)
* Enhance support for receiving message attributes. Allow string in body of sqs message * fix lint errors * Add name to Authors * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * pr comment updates, refactor, and add more test coverage * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update kombu/asynchronous/http/urllib3_client.py --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Asif Saif Uddin <[email protected]>
1 parent cc20585 commit d80e245

File tree

6 files changed

+622
-108
lines changed

6 files changed

+622
-108
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,5 @@ env
3939
.coverage.*
4040
control/
4141
.env
42+
.hypothesis/*
43+
junit.xml

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Adam Gaca <[email protected]>
77
Adam Nelson <[email protected]>
88
Adam Wentz
99
Alan Justino <[email protected]>
10+
Alex Jones
1011
Alex Koshelev <[email protected]>
1112
Alexandre Bourget <[email protected]>
1213
Anastasis Andronidis <[email protected]>

kombu/asynchronous/aws/sqs/connection.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,28 @@
2020
class AsyncSQSConnection(AsyncAWSQueryConnection):
2121
"""Async SQS Connection."""
2222

23-
def __init__(self, sqs_connection, debug=0, region=None, fetch_message_attributes=None, **kwargs):
23+
def __init__(
24+
self,
25+
sqs_connection,
26+
debug=0,
27+
region=None,
28+
message_system_attribute_names=None,
29+
message_attribute_names=None,
30+
**kwargs
31+
):
2432
if boto3 is None:
2533
raise ImportError('boto3 is not installed')
2634
super().__init__(
2735
sqs_connection,
2836
region_name=region, debug=debug,
2937
**kwargs
3038
)
31-
self.fetch_message_attributes = (
32-
fetch_message_attributes if fetch_message_attributes is not None
33-
else ["ApproximateReceiveCount"]
39+
self.message_system_attribute_names = (
40+
message_system_attribute_names if message_system_attribute_names else ["ApproximateReceiveCount"]
41+
)
42+
self.message_attribute_names = (
43+
[message_attribute_names] if isinstance(message_attribute_names, str)
44+
else (message_attribute_names or [])
3445
)
3546

3647
def _create_query_request(self, operation, params, queue_url, method):
@@ -160,13 +171,17 @@ def receive_message(
160171
):
161172
params = {'MaxNumberOfMessages': number_messages}
162173
proto_params = {'query': {}, 'json': {}}
163-
attrs = attributes if attributes is not None else self.fetch_message_attributes
174+
attrs = attributes if attributes is not None else self.message_system_attribute_names
175+
msg_attr_names = self.message_attribute_names if self.message_attribute_names else None
164176

165177
if visibility_timeout:
166178
params['VisibilityTimeout'] = visibility_timeout
167179
if attrs:
168-
proto_params['json'].update({'AttributeNames': list(attrs)})
169-
proto_params['query'].update(_query_object_encode({'AttributeName': list(attrs)}))
180+
proto_params['json'].update({'MessageSystemAttributeNames': list(attrs)})
181+
proto_params['query'].update(_query_object_encode({'MessageSystemAttributeName': list(attrs)}))
182+
if msg_attr_names:
183+
proto_params['json'].update({'MessageAttributeNames': list(msg_attr_names)})
184+
proto_params['query'].update(_query_object_encode({'MessageAttributeNames': list(msg_attr_names)}))
170185
if wait_time_seconds is not None:
171186
params['WaitTimeSeconds'] = wait_time_seconds
172187

0 commit comments

Comments
 (0)