From 7681ec8b5fa40036ef047caad064d33dcdef9e95 Mon Sep 17 00:00:00 2001 From: Tudor Plugaru Date: Tue, 30 Dec 2025 13:57:11 +0200 Subject: [PATCH] chore: simplified interface to read/write CE using AMQP bindings. Signed-off-by: Tudor Plugaru --- src/cloudevents/core/bindings/amqp.py | 115 ++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/src/cloudevents/core/bindings/amqp.py b/src/cloudevents/core/bindings/amqp.py index bafd8f4..30032bc 100644 --- a/src/cloudevents/core/bindings/amqp.py +++ b/src/cloudevents/core/bindings/amqp.py @@ -20,6 +20,8 @@ from cloudevents.core.base import BaseCloudEvent, EventFactory from cloudevents.core.formats.base import Format +from cloudevents.core.formats.json import JSONFormat +from cloudevents.core.v1.event import CloudEvent # AMQP CloudEvents spec allows both cloudEvents_ and cloudEvents: prefixes # The underscore variant is preferred for JMS 2.0 compatibility @@ -324,3 +326,116 @@ def from_amqp( return from_structured(message, event_format, event_factory) return from_binary(message, event_format, event_factory) + + +def to_binary_event( + event: BaseCloudEvent, + event_format: Format | None = None, +) -> AMQPMessage: + """ + Convenience wrapper for to_binary with JSON format as default. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import amqp + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = amqp.to_binary_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :return: AMQPMessage with CloudEvent attributes as application properties + """ + if event_format is None: + event_format = JSONFormat() + return to_binary(event, event_format) + + +def from_binary_event( + message: AMQPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_binary with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import amqp + >>> event = amqp.from_binary_event(message) + + :param message: AMQPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_binary(message, event_format, CloudEvent) + + +def to_structured_event( + event: BaseCloudEvent, + event_format: Format | None = None, +) -> AMQPMessage: + """ + Convenience wrapper for to_structured with JSON format as default. + + Example: + >>> from cloudevents.core.v1.event import CloudEvent + >>> from cloudevents.core.bindings import amqp + >>> + >>> event = CloudEvent( + ... attributes={"type": "com.example.test", "source": "/test"}, + ... data={"message": "Hello"} + ... ) + >>> message = amqp.to_structured_event(event) + + :param event: The CloudEvent to convert + :param event_format: Format implementation (defaults to JSONFormat) + :return: AMQPMessage with structured content in application-data + """ + if event_format is None: + event_format = JSONFormat() + return to_structured(event, event_format) + + +def from_structured_event( + message: AMQPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_structured with JSON format and CloudEvent as defaults. + + Example: + >>> from cloudevents.core.bindings import amqp + >>> event = amqp.from_structured_event(message) + + :param message: AMQPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_structured(message, event_format, CloudEvent) + + +def from_amqp_event( + message: AMQPMessage, + event_format: Format | None = None, +) -> CloudEvent: + """ + Convenience wrapper for from_amqp with JSON format and CloudEvent as defaults. + Auto-detects binary or structured mode. + + Example: + >>> from cloudevents.core.bindings import amqp + >>> event = amqp.from_amqp_event(message) + + :param message: AMQPMessage to parse + :param event_format: Format implementation (defaults to JSONFormat) + :return: CloudEvent instance + """ + if event_format is None: + event_format = JSONFormat() + return from_amqp(message, event_format, CloudEvent)