Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions src/cloudevents/core/bindings/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

from cloudevents.core.base import BaseCloudEvent, EventFactory
from cloudevents.core.formats.base import Format
from cloudevents.core.formats.json import JSONFormat
from cloudevents.core.v1.event import CloudEvent

# AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes
# The underscore variant is preferred for JMS 2.0 compatibility
Expand Down Expand Up @@ -324,3 +326,116 @@ def from_amqp(
return from_structured(message, event_format, event_factory)

return from_binary(message, event_format, event_factory)


def to_binary_event(
event: BaseCloudEvent,
event_format: Format | None = None,
) -> AMQPMessage:
"""
Convenience wrapper for to_binary with JSON format as default.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import amqp
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = amqp.to_binary_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:return: AMQPMessage with CloudEvent attributes as application properties
"""
if event_format is None:
event_format = JSONFormat()
return to_binary(event, event_format)


def from_binary_event(
message: AMQPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_binary with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import amqp
>>> event = amqp.from_binary_event(message)

:param message: AMQPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_binary(message, event_format, CloudEvent)


def to_structured_event(
event: BaseCloudEvent,
event_format: Format | None = None,
) -> AMQPMessage:
"""
Convenience wrapper for to_structured with JSON format as default.

Example:
>>> from cloudevents.core.v1.event import CloudEvent
>>> from cloudevents.core.bindings import amqp
>>>
>>> event = CloudEvent(
... attributes={"type": "com.example.test", "source": "/test"},
... data={"message": "Hello"}
... )
>>> message = amqp.to_structured_event(event)

:param event: The CloudEvent to convert
:param event_format: Format implementation (defaults to JSONFormat)
:return: AMQPMessage with structured content in application-data
"""
if event_format is None:
event_format = JSONFormat()
return to_structured(event, event_format)


def from_structured_event(
message: AMQPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_structured with JSON format and CloudEvent as defaults.

Example:
>>> from cloudevents.core.bindings import amqp
>>> event = amqp.from_structured_event(message)

:param message: AMQPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_structured(message, event_format, CloudEvent)


def from_amqp_event(
message: AMQPMessage,
event_format: Format | None = None,
) -> CloudEvent:
"""
Convenience wrapper for from_amqp with JSON format and CloudEvent as defaults.
Auto-detects binary or structured mode.

Example:
>>> from cloudevents.core.bindings import amqp
>>> event = amqp.from_amqp_event(message)

:param message: AMQPMessage to parse
:param event_format: Format implementation (defaults to JSONFormat)
:return: CloudEvent instance
"""
if event_format is None:
event_format = JSONFormat()
return from_amqp(message, event_format, CloudEvent)