Skip to content

Bug: KinesisDataStreamEnvelope().parse() method seems to be missing data decompression step #6625

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
Artur-T-Malas opened this issue May 8, 2025 · 1 comment
Labels
bug Something isn't working triage Pending triage from maintainers

Comments

@Artur-T-Malas
Copy link

Artur-T-Malas commented May 8, 2025

Expected Behaviour

When provided with an event and a Pydantic model, the envelope's parse() method should correctly parse the data, including decompression of it (as as far as I am aware, it is compressed by the Kinesis itself).

Current Behaviour

My Lambda function receives CloudWatch Logs via a Kinesis Data Stream. When using event parser with the KinesisDataStreamEnvelope either as a annotation above lambda_handler or explicitly calling envelope's parse method, I keep getting a UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte.

After debugging it appears as if there is a decompression step missing in the envelope's parse method between casting to bytes and decoding using 'utf-8'.

Code snippet

# Pydantic models for CloudWatch Logs representation
class CloudWatchLogEvent(BaseModel):
  model_config = ConfigDict(alias_generator=to_camel)

 id: str
 timestamp: str | int
 message: str


class CloudWatchLog(BaseModel):
  model_config = ConfigDict(alias_generator=to_camel)

  message_type: str
  owner: str = Field(pattern=r"^\d{12}$")  # 12 digits pattern
  log_group: str
  log_stream: str
  subscription_filter: list[str]
  log_events: list[CloudWatchLogEvent] = Field(min_length=1)


# Version 1 - not working
@event_parser(model=CloudWatchLog, envelope=envelopes.KinesisDataStreamEnvelope)
def lambda_handler(event: list[CloudWatchLog], context: LambdaContext) -> dict:
  ...


# Version 2 - also not working
def lambda_handler(event: dict, context: LambdaContext) -> dict:
log: list[MyLogModel] = envelopes.KinesisDataStreamEnvelope().parse(
  event, CloudWatchLog
)
...

Possible Solution

When manually parsing Kinesis Data Stream input in a Lambda function and locally (using a saved event), the following code works:

record_data: bytes = base64.b64decode(record['kinesis']['data'])
decompressed_data: bytes = gzip.decompress(record_data)
decoded_data: str = record_data_decode('utf-8')

Fixing the envelope

To fix the envelope itself, importing gzip and adding a line:
data = gzip.decompress(data)
in utilities/parser/envelopes/kinesis.py file between casting to bytes and decoding using 'utf-8' fixes the issue.

Since this could be an edge case, it should also be fine to wrap the models.append(self._parse(data=data.decode('utf-8')... line in a try/except clause catching the UnicodeDecodeError exception and then performing the decompression before trying again to decode using 'utf-8'.

Steps to Reproduce

  1. Have a Kinesis Data Stream moving CloudWatch Logs to a Lambda function (or have a saved event and use it locally)
  2. Use the CloudWatchLog Pydantic model provided in the code snippet
  3. Use the KinesisDataStream envelope with an event parser either as an annotation above the Lambda handler, or explicitly call the parse method

Thank you

Powertools for AWS Lambda (Python) version

3.9.0

AWS Lambda function runtime

3.12

Packaging format used

PyPi

Debugging logs

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x8b in position 1: invalid start byte
@Artur-T-Malas Artur-T-Malas added bug Something isn't working triage Pending triage from maintainers labels May 8, 2025
Copy link

boring-cyborg bot commented May 8, 2025

Thanks for opening your first issue here! We'll come back to you as soon as we can.
In the meantime, check out the #python channel on our Powertools for AWS Lambda Discord: Invite link

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working triage Pending triage from maintainers
Projects
Development

No branches or pull requests

1 participant