Expected Behaviour
When provided with an event and a Pydantic model, the envelope's parse() method should correctly parse the data, including decompressing it (as far as I am aware, the data is already compressed when it arrives from the Kinesis stream).
Current Behaviour
My Lambda function receives CloudWatch Logs via a Kinesis Data Stream. When using the event parser with the KinesisDataStreamEnvelope, either as a decorator above lambda_handler or by explicitly calling the envelope's parse method, I keep getting a UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte.
After debugging it appears as if there is a decompression step missing in the envelope's parse method between casting to bytes and decoding using 'utf-8'.
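The error message is consistent with gzip data being decoded as text: gzip streams begin with the bytes 0x1f 0x8b, and 0x8b cannot start a UTF-8 character. A minimal sketch that reproduces the same message with only the standard library follows; the payload is a made-up stand-in, not the real event, and `try_decode` is a hypothetical helper.

```python
import gzip

# Hypothetical stand-in for a CloudWatch Logs payload (assumption, not real data).
payload = b'{"messageType": "DATA_MESSAGE"}'
compressed = gzip.compress(payload)

# The first two bytes of a gzip stream are the magic number 0x1f 0x8b.
magic = compressed[:2]


def try_decode(raw: bytes) -> str:
    """Return the decoded text, or the error message if UTF-8 decoding fails."""
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError as exc:
        return str(exc)


error_message = try_decode(compressed)
print(magic.hex())
print(error_message)
```

Byte 0x1f at position 0 is a valid ASCII control character, so decoding only fails at position 1, which matches the position reported above.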
Code snippet
from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext
from pydantic import BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel


# Pydantic models for CloudWatch Logs representation
class CloudWatchLogEvent(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel)

    id: str
    timestamp: str | int
    message: str


class CloudWatchLog(BaseModel):
    model_config = ConfigDict(alias_generator=to_camel)

    message_type: str
    owner: str = Field(pattern=r"^\d{12}$")  # 12 digits pattern
    log_group: str
    log_stream: str
    subscription_filters: list[str]  # CloudWatch Logs sends this key as "subscriptionFilters"
    log_events: list[CloudWatchLogEvent] = Field(min_length=1)


# Version 1 - not working
@event_parser(model=CloudWatchLog, envelope=envelopes.KinesisDataStreamEnvelope)
def lambda_handler(event: list[CloudWatchLog], context: LambdaContext) -> dict:
    ...


# Version 2 - also not working
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    log: list[CloudWatchLog] = envelopes.KinesisDataStreamEnvelope().parse(
        event, CloudWatchLog
    )
    ...
Possible Solution
When manually parsing Kinesis Data Stream input in a Lambda function and locally (using a saved event), the following code works:
Fixing the envelope
To fix the envelope itself, importing gzip and adding the line data = gzip.decompress(data) in the utilities/parser/envelopes/kinesis.py file, between casting to bytes and decoding using 'utf-8', fixes the issue.
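As a rough sketch of that extra step in isolation (this is not the library's actual code): `decode_kinesis_data` is a hypothetical helper, and the sample payload is invented for illustration. It assumes the record data has already been base64-decoded to bytes.

```python
import base64
import gzip
import json


def decode_kinesis_data(data: bytes) -> str:
    """Decompress gzip-compressed record bytes, then decode them as UTF-8."""
    data = gzip.decompress(data)  # the decompression step proposed above
    return data.decode("utf-8")


# Invented sample payload (assumption), encoded the way it might arrive:
# JSON -> gzip -> base64.
sample = {
    "messageType": "DATA_MESSAGE",
    "logEvents": [{"id": "1", "timestamp": 0, "message": "hello"}],
}
encoded = base64.b64encode(gzip.compress(json.dumps(sample).encode("utf-8")))

raw = base64.b64decode(encoded)
decoded = json.loads(decode_kinesis_data(raw))
print(decoded["logEvents"][0]["message"])
```

Note that an unconditional gzip.decompress call would raise gzip.BadGzipFile for records that are not compressed, which is why a fallback may be preferable.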
Since this could be an edge case, it should also be fine to wrap the models.append(self._parse(data=data.decode('utf-8')... line in a try/except clause catching the UnicodeDecodeError exception and then performing the decompression before trying again to decode using 'utf-8'.
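A minimal sketch of that try/except variant, using only the standard library; `decode_with_gzip_fallback` is a hypothetical name and not part of Powertools:

```python
import gzip


def decode_with_gzip_fallback(data: bytes) -> str:
    """Decode bytes as UTF-8; if that fails, gunzip first and decode again."""
    try:
        return data.decode("utf-8")
    except UnicodeDecodeError:
        # Assumption: the only other encoding we expect is gzip-compressed UTF-8.
        return gzip.decompress(data).decode("utf-8")


plain = decode_with_gzip_fallback(b'{"a": 1}')
unzipped = decode_with_gzip_fallback(gzip.compress(b'{"a": 1}'))
print(plain == unzipped)
```

This keeps uncompressed records on the existing path. Gzip data always takes the fallback branch, because the second byte of its header (0x8b) is never a valid UTF-8 start byte.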
Steps to Reproduce
Have a Kinesis Data Stream moving CloudWatch Logs to a Lambda function (or have a saved event and use it locally)
Use the CloudWatchLog Pydantic model provided in the code snippet
Use the KinesisDataStreamEnvelope with the event parser, either as a decorator above the Lambda handler or by explicitly calling its parse method
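For local reproduction without a live stream, a test event could be built along these lines. This is only a sketch: `build_kinesis_event` is a hypothetical helper, every ARN, ID, and log value is a placeholder, and the payload keys follow the CloudWatch Logs subscription format as I understand it.

```python
import base64
import gzip
import json


def build_kinesis_event(log_payload: dict) -> dict:
    """Wrap a payload in a Kinesis-style Lambda event: JSON -> gzip -> base64."""
    compressed = gzip.compress(json.dumps(log_payload).encode("utf-8"))
    record = {
        "kinesis": {
            "kinesisSchemaVersion": "1.0",
            "partitionKey": "example-partition-key",  # placeholder
            "sequenceNumber": "0",  # placeholder
            "data": base64.b64encode(compressed).decode("ascii"),
            "approximateArrivalTimestamp": 0.0,
        },
        "eventSource": "aws:kinesis",
        "eventVersion": "1.0",
        "eventID": "shardId-000000000000:0",  # placeholder
        "eventName": "aws:kinesis:record",
        "invokeIdentityArn": "arn:aws:iam::123456789012:role/example-role",
        "awsRegion": "us-east-1",
        "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream",
    }
    return {"Records": [record]}


# Placeholder CloudWatch Logs payload (all values invented for illustration).
sample_payload = {
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "/example/log-group",
    "logStream": "example-stream",
    "subscriptionFilters": ["example-filter"],
    "logEvents": [{"id": "1", "timestamp": 0, "message": "hello"}],
}

event = build_kinesis_event(sample_payload)
record_data = base64.b64decode(event["Records"][0]["kinesis"]["data"])
print(record_data[:2].hex())
```

Passing `event` to either handler version from the code snippet should then exercise the same decode path as a real record.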
Powertools for AWS Lambda (Python) version
3.9.0
AWS Lambda function runtime
3.12
Packaging format used
PyPi
Debugging logs