From d15cecdf257f27e1f77b88a1a4a2ee6a9b5b275f Mon Sep 17 00:00:00 2001 From: "Stephen C. Pope" Date: Fri, 18 Oct 2024 18:50:21 -0600 Subject: [PATCH] [Core-547] Client: Catalog support for new event rules and new event types (#12708) GitOrigin-RevId: a601c1bee027f55073963429237b36d4312b3d11 --- descarteslabs/core/catalog/__init__.py | 10 + descarteslabs/core/catalog/blob.py | 6 +- .../core/catalog/event_api_destination.py | 5 +- descarteslabs/core/catalog/event_rule.py | 12 +- descarteslabs/core/catalog/event_schedule.py | 5 +- .../core/catalog/event_subscription.py | 279 ++++++++++++++++-- .../catalog/tests/test_event_subscription.py | 74 +++++ 7 files changed, 370 insertions(+), 21 deletions(-) diff --git a/descarteslabs/core/catalog/__init__.py b/descarteslabs/core/catalog/__init__.py index 92715d57..42184f8b 100644 --- a/descarteslabs/core/catalog/__init__.py +++ b/descarteslabs/core/catalog/__init__.py @@ -55,12 +55,17 @@ from .event_rule import EventRule, EventRuleCollection, EventRuleSearch, EventRuleTarget from .event_schedule import EventSchedule, EventScheduleCollection, EventScheduleSearch from .event_subscription import ( + ComputeFunctionCompletedEventSubscription, EventSubscription, EventSubscriptionCollection, EventSubscriptionComputeTarget, EventSubscriptionSearch, + EventSubscriptionSqsTarget, EventSubscriptionTarget, EventType, + NewImageEventSubscription, + NewStorageEventSubscription, + NewVectorEventSubscription, Placeholder, ScheduledEventSubscription, ) @@ -119,6 +124,7 @@ "CatalogObject", "ClassBand", "Colormap", + "ComputeFunctionCompletedEventSubscription", "DataType", "DeletedObjectError", "DeletionTaskStatus", @@ -140,6 +146,7 @@ "EventSubscriptionCollection", "EventSubscriptionComputeTarget", "EventSubscriptionSearch", + "EventSubscriptionSqsTarget", "EventSubscriptionTarget", "EventType", "File", @@ -160,6 +167,9 @@ "MaskBand", "MicrowaveBand", "NamedCatalogObject", + "NewImageEventSubscription", + "NewStorageEventSubscription", + "NewVectorEventSubscription", "OverviewResampler", "Placeholder", "ProcessingLevelsAttribute", diff --git a/descarteslabs/core/catalog/blob.py b/descarteslabs/core/catalog/blob.py index c42041ed..8094e144 100644 --- a/descarteslabs/core/catalog/blob.py +++ b/descarteslabs/core/catalog/blob.py @@ -462,7 +462,11 @@ def get_or_create( if (not id and not name) or (id and name): raise TypeError("Must specify exactly one of id or name parameters") if not id: - id = f"{storage_type}/{Blob.namespace_id(namespace)}/{name}" + namespace = cls.namespace_id(namespace) + id = f"{storage_type}/{namespace}/{name}" + kwargs["storage_type"] = storage_type + kwargs["namespace"] = namespace + kwargs["name"] = name return super(cls, Blob).get_or_create(id, client=client, **kwargs) diff --git a/descarteslabs/core/catalog/event_api_destination.py b/descarteslabs/core/catalog/event_api_destination.py index 3663ac7f..dc7ffb27 100644 --- a/descarteslabs/core/catalog/event_api_destination.py +++ b/descarteslabs/core/catalog/event_api_destination.py @@ -455,7 +455,10 @@ def get_or_create( if (not id and not name) or (id and name): raise TypeError("Must specify exactly one of id or name parameters") if not id: - id = f"{cls.namespace_id(namespace)}:{name}" + namespace = cls.namespace_id(namespace) + id = f"{namespace}:{name}" + kwargs["namespace"] = namespace + kwargs["name"] = name return super(cls, EventApiDestination).get_or_create( id, client=client, **kwargs diff --git a/descarteslabs/core/catalog/event_rule.py b/descarteslabs/core/catalog/event_rule.py index cc484bfc..8de6e0cd 100644 --- a/descarteslabs/core/catalog/event_rule.py +++ b/descarteslabs/core/catalog/event_rule.py @@ -256,6 +256,13 @@ class EventRule(CatalogObject): str, doc="""str: The ARN of the event bus to which this rule belongs.""", ) + role_arn = TypedAttribute( + str, + doc="""str, optional: The ARN of the role to assume when targeting another Event Bus. + + *Filterable, sortable*. + """, + ) rule_arn = TypedAttribute( str, doc="""str: The ARN of the rule.""", @@ -444,7 +451,10 @@ def get_or_create( if (not id and not name) or (id and name): raise TypeError("Must specify exactly one of id or name parameters") if not id: - id = f"{cls.namespace_id(namespace)}:{name}" + namespace = cls.namespace_id(namespace) + id = f"{namespace}:{name}" + kwargs["namespace"] = namespace + kwargs["name"] = name return super(cls, EventRule).get_or_create(id, client=client, **kwargs) diff --git a/descarteslabs/core/catalog/event_schedule.py b/descarteslabs/core/catalog/event_schedule.py index cbcbea0f..53323bd7 100644 --- a/descarteslabs/core/catalog/event_schedule.py +++ b/descarteslabs/core/catalog/event_schedule.py @@ -372,7 +372,10 @@ def get_or_create( if (not id and not name) or (id and name): raise TypeError("Must specify exactly one of id or name parameters") if not id: - id = f"{cls.namespace_id(namespace)}:{name}" + namespace = cls.namespace_id(namespace) + id = f"{namespace}:{name}" + kwargs["namespace"] = namespace + kwargs["name"] = name return super(cls, EventSchedule).get_or_create(id, client=client, **kwargs) diff --git a/descarteslabs/core/catalog/event_subscription.py b/descarteslabs/core/catalog/event_subscription.py index fd132b0e..2a02d376 100644 --- a/descarteslabs/core/catalog/event_subscription.py +++ b/descarteslabs/core/catalog/event_subscription.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from collections.abc import Mapping import json import functools from typing import Dict, List @@ -58,6 +59,8 @@ class EventType(StrEnum): An existing vector feature in the Vector service has been updated. SCHEDULED : enum A scheduled event. + COMPUTE_FUNCTION_COMPLETED : enum + A compute function has completed all jobs. """ NEW_IMAGE = "new-image" @@ -67,6 +70,7 @@ class EventType(StrEnum): NEW_VECTOR = "new-vector" UPDATE_VECTOR = "update-vector" SCHEDULED = "scheduled" + COMPUTE_FUNCTION_COMPLETED = "compute-function-completed" class EventSubscriptionTarget(MappingAttribute): @@ -242,6 +246,95 @@ def _make_detail_template( ) +class EventSubscriptionSqsTarget(EventSubscriptionTarget): + """An EventSubscriptionTarget tailored for an SQS queue. + + Supports the use of placeholders in the detail template to be substituted + from the matching event and subscription. + """ + + def __init__(self, _: str, *args, **kwargs): + """Create an EventSubscriptionTarget tailored for an SQS queue. + + Placeholder values can be used for any parameter value, which allows + for passing through Jinja2 template substitutions into the resulting + detail template which are otherwise not themselves JSON serializable. + + If no positional or keyword arguments are provided, then the message + defaults to being the event detail. + + Parameters + ---------- + _ : str + The SQS queue URL. + args : Placeholder or mapping type, optional + At most one positional argument which is either a Placeholder object + which will be rendered as a JSON object, or a mapping type which will + yield the same. If a value is provided, then no kwargs are permitted. + kwargs : Any, optional + Keyword parameters to pass in the message to the SQS queue. They + may include placeholders for Jinja2 templating. + """ + super().__init__() + self.rule_id = "descarteslabs:sqs-forwarder" + if len(args) > 1: + raise TypeError("At most one positional argument is allowed") + if args: + if kwargs: + raise TypeError( + "No keyword arguments allowed with a positional argument" + ) + if isinstance(args[0], Placeholder): + message = args[0] + elif isinstance(args[0], Mapping): + message = {**args[0]} + else: + raise ValueError( + "Positional argument must be a Placeholder or a mapping type" + ) + elif kwargs: + message = kwargs + else: + message = Placeholder("event.detail", unquoted=True) + + self.detail_template = self._make_detail_template(_, message) + + def _make_detail_template( + self, + _, + message, + ): + """Generate a template of an SQS queue message for use with a Catalog + EventSubscription to send events to the SQS queue. + + This call will return a JSON template string (with placeholders for + Jinja2 templating) that can be used to send a message to the SQS queue + via an EventSubscription. + + Returns + ------- + str + The the detail template to use for the EventSubscription target. + + Parameters + ---------- + _ : str + The SQS queue URL. + kwargs : Any, optional + Keyword parameters to compose into the message. + """ + placeholders = [] + return Placeholder.substitute_placeholders( + json.dumps( + {"message": message, "sqs_queue_url": _}, + default=functools.partial( + Placeholder.json_serialize, placeholders=placeholders + ), + ), + placeholders, + ) + + class EventSubscriptionSearch(GeoSearch): """A search request that iterates over its search results for event subscriptions. @@ -262,36 +355,36 @@ class EventSubscription(CatalogObject): The :py:meth:`~descarteslabs.catalog.CatalogClient.get_default_client` will be used if not set. kwargs : dict - With the exception of readonly attributes (`created`, `modified`, and `owner`) - and with the exception of properties (`ATTRIBUTES`, `is_modified`, and `state`), - any attribute listed below can also be used as a keyword argument. Also see - `~EventSubscription.ATTRIBUTES`. + With the exception of readonly attributes (`created`, `modified`, `owner`, and + `owner_role_arn`), and with the exception of properties (`ATTRIBUTES`, + `is_modified`, and `state`), any attribute listed below can also be used as a + keyword argument. Also see `~EventSubscription.ATTRIBUTES`. .. _event_subscription_note: Note ---- - The ``reader`` and ``writer`` IDs must be prefixed with ``email:``, ``user:``, - ``group:`` or ``org:``. The ``owner`` ID only accepts ``org:`` and ``user:``. - Using ``org:`` as an ``owner`` will assign those privileges only to administrators - for that organization; using ``org:`` as a ``reader`` or ``writer`` assigns those + The ``readers`` and ``writers`` IDs must be prefixed with ``email:``, ``user:``, + ``group:`` or ``org:``. The ``owners`` ID only accepts ``org:`` and ``user:``. + Using ``org:`` as an owner will assign those privileges only to administrators + for that organization; using ``org:`` as a reader or writer assigns those privileges to everyone in that organization. The `readers` and `writers` attributes - are only visible in full to the `owners`. If you are a `reader` or a `writer` those + are only visible in full to the `owners`. If you are a reader or a writer those attributes will only display the element of those lists by which you are gaining read or write access. - Any user with ``owner`` privileges is able to read the event subscription attributes or data, + Any user with owner privileges is able to read the event subscription attributes or data, modify the event subscription attributes, or delete the event subscription, including reading and modifying the ``owners``, ``writers``, and ``readers`` attributes. - Any user with ``writer`` privileges is able to read the event subscription attributes or data, + Any user with writer privileges is able to read the event subscription attributes or data, or modify the event subscription attributes, but not delete the event subscription. A ``writer`` can read the ``owners`` and can only read the entry in the ``writers`` and/or ``readers`` by which they gain access to the event subscription. - Any user with ``reader`` privileges is able to read the event subscription attributes or data. - A ``reader`` can read the ``owners`` and can only read the entry in the ``writers`` and/or + Any user with reader privileges is able to read the event subscription attributes or data. + A reader can read the ``owners`` and can only read the entry in the ``writers`` and/or ``readers`` by which they gain access to the event subscription. Also see :doc:`Sharing Resources `. @@ -363,6 +456,15 @@ class EventSubscription(CatalogObject): *Filterable, sortable*. """, ) + owner_role_arn = TypedAttribute( + str, + doc="""str, readonlyl: The AWS IAM role associated with the owner for use in target invocation. + + This attribute may not be set by the end user. + + *Filterable, sortable*. + """, + ) event_type = ListAttribute( EnumAttribute(EventType), doc="""list(str): Event detail types which this subscription will match. At least one event @@ -590,7 +692,10 @@ def get_or_create( if (not id and not name) or (id and name): raise TypeError("Must specify exactly one of id or name parameters") if not id: - id = f"{cls.namespace_id(namespace)}:{name}" + namespace = cls.namespace_id(namespace) + id = f"{namespace}:{name}" + kwargs["namespace"] = namespace + kwargs["name"] = name return super(cls, EventSubscription).get_or_create(id, client=client, **kwargs) @@ -633,6 +738,146 @@ class EventSubscriptionCollection(Collection): EventSubscription._collection_type = EventSubscriptionCollection +class NewImageEventSubscription(EventSubscription): + """A convenience class for creating an EventSubscription for a new image event. + + Creates an EventSubscription for a new image event. Based on the one or more + Product ids provided to the constructer, the subscription is configured + with the correct ``event_source``, ``event_type``, and ``event_namespace`` + attributes, so that they need not be provided explicitly (indeed if they are + explicitly provided, they will be overwritten). + """ + + _derived_type = "new_image_event_subscription" + + def __init__(self, *product_ids, **kwargs): + """Create an EventSubscription for a new image event. + + Parameters + ---------- + product_ids : str (one or more positional arguments) + The ids of one or more products to be subscribed, as separate positional arguments. + Plus any additional keyword arguments to pass to the EventSubscription constructor. + """ + if not product_ids: + raise TypeError( + "At least one Product id must be provided as a positional argument" + ) + if any(not isinstance(id, str) for id in product_ids): + raise TypeError("All Product ids must be strings") + + kwargs["event_source"] = ["catalog"] + kwargs["event_type"] = [EventType.NEW_IMAGE] + kwargs["event_namespace"] = product_ids + super().__init__(**kwargs) + + +class NewStorageEventSubscription(EventSubscription): + """A convenience class for creating an EventSubscription for a new storage event. + + Creates an EventSubscription for a new storage event. Based on the one or more + Blob namespaces provided to the constructer, the subscription is configured + with the correct ``event_source``, ``event_type``, and ``event_namespace`` + attributes, so that they need not be provided explicitly (indeed if they are + explicitly provided, they will be overwritten). + """ + + _derived_type = "new_storage_event_subscription" + + def __init__(self, *namespaces, **kwargs): + """Create an EventSubscription for a new storage event. + + Parameters + ---------- + namespaces : str (one or more positional arguments) + One or more storage namespaces to be subscribed, as separate positional arguments. + Plus any additional keyword arguments to pass to the EventSubscription constructor. + """ + if not namespaces: + raise TypeError( + "At least one storage namespace must be provided as a positional argument" + ) + if any(not isinstance(id, str) for id in namespaces): + raise TypeError("All Product ids must be strings") + + kwargs["event_source"] = ["catalog"] + kwargs["event_type"] = [EventType.NEW_STORAGE] + kwargs["event_namespace"] = namespaces + super().__init__(**kwargs) + + +class NewVectorEventSubscription(EventSubscription): + """A convenience class for creating an EventSubscription for a new storage event. + + Creates an EventSubscription for a new vector event. Based on the one or more + Vector product ids provided to the constructer, the subscription is configured + with the correct ``event_source``, ``event_type``, and ``event_namespace`` + attributes, so that they need not be provided explicitly (indeed if they are + explicitly provided, they will be overwritten). + """ + + _derived_type = "new_vector_event_subscription" + + def __init__(self, *product_ids, **kwargs): + """Create an EventSubscription for a new vector event. + + Parameters + ---------- + product_ids : str (one or more positional arguments) + The ids of one or more vector products to be subscribed, as separate positional arguments. + Plus any additional keyword arguments to pass to the EventSubscription constructor. + """ + if not product_ids: + raise TypeError( + "At least one product id must be provided as a positional argument" + ) + if any(not isinstance(id, str) for id in product_ids): + raise TypeError("All product ids must be strings") + + kwargs["event_source"] = ["vector"] + kwargs["event_type"] = [EventType.NEW_VECTOR] + kwargs["event_namespace"] = product_ids + super().__init__(**kwargs) + + +class ComputeFunctionCompletedEventSubscription(EventSubscription): + """A convenience class for creating an EventSubscription for a compute + function completion event. + + Creates an EventSubscription for a compute function completion event. + Based on the one or more Function ids provided to the constructer, + the subscription is configured with the correct ``event_source``, + ``event_type``, and ``event_namespace`` attributes, so that they + need not be provided explicitly (indeed if they are explicitly provided, + they will be overwritten). + """ + + _derived_type = "compute_function_completed_event_subscription" + + def __init__(self, *function_ids, **kwargs): + """Create an EventSubscription for a compute function completion event. + + Parameters + ---------- + function_ids : str (one or more positional arguments) + One or more Function ids or Function namespaces to be subscribed, + as separate positional arguments. A Function namespace will match + all functions in that namespace. + Plus any additional keyword arguments to pass to the EventSubscription constructor. + """ + if not function_ids: + raise TypeError( + "At least one function id or namespace must be provided as a positional argument" + ) + if any(not isinstance(id, str) for id in function_ids): + raise TypeError("All product ids must be strings") + + kwargs["event_source"] = ["compute"] + kwargs["event_type"] = [EventType.COMPUTE_FUNCTION_COMPLETED] + kwargs["event_namespace"] = function_ids + super().__init__(**kwargs) + + class ScheduledEventSubscription(EventSubscription): """A convenience class for creating an EventSubscription for a scheduled event. @@ -650,8 +895,8 @@ def __init__(self, *event_schedule_ids, **kwargs): Parameters ---------- - event_schedule_ids : str - The ids of one or more scheduled event to subscribe to (as separate positional arguments). + event_schedule_ids : str (one or more positional arguments) + The ids of one or more scheduled event to be subscribed, as separate positional arguments. Plus any additional keyword arguments to pass to the EventSubscription constructor. """ if not event_schedule_ids: @@ -662,6 +907,6 @@ def __init__(self, *event_schedule_ids, **kwargs): raise TypeError("All EventSchedule ids must be strings") kwargs["event_source"] = ["scheduler"] - kwargs["event_type"] = ["scheduled"] + kwargs["event_type"] = [EventType.SCHEDULED] kwargs["event_namespace"] = event_schedule_ids super().__init__(**kwargs) diff --git a/descarteslabs/core/catalog/tests/test_event_subscription.py b/descarteslabs/core/catalog/tests/test_event_subscription.py index d2fa8767..9e305c46 100644 --- a/descarteslabs/core/catalog/tests/test_event_subscription.py +++ b/descarteslabs/core/catalog/tests/test_event_subscription.py @@ -27,12 +27,17 @@ from .base import ClientTestCase from ..attributes import AttributeValidationError from ..event_subscription import ( + ComputeFunctionCompletedEventSubscription, EventSubscription, EventSubscriptionCollection, EventSubscriptionComputeTarget, EventSubscriptionSearch, + EventSubscriptionSqsTarget, EventSubscriptionTarget, EventType, + NewImageEventSubscription, + NewStorageEventSubscription, + NewVectorEventSubscription, Placeholder, ScheduledEventSubscription, ) @@ -172,6 +177,7 @@ def test_get(self): "name": "test-sub", "namespace": "descarteslabs:test-namespace", "owner": "user:somehash", + "owner_role_arn": "arn:aws:iam::123456789012:role/metadata-event-invoke-somehash", "owners": ["org:descarteslabs"], "readers": ["org:descarteslabs"], "tags": ["TESTING"], @@ -200,6 +206,10 @@ def test_get(self): assert s.namespace == "descarteslabs:test-namespace" assert s.description == "a generic description" assert s.owner == "user:somehash" + assert ( + s.owner_role_arn + == "arn:aws:iam::123456789012:role/metadata-event-invoke-somehash" + ) assert s.expires is None assert s.geometry == shapely.geometry.shape(self.geometry) assert s.event_type == [EventType.NEW_IMAGE] @@ -462,6 +472,7 @@ def test_save(self): "name": "test-sub", "namespace": "descarteslabs:test-namespace", "owner": "user:somehash", + "owner_role_arn": "arn:aws:iam::123456789012:role/metadata-event-invoke-somehash", "owners": ["org:descarteslabs"], "readers": ["org:descarteslabs"], "writers": [], @@ -696,6 +707,69 @@ def test_compute_target(self): == '{"body": {"function_id": "some-function-id", "args": ["{{ event.detail.id }}"], "kwargs": {"detail": {{ event.detail }}, "source": "{{ event.source }}"}}}' # noqa: E501 ) + def test_sqs_target_positional(self): + target = EventSubscriptionSqsTarget( + "some-sqs-queue-url", + Placeholder("event.detail", unquoted=True), + ) + assert isinstance(target, EventSubscriptionTarget) + assert target.rule_id == "descarteslabs:sqs-forwarder" + assert ( + target.detail_template + == '{"message": {{ event.detail }}, "sqs_queue_url": "some-sqs-queue-url"}' + ) + + def test_sqs_target_kwargs(self): + target = EventSubscriptionSqsTarget( + "some-sqs-queue-url", + id=Placeholder("event.detail.id"), + ) + assert isinstance(target, EventSubscriptionTarget) + assert target.rule_id == "descarteslabs:sqs-forwarder" + assert ( + target.detail_template + == '{"message": {"id": "{{ event.detail.id }}"}, "sqs_queue_url": "some-sqs-queue-url"}' + ) + + def test_sqs_target_default(self): + target = EventSubscriptionSqsTarget( + "some-sqs-queue-url", + ) + assert isinstance(target, EventSubscriptionTarget) + assert target.rule_id == "descarteslabs:sqs-forwarder" + assert ( + target.detail_template + == '{"message": {{ event.detail }}, "sqs_queue_url": "some-sqs-queue-url"}' + ) + + def test_new_image_event_subscription(self): + sub = NewImageEventSubscription("some-product-id") + assert isinstance(sub, EventSubscription) + assert sub.event_source == ["catalog"] + assert sub.event_type == [EventType.NEW_IMAGE] + assert sub.event_namespace == ["some-product-id"] + + def test_new_storage_event_subscription(self): + sub = NewStorageEventSubscription("some-namespace") + assert isinstance(sub, EventSubscription) + assert sub.event_source == ["catalog"] + assert sub.event_type == [EventType.NEW_STORAGE] + assert sub.event_namespace == ["some-namespace"] + + def test_new_vector_event_subscription(self): + sub = NewVectorEventSubscription("some-product-id") + assert isinstance(sub, EventSubscription) + assert sub.event_source == ["vector"] + assert sub.event_type == [EventType.NEW_VECTOR] + assert sub.event_namespace == ["some-product-id"] + + def test_compute_function_completed_event_subscription(self): + sub = ComputeFunctionCompletedEventSubscription("some-function-id") + assert isinstance(sub, EventSubscription) + assert sub.event_source == ["compute"] + assert sub.event_type == [EventType.COMPUTE_FUNCTION_COMPLETED] + assert sub.event_namespace == ["some-function-id"] + def test_scheduled_event_subscription(self): sub = ScheduledEventSubscription("some-schedule-id") assert isinstance(sub, EventSubscription)