actor: add new package for structured concurrency based on the Actor model #9820
Conversation
Used this recently during the PB Hackathon. It worked pretty well. Decomposing the pipeline into dedicated actors allowed for incremental implementation during the time crunch (24 hr hackathon, but we made our solution in ~6 hours of actual coding). Only one instance of each actor was present in the final solution, but we could easily scale out any of them. Here's a diagram describing the final architecture:

```mermaid
flowchart LR
    CLI(User/CLI) --> Engine(FuzzEngine)
    Engine -- "1. Starts" --> Controller(FuzzControllerActor)
    subgraph Phase1["Phase 1: Initial Generation"]
        Controller -- "2. Send Program" --> Writer(FuzzWriterActor)
        Writer -- "3. Request" --> ChatActor(ChatActor)
        ChatActor -- "4. Generate" --> Writer
        Writer -- "5. Provide Test" --> Controller
    end
    subgraph Phase2["Phase 2: Iterative Loop"]
        Controller -- "6. Execute" --> Executor(FuzzExecutorActor)
        Executor -- "7. Return Results" --> Controller
        Controller -- "8. Analyze" --> Optimizer(FuzzOptimizerActor)
        Optimizer -- "9. Request" --> ChatActor
        ChatActor -- "10. Improve" --> Optimizer
        Optimizer -- "11. Return" --> Controller
        Controller -- "12. Repeat" --> Executor
    end
    Controller -- "13. Finalize" --> Report(Final Report)
```
One thing missing (especially important for the distributed case) is any sort of active queue management. Right now we'll just block forever, or until the context gets cancelled. We could update this to drop the message down to a …
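As a minimal sketch of what a drop-on-full policy could look like, assuming the mailbox is backed by a buffered channel (`trySend` and `ErrMailboxFull` are hypothetical names, not part of this PR):

```go
package actor

import "errors"

// ErrMailboxFull is a hypothetical sentinel error for a dropped message.
var ErrMailboxFull = errors.New("actor: mailbox full, message dropped")

// trySend sketches a drop-on-full policy: instead of blocking until the
// context is cancelled, the send fails fast when the buffered mailbox
// channel has no free slots, letting the caller decide how to react.
func trySend[M any](mailbox chan<- M, msg M) error {
	select {
	case mailbox <- msg:
		return nil
	default:
		return ErrMailboxFull
	}
}
```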
In this commit, we add two new fundamental data structures: Future[T] and Promise[T]. A future is a response that might be ready at some point in the future. This is already a common pattern in Go; we just provide a type-safe wrapper around the typical operations: block w/ a timeout, add a callback for execution, pipeline the response to a new future. A promise is an intent to complete a future. Typically the caller receives the future, and the callee is able to complete the future using a promise.
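As a rough usage sketch of that caller/callee split (the `NewPromise`, `Complete`, `Future`, and `fn.Ok` spellings are assumptions; only `Await`, `OnComplete`, and the `fn.Result[T]` payload appear in the diffs below):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn"
)

// fetchHeight shows the split: the callee keeps the promise and
// completes it once the work is done, while the caller only ever sees
// the read-only future.
func fetchHeight() actor.Future[int32] {
	promise := actor.NewPromise[int32]()

	go func() {
		// Simulate slow work, then fulfill the promise.
		time.Sleep(10 * time.Millisecond)
		promise.Complete(fn.Ok(int32(800_000)))
	}()

	return promise.Future()
}

func main() {
	// Await blocks with a context-scoped timeout rather than a bare
	// channel receive that could hang forever.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	result := fetchHeight().Await(ctx)
	fmt.Println(result.IsErr())
}
```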
In this commit, we add the actual Actor implementation. We define a series of types and interfaces that, in concert, describe our actor. An actor has some ID, a reference (used to send messages to it), and also a set of defined messages that it'll accept. An actor can be implemented using a simple function if it's stateless. Otherwise, a struct can implement the Receive method, and handle its internal message passing and state that way.
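A sketch of the stateful flavor might look like the following (the exact `Receive` signature is an assumption on my part; `BaseMessage` and `fn.Result` come from the diffs below):

```go
package example

import (
	"context"

	"github.com/lightningnetwork/lnd/actor"
	"github.com/lightningnetwork/lnd/fn"
)

// PingMsg embeds BaseMessage so it satisfies the sealed Message
// interface (see the messageMarker discussion further down).
type PingMsg struct {
	actor.BaseMessage
	SeqNum uint64
}

// counter is a stateful actor: count is only ever touched from inside
// Receive, so no explicit locking is needed.
type counter struct {
	count uint64
}

// Receive processes one message at a time, updating local state and
// returning the reply used by the "ask" pattern.
func (c *counter) Receive(_ context.Context, msg PingMsg) fn.Result[uint64] {
	c.count++
	return fn.Ok(c.count)
}
```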
In this commit, we add the actor system (along with the receptionist) and the router. An actor can be registered with the system, which allows other callers to locate it and send messages to it via the receptionist. Custom routers can be created for when there are actors that rely on the same service key and also the same request/response types. This can be used to implement something similar to a worker pool.
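For instance, discovery through the receptionist could look roughly like this, reusing the `PingMsg` type from the sketch above (the `FindInReceptionist` signature matches the diff below; `NewServiceKey` and `Tell` are assumed spellings):

```go
// locateAndPing finds every actor registered under the shared service
// key and fires a message at each one, without ever holding a direct
// pointer to the concrete actor implementations.
func locateAndPing(ctx context.Context, r *actor.Receptionist) {
	key := actor.NewServiceKey[PingMsg, uint64]("counter-service")

	for _, ref := range actor.FindInReceptionist(r, key) {
		// Tell is fire-and-forget: no reply future is created.
		ref.Tell(ctx, PingMsg{SeqNum: 1})
	}
}
```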
In this commit, we add a series of examples that show how the package can be used in the wild. They can be run as normal Example tests.
In this commit, we add a README which serves as a general introduction to the package, as well as the motivation behind it. It serves as a manual for developers who may wish to interact with the package.
/gemini review
Code Review
This pull request introduces a new actor package, which provides a well-designed and robust implementation of the Actor model for structured concurrency in Go. The code is of high quality, well-documented, and thoroughly tested. The abstractions like Actor, ActorRef, Future, and ActorSystem are clear and follow modern Go practices with generics. My review found a couple of minor issues: one in the documentation and a potential panic in the actor system logic under very specific, though unlikely, conditions. Overall, this is an excellent addition to the codebase.
```go
// Build a new slice containing only the references that are not the one
// to be removed.
newRefs := make([]any, 0, len(refs)-1) // Pre-allocate assuming one removal
```
There's a potential for a panic here. If len(refs) is 0, len(refs)-1 will be -1, causing make to panic. While the current logic seems to prevent refs from being an empty slice, this assumption is brittle and could be broken by future changes. Using max(0, ...) would make this more robust.
```diff
-newRefs := make([]any, 0, len(refs)-1) // Pre-allocate assuming one removal
+newRefs := make([]any, 0, max(0, len(refs)-1)) // Pre-allocate assuming one removal
```
```
Concurrency is managed by the actor system, allowing many actors to
This model inherently promotes loose coupling, as actors do not share state
execute concurrently without explicit lock management by the developer for actor
state.
```
This sentence appears to be garbled, likely due to a copy-paste or merge error. It seems to combine two separate ideas. For clarity, it should be split into two distinct sentences.
```diff
-Concurrency is managed by the actor system, allowing many actors to
-This model inherently promotes loose coupling, as actors do not share state
-execute concurrently without explicit lock management by the developer for actor
-state.
+Concurrency is managed by the actor system, allowing many actors to
+execute concurrently without explicit lock management by the developer for
+actor state. This model inherently promotes loose coupling, as actors do not
+share state.
```
+1
I think the sentence is unfinished.
ziggie1984 left a comment:
Looks superb 🤩,
First pass done, implementation is super clean.
Will do a second pass - but I think we can land this quite fast.
```diff
@@ -0,0 +1,17 @@
+module github.com/lightningnetwork/lnd/actor
+
+go 1.23.6
```
Nit: Bump to 1.24.9
```go
	close(p.fut.done)
	success = true
})
return success
```
Nit: new line before return.
```go
// executed in a new goroutine, so it does not block the completion path of the
// original future.
func (f *futureImpl[T]) OnComplete(ctx context.Context, cFunc func(fn.Result[T])) {
	go func() {
```
Does it make sense to add a waitgroup to properly track these goroutines (same in ThenApply)? Or is the context enough as a guard to not have hanging goroutines?
What would listen on the wait group being done?
```go
result1 := fut1.Await(ctx1)
if !result1.IsErr() ||
	!errors.Is(result1.Err(), context.Canceled) {
	t.Fatalf("await with immediate cancel: expected "+
```
Use require.ErrorIs() ... + newline above.
use require instead of fatal in all cases here.
```go
// messageMarker implements the unexported method for the Message interface,
// allowing types that embed BaseMessage to satisfy the Message interface.
func (BaseMessage) messageMarker() {}
```
What is messageMarker good for, and should it normally be implemented from outside the package?
Yeah basically it "seals" the interface/struct so you can only make new instances in this package. In practice, it's used to mark a struct as an actor message by embedding this interface/struct in it.
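In isolation, the sealing pattern is just this (a distilled sketch; `Message` and `BaseMessage` mirror the package's own types, while the close message is hypothetical):

```go
// Message can only be satisfied by types in this package, or by types
// that embed BaseMessage, because messageMarker is unexported.
type Message interface {
	messageMarker()
}

// BaseMessage is the embeddable opt-in.
type BaseMessage struct{}

func (BaseMessage) messageMarker() {}

// Downstream code opts in by embedding, never by implementing the
// unexported method directly:
type CloseChannelMsg struct {
	BaseMessage
	ChanPoint string
}
```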
```go
	mu sync.RWMutex

	// ctx is the main context for the actor system.
	ctx context.Context
```
Would it make sense to have a method to return the context of the system? So it could maybe be used for external func calls?
Do you have an example of that? I've used a copy of this in a few other projects since this PR was put up and haven't had such a need yet myself.
```go
// returned, providing type safety.
func FindInReceptionist[M Message, R any](
	r *Receptionist, key ServiceKey[M, R]) []ActorRef[M, R] {
	r.mu.RLock()
```
New line above + function gofmt
Abdulkbk left a comment:
This looks really good and the examples provided are very helpful. Reviewed lightly to understand how it could fit in #10089.
```go
// messages to the actor using only the "tell" pattern (fire-and-forget),
// without having access to "ask" capabilities.
func (a *Actor[M, R]) TellRef() TellOnlyRef[M] {
	return a.ref
```
I think a.ref returns the ActorRef which has access to Ask, contrary to what the godoc says.
Yeah that's true, but since we return the TellOnlyRef interface, a caller would need to do a type assertion (that might fail) to access Ask.
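To make the narrowing concrete (a sketch only, reusing `PingMsg` from above; the real `Tell`/`Ask` signatures may differ):

```go
func demoNarrowing(ctx context.Context, a *actor.Actor[PingMsg, uint64]) {
	// The static type here is TellOnlyRef[PingMsg]: Ask simply
	// isn't in the method set the caller can see.
	ref := a.TellRef()
	ref.Tell(ctx, PingMsg{SeqNum: 2})

	// Recovering ask capabilities requires an explicit, and
	// fallible, type assertion back to the wider interface.
	if full, ok := ref.(actor.ActorRef[PingMsg, uint64]); ok {
		_ = full.Ask(ctx, PingMsg{SeqNum: 3})
	}
}
```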
```go
// DefaultConfig returns a default configuration for the ActorSystem.
func DefaultConfig() SystemConfig {
	return SystemConfig{
		MailboxCapacity: 100,
```
Would it make sense to explain what 100 means in terms of resources? For example, in places where we do caching, we usually explain the memory requirements. That way, users know what tweaking the default actually means.
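For context, tweaking the default might look like this (the `NewActorSystem` constructor name is an assumption; if the mailbox is a buffered channel, capacity N pre-allocates N message slots per actor, so deeper queues trade memory for burst absorption):

```go
// newTunedSystem bumps the mailbox depth from the default of 100.
// Rough cost model: each actor's mailbox pre-allocates capacity slots
// of its message envelope type.
func newTunedSystem() *actor.ActorSystem {
	cfg := actor.DefaultConfig()
	cfg.MailboxCapacity = 1_000

	return actor.NewActorSystem(cfg)
}
```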
In this PR, we add a new package that implements structured concurrency patterns using the actor model. The README should give a good overview of how the API works, and the major abstractions at play. I'll focus this PR body on the motivation behind introducing such a model, and some of my goals.
Motivation
While working on the new RBF-based channel close, I ran into a bug when I went to test the restart scenario.
Typically, the RPC server will contact the switch to do a coop close. The switch holds the link, which has a callback passed in to trigger a coop close via the peer. At the end of this series of calls, we create a new chan closer which uses the underlying channel, and also the new RBF state machine, to drive the coop close process:
lnd/rpcserver.go
Lines 2811 to 2831 in 3707b1f
Unlike the existing chan closer, after a restart, the rbf chan closer is free to trigger additional fee updates. However, as we don't load non-active channels back into the switch, the RPC server wasn't able to rely on the switch to send the messages it needed.
To get around this, I had to add 3 new methods to do the pointer chasing of: rpcServer -> server -> peer -> active close map -> rbf closer:
lnd/server.go
Lines 5521 to 5548 in 3707b1f
lnd/server.go
Lines 5482 to 5519 in 3707b1f
lnd/peer/brontide.go
Lines 5355 to 5392 in 3707b1f
Across the codebase, this is a common pattern wherein we'll go from the `rpcServer` to the `server`, which is effectively a "god struct" that contains pointers to all the other sub-systems that we may need to access. In the case of the version before this PR, as the life cycle of some sub-systems shifted, we had to unravel a few abstractions to be able to send a message to the sub-system at play. This requires quite a bit of knowledge on behalf of the RPC server: it needs to know which sub-systems manage others, their life cycles, how they're created, the methods to call, etc, etc.

The Actor Solution
As we'll see in a follow up PR, the new `actor` package lets the peer expose a new actor that can handle the RPC-specific logic for the close bump process. So instead of doing this pointer chasing thru the god struct, which adds a degree of tight coupling, the rpcserver just needs to create the `ServiceKey` that's advertised at the package level. It can then use this to send a message directly to the new rbf close actor. This achieves a greater degree of loose coupling, and the abstractions lend themselves well to experimentation and composition of various actors.
Broadly in the codebase, we already implement message passing between event loops managed by goroutines where the state is a local variable. This package codifies that pattern, and creates a more standardized way of allowing these event loops to interact with each other. It also provides more flexibility, as the sub-system boundaries are based on messages instead of methods.
Future[T]

IMO the `Future[T]` abstraction added in this PR is also a very nice abstraction that wraps the typical send/recv timeout behavior we have elsewhere across the codebase. Instead of directly returning the channel (which allows for anti-patterns such as blocking forever w/ no context), we can return a `Future[T]` in its place.
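A before/after sketch of that point (`subsystem` and `Response` are hypothetical placeholders, and the promise constructor names are assumptions as above):

```go
// Response and subsystem are stand-ins for any event-loop-based
// sub-system in the codebase.
type Response struct{}

type subsystem struct{}

// Before: returning a bare channel invites the anti-pattern above,
// since nothing stops a caller from doing <-respChan with no deadline
// and blocking forever.
func (s *subsystem) queryLegacy() <-chan Response {
	respChan := make(chan Response, 1)
	go func() { respChan <- Response{} }()

	return respChan
}

// After: every await site must supply a context, so a timeout or
// cancellation is always in scope.
func (s *subsystem) query() actor.Future[Response] {
	promise := actor.NewPromise[Response]()
	go func() { promise.Complete(fn.Ok(Response{})) }()

	return promise.Future()
}
```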