Alternative proposal for Actor PubSub implementation #78

Open
wants to merge 11 commits into
base: main

WhitWaldo

None of the comments have been addressed in the various reviews on the existing Actors PubSub proposal nor has any discussion occurred on the thread I created in the maintainers channel on Discord.

But as there is voting occurring on the existing proposal, I'm making a last-ditch effort to suggest ditching that proposal in favor of an alternative that I think is more broadly applicable to what people want to get out of Actors and PubSub, repurposes what's already available and used in Dapr and avoids having additional "one component for all occasions" dependencies.

Fundamentally, I take what we already use for Dapr PubSub today (declarative, programmatic and streaming subscriptions) and extend it to Actors. Contrary to the existing proposal which changes how anyone would have to send events in to use them in actors, my proposal allows actors to subscribe to any existing PubSub component without changing how the events come in and only changing how actors themselves subscribe. In other words, mine is a drop-in change that anyone can use on day-one with existing PubSub without needing anything more than annotating actor methods with corresponding routing (for declarative or programmatic subscriptions).

This is a replacement for the PR at #75 since I accidentally created it from the wrong branch

I welcome any and all feedback!

WhitWaldo and others added 4 commits March 22, 2025 14:16
Co-authored-by: Anton Troshin <[email protected]>
Signed-off-by: Whit Waldo <[email protected]>
@WhitWaldo
Author

WhitWaldo commented Mar 25, 2025

Following the maintainer call on 3/24, @JoshVanL and I talked through several aspects of this proposal and I wanted to write them down for posterity:

Shift from existing PubSub approach - single Dapr consumer instead of per subscriber

While it makes sense today for the PubSub broker to set up a new consumer for each subscription, this isn't practical with PubSub actors: offhand, I cannot think of a PubSub provider that supports more than 32 consumer groups, much less an unbounded number (e.g. the potential identifier space for actor IDs). Rather, I propose that the Dapr runtime itself be the sole consumer of inbound messages and be responsible for locating a subscribed message consumer. If a consumer is identified and the message conveyed, the message should be acknowledged back to the PubSub component (with retries to subscribers handled per resiliency policies). If no consumer is found, the message should be either retried or dropped (per policy).
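
A minimal sketch of that flow, in Python purely for illustration. The names `Outcome`, `Subscription`, `dispatch` and the `deliver` callback are hypothetical and not part of any Dapr API; retries of failed deliveries are assumed to be handled elsewhere by resiliency policies.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Iterable


class Outcome(Enum):
    """What the runtime reports back to the PubSub component."""
    ACK = "ack"
    RETRY = "retry"
    DROP = "drop"


@dataclass(frozen=True)
class Subscription:
    pubsub: str
    topic: str
    sink: str  # hypothetical "<actor_type>/<actor_method>/<actor_id>" string


def dispatch(
    pubsub: str,
    topic: str,
    payload: Any,
    subscriptions: Iterable[Subscription],
    deliver: Callable[[Subscription, Any], None],
    unmatched: Outcome = Outcome.DROP,
) -> Outcome:
    """Handle one inbound message as the sole consumer of the broker."""
    matched = [s for s in subscriptions if s.pubsub == pubsub and s.topic == topic]
    if not matched:
        # No subscribed consumer found: retry or drop, per policy.
        return unmatched
    for sub in matched:
        # Hand the message to every matching subscriber.
        deliver(sub, payload)
    # At least one consumer was located, so acknowledge to the broker.
    return Outcome.ACK
```

The `unmatched` parameter stands in for the "retried or dropped (per policy)" choice; where that policy would actually be configured is left open here.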

Subscription Types

We agreed that the most valuable subscription type here is streaming. I expressed support for the notion that, as in Orleans, PubSub subscriptions may be either long-standing or brief and should favor dynamic creation/deletion. To that end, I'd be entirely OK with dropping declarative subscriptions from the initial release. While it could be nice to have programmatic subscriptions (applied only at service startup and applicable only to previously activated actor instances), if the initial version only allows for streaming subscriptions, I think that'd be a great place to start.

Existing declarative PubSub cannot point to actors for lack of identifying actor names and instance IDs.

Filter Type

Today, we favor Google's CEL approach for handling event filtering, but it's inconsistent with the approach suggested in CloudEvents. While I modified the CloudEvent schema to support this CEL approach, we might want to take advantage of the opportunity a new feature presents to wholeheartedly embrace conformance with CloudEvents instead of CEL.

Subscription de-duplication

Because I propose that the filter be assigned on the runtime (to avoid unnecessary actor activation), in the short term this introduces the opportunity to do subscription de-duplication on a per-actor ID basis. Reiterating from the proposal, I think it's very possible that a single actor instance may have more than one subscription to the same PubSub component and topic (and potentially with the same filter across multiple methods) so there's an opportunity to at least put all these in the same record.

That said, I would eventually (as part of a separate proposal) support another Orleans capability of rewinding subscriptions. When a subscription is created, it naturally receives only the next message received, but if the underlying provider supports sequence tokens (e.g. like Azure Event Hub), these can be passed back by Dapr from the subscriber to pull earlier messages prior to subscription. This might have an impact on runtime-side de-duplication, so it's something to consider in advance.
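
The per-actor de-duplication described above could be sketched roughly as follows, in Python for illustration only. The dictionary keys (`actor_type`, `actor_id`, `pubsub`, `topic`, `filter`, `method`) and the `deduplicate` helper are assumptions made for this example, not the proposal's schema, and the filter is assumed to be representable as a hashable value such as a string.

```python
from collections import defaultdict
from typing import Dict, Iterable, Mapping, Set, Tuple

# (actor_type, actor_id, pubsub, topic, filter)
RecordKey = Tuple[str, str, str, str, str]


def deduplicate(subscriptions: Iterable[Mapping[str, str]]) -> Dict[RecordKey, Set[str]]:
    """Collapse subscriptions that share actor, component, topic and filter
    into one record that lists every target method."""
    records: Dict[RecordKey, Set[str]] = defaultdict(set)
    for sub in subscriptions:
        key = (
            sub["actor_type"],
            sub["actor_id"],
            sub["pubsub"],
            sub["topic"],
            sub["filter"],
        )
        records[key].add(sub["method"])
    return dict(records)
```

With a layout like this the runtime evaluates each filter once per record rather than once per method. Rewinding via sequence tokens would likely need a per-subscription position, which is one way it could interfere with this kind of grouping.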

Unmentioned on Call

There are a few additional tweaks I'll add to the PR itself, but since I'm thinking about it:

  • Should have a means of getting an existing subscription's details (even if just by ID)
  • Should have a means of listing existing subscriptions for a given actor ID
  • Shift to supporting the CloudEvents 1.0.2 standard
    This is sort of covered above in the note about CEL vs CloudEvent filtering, but worth repeating. Today, Dapr assumes that if the content type isn't provided, it must be text/plain, yet it assumes that the data is JSON or similar for filtering. Rather than keeping our own funky hybrid of this standard, I propose we take the opportunity to fully support CloudEvents, including binary or text payloads on events, adopt the subscriptions spec for this PubSub actor functionality, and adopt its filtering functionality as a universal standard to aspire to going forward.
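
As a rough illustration of the first two bullets, here is an in-memory sketch in Python. `SubscriptionRegistry`, `ActorSubscription` and their methods are hypothetical names for this example; the actual HTTP and gRPC surface is whatever the proposal text defines.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class ActorSubscription:
    id: str
    actor_type: str
    actor_id: str
    pubsub: str
    topic: str


class SubscriptionRegistry:
    """Toy stand-in for runtime-side subscription management."""

    def __init__(self) -> None:
        self._by_id: Dict[str, ActorSubscription] = {}

    def create(self, actor_type: str, actor_id: str, pubsub: str, topic: str,
               subscription_id: Optional[str] = None) -> ActorSubscription:
        # Generate an ID when the caller does not supply one.
        sub_id = subscription_id or str(uuid.uuid4())
        sub = ActorSubscription(sub_id, actor_type, actor_id, pubsub, topic)
        self._by_id[sub_id] = sub
        return sub

    def get(self, subscription_id: str) -> Optional[ActorSubscription]:
        """Fetch the details of one subscription by ID, or None if unknown."""
        return self._by_id.get(subscription_id)

    def list_for_actor(self, actor_type: str, actor_id: str) -> List[ActorSubscription]:
        """List every subscription registered for one actor instance."""
        return [
            s for s in self._by_id.values()
            if s.actor_type == actor_type and s.actor_id == actor_id
        ]
```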

@theperm

theperm commented Mar 26, 2025

I like this approach. Thumbs up for the CE subscriptions spec - that's only just come to my attention but makes sense for this.

Contributor

@cicoyle cicoyle left a comment

I generally support the proposal. Just 2 clarifications


// The message containing the details for subscribing an actor to a topic via streaming or otherwise
message SubscribeActorEventRequestAlpha1 {
oneof subscribe_topic_events_request_type {
Contributor

I remember seeing a comment somewhere on the types here - was that resolved and this is the final approach?

Author

I've gone through and reworked the entire HTTP and gRPC specification for each of the management operations (create, get, update, delete, query), so this has since been removed in favor of something that made more sense and was specific to my proposal.

subscription so that there is no substantive difference in how one subscription is registered versus another, lessening
the burden on the runtime to juggle different approaches.

#### Streaming Subscriptions API
Contributor

Are we going to implement this in a phased approach? Like start with streaming subscriptions then add to declarative and programmatic? What is the thought process here for scope?

Author

I think the result of our chat Monday was that we should look at supporting streaming subscriptions first and only revisit the remaining options later if there's demand for them. Declarative supports hot reload, but that requires extra-code changes to make, and programmatic is fixed at app load (and is finicky with actors because of the activation requirement), so having an intentional opt-in at the actor instance level is the most straightforward route.

I'll modify the proposal itself to reflect this in the next day or two.

Author

I've modified this proposal to drop all programmatic and declarative support in favor of only tackling an approach similar to streaming subscriptions.

@olitomlinson

I cannot think of a PubSub provider that supports more than 32 consumer groups

PubSub components that don't have a concept of partitions could theoretically have an unbounded number of consumer groups.

But, I agree, consumer groups are not the right API to target specific Actors with.

@olitomlinson

+1 for this alternate proposal for the BYO PubSub component aspect.

Although I don't fully understand the Subscription Manager concept. What I actually mean is I don't see how you intend to implement this. How do you go from a single PubSub message, to reliably delivering that to many (potentially unbounded amount of...) Actors, across N number of pods.

@WhitWaldo
Author

+1 for this alternate proposal for the BYO PubSub component aspect.

Although I don't fully understand the Subscription Manager concept. What I actually mean is I don't see how you intend to implement this. How do you go from a single PubSub message, to reliably delivering that to many (potentially unbounded amount of...) Actors, across N number of pods.

The idea for the subscription manager is more just the notion that there's a dedicated capability in the runtime that's managing subscriptions (potentially for deduplication purposes (though that's fraught with concern for later use cases) or other optimization opportunities) and to provide a sample of what the schema for those objects looks like consistent with the CloudEvent subscription spec (as we're using CloudEvents anyway) - I'll update the proposal to reflect this preference in the next day or two.

Where today's PubSub leaves it to the subscriber to handle ack/drop/retry policy, I don't know how scalable that is here where you might have 10 or 10 million actor instances with separate subscriptions. My proposal instead suggests turning that on its head with regards to actors to say that Dapr itself receives the message and attempts to resolve somewhere to send it to (per any registered subscriptions to that component/topic and then of those, which match filters) and if it finds a destination, it acknowledges the message to the broker. If not, it signals a retry or drop based on policy. From there, handling resiliency between the runtime and the clients is up to resiliency policies set up.

…ent subscription filtering specification.

Added HTTP and updated gRPC specification for subscription management.

Signed-off-by: Whit Waldo <[email protected]>
…nly supporting streaming subscriptions in this initial release.

Signed-off-by: Whit Waldo <[email protected]>
@WhitWaldo
Author

I've made some significant changes to the text of the proposal, centering on three themes that came up in the last two weeks of discussions about the idea:

  • Dropped initial support for declarative and programmatic subscriptions in favor of streaming subscriptions
  • Replaced the blend of CE and CEL filtering with a pure CE subscription spec implementation favoring the latest WIP and published CloudEvents specifications instead of having us reinvent the wheel with a funky hybrid.
  • The protos as originally provided didn't quite align with the concept as I imagined it, so I've added an HTTP specification and completely revamped the corresponding gRPC specification to go along with it (matching the new CE specification)

…low multiple sinks per subscription. That said, this does support bulk subscription deletion.

Signed-off-by: Whit Waldo <[email protected]>
@msfussell
Member

I always find it useful if you can provide at least one SDK language (preferably two) that shows the end-user code example. For example, .NET and Python examples of how an Actor subscribes to a message. I realize that this is SDK dependent and may not be exactly right, but it gives the idea of how to use the Dapr runtime API.

@WhitWaldo
Author

WhitWaldo commented Apr 11, 2025

I always find it useful if you can provide at least one SDK language (preferably two) that shows the end-user code example. For example, .NET and Python examples of how an Actor subscribes to a message. I realize that this is SDK dependent and may not be exactly right, but it gives the idea of how to use the Dapr runtime API.

That's sort of the beauty of this proposal, and something Josh and I have been tackling offline: there are potentially no changes needed to any of the actor implementations to support this because it's just like firing a timer. You're effectively specifying a method name that'll be invoked with the payload when a message matches up with that actor ID.

We've ruled out using something like [Topic] since we want greater flexibility to subscribe/unsubscribe from topics without necessitating code redeployments. I just followed up with him about it, but a problem with opening gRPC directly to the actors is that it's not clear to me that, once hydrated, the actors would ever enter a dormant state and get recycled, because of that long-running connection. This rather favors the per-invocation approach used by timers.

The only thing you'd need is the mechanism for registering the subscription and, in C#, that might look very similar to any other Dapr method:

public sealed record DaprPubSubSink(string ActorType, string ActorMethod, string? ActorId = null)
{
    public override string ToString()
    {
        var value = $"{ActorType}/{ActorMethod}";
        if (ActorId is not null)
            value += $"/{ActorId}";
        return value;
    }
}

public abstract Task<string> RegisterActorPubSubSubscriptionAsync(string pubsubName, string topicName, DaprPubSubSink sink, string? subscriptionId = null, IReadOnlyList<string>? types = null, IReadOnlyList<DaprSubscriptionFilter>? filters = null, CancellationToken cancellationToken = default);
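
To illustrate the timer-like, per-invocation delivery described above, here is a small Python sketch. `OrderActor`, `on_order` and `invoke` are made-up names for this example and do not reflect any SDK's actual actor base class or invocation path.

```python
from typing import Any, List


class OrderActor:
    """Stand-in for an ordinary actor; nothing in it is PubSub-specific."""

    def __init__(self, actor_id: str) -> None:
        self.actor_id = actor_id
        self.received: List[Any] = []

    def on_order(self, payload: Any) -> None:
        # A plain actor method; it is called with the message payload.
        self.received.append(payload)


def invoke(actor: Any, method_name: str, payload: Any) -> None:
    """Call the named method on an actor with the payload, the same way
    a timer callback is resolved by name and invoked."""
    handler = getattr(actor, method_name, None)
    if not callable(handler):
        raise LookupError(f"{type(actor).__name__} has no method {method_name!r}")
    handler(payload)
```

Because each delivery is a single call, no long-running connection is held open to the actor, which is the property that lets it go dormant between messages.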

@JoshVanL JoshVanL mentioned this pull request Apr 14, 2025
any subscribers. The publisher does not know who is subscribing to the message(s) and it's very possible that there's
more than one subscriber across actor types and instances allowing more than one actor to react to the same message.

This does not seek to implement actor message passing or fire-and-forget asynchronous communication. While valuable

Is sending a message to an actor via pubsub not deemed asynchronous fire-and-forget? As far as the publisher is concerned it is, and it won't be blocked once the publish returns.

Author

What I was trying to get at in a larger sense is that this isn't seeking to enable a broad, developer-accessible fire and forget capability even if that's what is practically done in this circumstance.

// - not: The result of the filter expression is the inverse of the nested expression (e.g. if the expression evaluates as true, the result is false).
"filters": [
{"exact": {"type": "io.dapr.event.sent"}},
{"prefix": {"type": "io.dapr.event.", "subject": "<pubsub_topic"} }

Missing > "<pubsub_topic>"}

Author

I don't think so - the topic is conveyed in the config in line 155.
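
For readers unfamiliar with the filter dialects quoted in the hunk above, a minimal Python sketch of how `exact`, `prefix` and `not` could be evaluated against an event's attributes. The helper names are hypothetical, only these three dialects are handled, and the sketch assumes that every entry in `filters` must match and that every attribute listed inside one filter must match.

```python
from typing import Any, List, Mapping


def _evaluate(event: Mapping[str, Any], flt: Mapping[str, Any]) -> bool:
    # Each filter object is expected to hold exactly one dialect key.
    (dialect, arg), = flt.items()
    if dialect == "exact":
        return all(k in event and event[k] == v for k, v in arg.items())
    if dialect == "prefix":
        return all(k in event and str(event[k]).startswith(v) for k, v in arg.items())
    if dialect == "not":
        # Inverse of the nested filter expression.
        return not _evaluate(event, arg)
    raise ValueError(f"unsupported filter dialect: {dialect}")


def matches(event: Mapping[str, Any], filters: List[Mapping[str, Any]]) -> bool:
    """Return True only if the event satisfies every filter in the list."""
    return all(_evaluate(event, f) for f in filters)
```

Evaluating filters like this in the runtime, before any actor is touched, is what avoids activating actors for messages they would discard anyway.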

// The last segment, "/<actor_id>" is optional and should only be present for a subscription that is specific to an actor instance
"sink": "https://dapr.io/events/<actor_type>/<actor_method>/<actor_id>",
// Required: The identifier of the delivery protocol used by Dapr
"protocol": "HTTP"

Should this now be gRPC ?

Author

Debatable still on whether this is a long-running per-actor connection via gRPC or per-method HTTP invocation as triggered and I think that's up to @JoshVanL to determine based on runtime capability.
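
Whichever protocol is chosen, the sink format quoted in the hunk above can be taken apart mechanically. A Python sketch under that assumption follows; `Sink` and `parse_sink` are illustrative names only, and the host and `/events/` prefix are taken from the quoted example rather than from any finalized spec.

```python
from typing import NamedTuple, Optional
from urllib.parse import unquote, urlparse


class Sink(NamedTuple):
    actor_type: str
    actor_method: str
    actor_id: Optional[str]  # None when the subscription is not instance-specific


def parse_sink(sink: str) -> Sink:
    """Split "https://dapr.io/events/<actor_type>/<actor_method>[/<actor_id>]"."""
    parsed = urlparse(sink)
    segments = [unquote(s) for s in parsed.path.split("/") if s]
    if parsed.netloc != "dapr.io" or len(segments) not in (3, 4) or segments[0] != "events":
        raise ValueError(f"unrecognized actor sink: {sink!r}")
    actor_id = segments[3] if len(segments) == 4 else None
    return Sink(segments[1], segments[2], actor_id)
```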

Signed-off-by: Whit Waldo <[email protected]>
JoshVanL added a commit to JoshVanL/dapr that referenced this pull request May 1, 2025
This change is the first in a series of refactors to the subscription
engine. This work is in preparation for adding Actor Subscriptions as
detailed [here](dapr/proposals#78). In order to
implement this new subscription type, the engine needs a hierarchical
reordering of the internal data structures in flow to make adding the
new implementation sane and maintainable.

The existing implementation of subscription message processing and
routing to various sinks is disparate in the codebase and somewhat
spaghetti.

This first change introduces a new `Postman` interface that is used to
send messages to subscription sinks- HTTP, gRPC or streaming channels.
Actors would be another sink added in future. Shared types have been
moved to a new `pkg/runtime/subscription/todo` package.

Appreciate that there will be further changes made to this code path.

Signed-off-by: joshvanl <[email protected]>
dapr-bot added a commit to dapr/dapr that referenced this pull request Jun 3, 2025
* runtime: subscription engine refactor [1]

This change is the first in a series of refactors to the subscription
engine. This work is in preparation for adding Actor Subscriptions as
detailed [here](dapr/proposals#78). In order to
implement this new subscription type, the engine needs a hierarchical
reordering of the internal data structures in flow to make adding the
new implementation sane and maintainable.

The existing implementation of subscription message processing and
routing to various sinks is disparate in the codebase and somewhat
spaghetti.

This first change introduces a new `Postman` interface that is used to
send messages to subscription sinks- HTTP, gRPC or streaming channels.
Actors would be another sink added in future. Shared types have been
moved to a new `pkg/runtime/subscription/todo` package.

Appreciate that there will be further changes made to this code path.

Signed-off-by: joshvanl <[email protected]>

* Deliverer -> Deliver

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
Co-authored-by: Dapr Bot <[email protected]>
@WhitWaldo
Author

On the release call this week, we elected to push this back to the 1.17 release so it can get the development time it deserves and I agree with this decision.

@WhitWaldo WhitWaldo mentioned this pull request Jun 24, 2025
7 tasks
5 participants