
Conversation

@Roasbeef
Member

@Roasbeef Roasbeef commented May 17, 2025

In this PR, we add a new package that implements structured concurrency patterns using the actor model. The README should give a good overview of how the API works, and the major abstractions at play. I'll focus this PR body on the motivation behind introducing such a model, and some of my goals.

Motivation

While working on the new RBF-based channel close, I ran into a bug when I went to test the restart scenario.

Typically, the RPC server will contact the switch to do a coop close. The switch holds the link, which has a callback passed in to trigger a coop close via the peer. At the end of this series of calls, we create a new chan closer which uses the underlying channel, and also the new RBF state machine to drive the coop close process:

lnd/rpcserver.go

Lines 2811 to 2831 in 3707b1f

// If the link is not known by the switch, we cannot gracefully close
// the channel.
channelID := lnwire.NewChanIDFromOutPoint(*chanPoint)
if _, err := r.server.htlcSwitch.GetLink(channelID); err != nil {
	chanInSwitch = false

	// The channel isn't in the switch, but if there's an
	// active chan closer for the channel, and it's of the
	// RBF variant, then we can actually bypass the switch.
	// Otherwise, we'll return an error.
	if !chanHasRbfCloser {
		rpcsLog.Debugf("Trying to non-force close "+
			"offline channel with chan_point=%v",
			chanPoint)

		return fmt.Errorf("unable to gracefully close "+
			"channel while peer is offline (try "+
			"force closing it instead): %v", err)
	}
}

Unlike the existing chan closer, after a restart, the rbf chan closer is free to trigger additional fee updates. However, as we don't load inactive channels back into the switch, the RPC server wasn't able to rely on the switch to send the messages it needed.

To get around this, I had to add 3 new methods to do the pointer chasing of rpcServer -> server -> peer -> active close map -> rbf closer:

lnd/server.go

Lines 5521 to 5548 in 3707b1f

// AttemptRBFCloseUpdate attempts to trigger a new RBF iteration for a co-op
// close update. This route is to be used only if the target channel in
// question is no longer active in the link. This can happen when we restart
// while we already have done a single RBF co-op close iteration.
func (s *server) AttemptRBFCloseUpdate(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {

	// If the channel is present in the switch, then the request should
	// flow through the switch instead.
	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
	if _, err := s.htlcSwitch.GetLink(chanID); err == nil {
		return nil, fmt.Errorf("ChannelPoint(%v) is active in link, "+
			"invalid request", chanPoint)
	}

	// At this point, we know that the channel isn't present in the link,
	// so we'll check to see if we have an entry in the active chan closer
	// map.
	updates, err := s.attemptCoopRbfFeeBump(
		ctx, chanPoint, feeRate, deliveryScript,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to attempt coop rbf fee bump "+
			"ChannelPoint(%v)", chanPoint)
	}

	return updates, nil
}

lnd/server.go

Lines 5482 to 5519 in 3707b1f

// attemptCoopRbfFeeBump attempts to look up the active chan closer for a
// channel given the outpoint. If found, we'll attempt to do a fee bump,
// returning channels used for updates. If the channel isn't currently active
// (p2p connection established), then this function will return an error.
func (s *server) attemptCoopRbfFeeBump(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*peer.CoopCloseUpdates, error) {

	// First, we'll attempt to look up the channel based on its
	// ChannelPoint.
	channel, err := s.chanStateDB.FetchChannel(chanPoint)
	if err != nil {
		return nil, fmt.Errorf("unable to fetch channel: %w", err)
	}

	// From the channel, we can now get the pubkey of the peer, then use
	// that to eventually get the chan closer.
	peerPub := channel.IdentityPub.SerializeCompressed()

	// Now that we have the peer pub, we can look up the peer itself.
	s.mu.RLock()
	targetPeer, ok := s.peersByPub[string(peerPub)]
	s.mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("peer for ChannelPoint(%v) is "+
			"not online", chanPoint)
	}

	closeUpdates, err := targetPeer.TriggerCoopCloseRbfBump(
		ctx, chanPoint, feeRate, deliveryScript,
	)
	if err != nil {
		return nil, fmt.Errorf("unable to trigger coop rbf fee bump: "+
			"%w", err)
	}

	return closeUpdates, nil
}

lnd/peer/brontide.go

Lines 5355 to 5392 in 3707b1f

// TriggerCoopCloseRbfBump given a chan ID, and the params needed to trigger a
// new RBF co-op close update, a bump is attempted. A channel used for updates,
// along with one used to communicate any errors, is returned. If no chan
// closer is found, then false is returned for the second argument.
func (p *Brontide) TriggerCoopCloseRbfBump(ctx context.Context,
	chanPoint wire.OutPoint, feeRate chainfee.SatPerKWeight,
	deliveryScript lnwire.DeliveryAddress) (*CoopCloseUpdates, error) {

	// If RBF coop close isn't permitted, then we'll return an error.
	if !p.rbfCoopCloseAllowed() {
		return nil, fmt.Errorf("rbf coop close not enabled for " +
			"channel")
	}

	closeUpdates := &CoopCloseUpdates{
		UpdateChan: make(chan interface{}, 1),
		ErrChan:    make(chan error, 1),
	}

	// We'll re-use the existing switch struct here, even though we're
	// bypassing the switch entirely.
	closeReq := htlcswitch.ChanClose{
		CloseType:      contractcourt.CloseRegular,
		ChanPoint:      &chanPoint,
		TargetFeePerKw: feeRate,
		DeliveryScript: deliveryScript,
		Updates:        closeUpdates.UpdateChan,
		Err:            closeUpdates.ErrChan,
		Ctx:            ctx,
	}

	err := p.startRbfChanCloser(newRPCShutdownInit(&closeReq), chanPoint)
	if err != nil {
		return nil, err
	}

	return closeUpdates, nil
}


Across the codebase, this is a common pattern: we go from the rpcServer to the server, which is effectively a "god struct" holding pointers to all the other sub-systems we may need to access. In the case above, as the life cycle of some sub-systems shifted, we had to unravel a few abstractions just to be able to send a message to the sub-system at play. This requires quite a bit of knowledge on the part of the RPC server: it needs to know which sub-systems manage others, their life cycles, how they're created, the methods to call, and so on.

The Actor Solution

As we'll see in a follow-up PR, the new actor package lets the peer expose a new actor that handles the RPC-specific logic for the close bump process. So instead of doing this pointer chasing through the god struct, which adds a degree of tight coupling, the rpcserver just needs the ServiceKey advertised at the package level. It can then use that key to send a message directly to the new rbf close actor.

This achieves a greater degree of loose coupling, and the abstractions lend themselves well to experimentation and composition of various actors.

Broadly in the codebase, we already implement message passing between event loops managed by goroutines where the state is a local variable. This package codifies that pattern, and creates a more standardized way of allowing these event loops to interact with each other. It also provides more flexibility, as the sub-system boundaries are based on messages instead of methods.
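
To make this concrete, here is a rough caller-side sketch of the close-bump flow described above. This is a hedged illustration only: the rbfBumpKey service key, the RbfBumpReq/RbfBumpResp message types, and the exact Receptionist/Ask signatures are assumptions made for this example, not necessarily the API as it lands in this PR.

// Hypothetical service key the peer package could advertise for the RBF
// close actor (the name and constructor here are assumptions).
var rbfBumpKey = actor.NewServiceKey[RbfBumpReq, RbfBumpResp]("rbf-close")

// The rpcserver locates the actor via the receptionist instead of chasing
// pointers through the server struct.
refs := actor.FindInReceptionist(receptionist, rbfBumpKey)
if len(refs) == 0 {
	return fmt.Errorf("no rbf close actor registered")
}

// Ask sends the request and hands back a Future for the response; the
// Future[T] type is described in the next section.
fut := refs[0].Ask(ctx, RbfBumpReq{
	ChanPoint: chanPoint,
	FeeRate:   feeRate,
})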

Future[T]

IMO the Future[T] type added in this PR is also a very nice abstraction: it wraps the typical send/recv timeout behavior we have elsewhere across the codebase. Instead of directly returning the channel (which allows for anti-patterns such as blocking forever with no context), we can return a Future[T] in its place.
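
As a minimal hedged sketch of the consumer side (only Await and the fn.Result accessors below appear in the diff; the surrounding variables are assumed):

// Bound how long we're willing to wait, rather than blocking forever on a
// bare channel receive.
ctxt, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

// Await blocks until the future is completed or the context is done, and
// returns an fn.Result[T] either way.
res := fut.Await(ctxt)
if res.IsErr() {
	// For example, context.DeadlineExceeded if the callee never
	// responded in time.
	return res.Err()
}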

@Roasbeef Roasbeef added the brainstorming, code health, and architecture labels May 17, 2025
@coderabbitai
Contributor

coderabbitai bot commented May 17, 2025

Important

Review skipped

Auto reviews are limited to specific labels.

🏷️ Labels to auto review (1)
  • llm-review

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

@github-actions

Pull reviewers stats

Stats of the last 30 days for lnd:

User               Total reviews   Time to review   Total comments
guggero 🥇         24              23h 49m          47
ziggie1984 🥈      13              12h 35m          35
bhandras 🥉        11              4h 34m           12
yyforyongyu        10              1d 3h 57m        16
Roasbeef           7               9h 39m           4
ellemouton         5               1d 6h 18m        5
bitromortac        5               1h 41m           6
morehouse          3               1d 1h 19m        3
ffranr             2               18m              0
mohamedawnallah    2               6d 14h 50m       11
NishantBansal2003  2               5d 15h 32m       0
sputn1ck           1               23h 39m          2
GeorgeTsagk        1               3d 36m           0
saubyk             1               20h 37m          0
MPins              1               8d 14h 1m        3

@Roasbeef
Member Author

Roasbeef commented May 20, 2025

Used this recently during the PB Hackathon. It worked pretty well. Decomposing the pipeline into dedicated actors allowed for incremental implementation during the time crunch (24 hr hackathon, but we built our solution in ~6 hours of actual coding). Only one instance of each actor was present in the final solution, but we could easily scale out any of them (in particular the ChatActor and OptimizerActor) to increase parallelism. If we were to modify the FuzzExecutorActor to instead run using something like managed docker containers, the messages stay the same, so no updates are needed elsewhere.

Here's a diagram describing the final architecture:

flowchart LR
    CLI(User/CLI) --> Engine(FuzzEngine)
    Engine -- "1\. Starts" --> Controller(FuzzControllerActor)
    
    subgraph Phase1["Phase 1: Initial Generation"]
        Controller -- "2\. Send Program" --> Writer(FuzzWriterActor)
        Writer -- "3\. Request" --> ChatActor(ChatActor)
        ChatActor -- "4\. Generate" --> Writer
        Writer -- "5\. Provide Test" --> Controller
    end
    
    subgraph Phase2["Phase 2: Iterative Loop"]
        Controller -- "6\. Execute" --> Executor(FuzzExecutorActor)
        Executor -- "7\. Return Results" --> Controller
        Controller -- "8\. Analyze" --> Optimizer(FuzzOptimizerActor)
        Optimizer -- "9\. Request" --> ChatActor
        ChatActor -- "10\. Improve" --> Optimizer
        Optimizer -- "11\. Return" --> Controller
        Controller -- "12\. Repeat" --> Executor
    end
    
    Controller -- "13\. Finalize" --> Report(Final Report)

One thing missing (especially important for the distributed case) is any sort of active queue management. Right now we'll just block forever, or until the context gets cancelled. We could update this to drop the message down to a RetryActor that'll try to deliver the message until restart. Of course, we can also just rely on the caller to handle that themselves.

In this commit, we add two new fundamental data structures: Future[T]
and Promise[T].

A future is a response that might be ready at some point in the future.
This is already a common pattern in Go; we just make a type-safe wrapper
around the typical operations: block with a timeout, add a callback for
execution, pipeline the response to a new future.

A promise is an intent to complete a future. Typically the caller
receives the future, and the callee is able to complete the future using
a promise.
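
A hedged sketch of how the two halves could fit together (NewPromise, Future, and Complete are assumed names for this illustration; OnComplete and Await are the operations described above):

// The callee holds the promise and completes it once the work is done;
// the caller only ever sees the read-only future.
promise := actor.NewPromise[int]() // assumed constructor name
fut := promise.Future()            // assumed accessor name

go func() {
	// ... perform the work ...
	promise.Complete(fn.Ok(42)) // assumed completion method
}()

// The caller can block with Await, or register a callback that runs in its
// own goroutine once the future resolves.
fut.OnComplete(ctx, func(res fn.Result[int]) {
	if !res.IsErr() {
		log.Printf("result: %d", res.UnwrapOr(0))
	}
})
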
In this commit, we add the actual Actor implementation. We define a
series of types and interfaces that, in concert, describe our actor. An
actor has some ID, a reference (used to send messages to it), and also a
set of defined messages that it'll accept.

An actor can be implemented using a simple function if it's stateless.
Otherwise, a struct can implement the Receive method, and handle its
internal message passing and state that way.
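
For instance, a stateful actor might look roughly like this (the exact Receive signature is an assumption based on the description above):

// CounterMsg embeds BaseMessage so it satisfies the package's Message
// interface and can be accepted by an actor.
type CounterMsg struct {
	actor.BaseMessage
	Delta int
}

// CounterResp is the response type paired with CounterMsg.
type CounterResp struct {
	Total int
}

// counterActor keeps its state in plain fields: messages are processed one
// at a time from the mailbox, so no mutex is needed.
type counterActor struct {
	total int
}

func (c *counterActor) Receive(ctx context.Context,
	msg CounterMsg) fn.Result[CounterResp] {

	c.total += msg.Delta
	return fn.Ok(CounterResp{Total: c.total})
}
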
In this commit, we add the actor system (along with the receptionist)
and the router.

An actor can be registered with the system, which allows other callers
to locate it and send messages to it via the receptionist. Custom routers
can be created for cases where several actors share the same service key
and req+resp types. This can be used to implement something similar to a
worker pool.
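
A hedged sketch of that worker-pool idea (the service key constructor, the registration step, and the Tell signature are assumed; FindInReceptionist is the generic lookup added here):

// Several workers register under the same hypothetical service key; the
// receptionist then returns all of their refs.
workKey := actor.NewServiceKey[WorkReq, WorkResp]("workers")
refs := actor.FindInReceptionist(receptionist, workKey)

// A caller (or a custom router) can spread fire-and-forget messages across
// the workers, round-robin style.
for i, req := range pendingWork {
	refs[i%len(refs)].Tell(ctx, req)
}
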
In this commit, we add a series of examples that show how the package
can be used in the wild. They can be run as normal Example tests.

In this commit, we add a README which serves as a general introduction
to the package, along with its motivation. It serves as a manual for
developers that may wish to interact with the package.
@saubyk saubyk added this to v0.21 Nov 6, 2025
@saubyk saubyk added this to the v0.21.0 milestone Nov 6, 2025
@saubyk saubyk moved this to In review in v0.21 Nov 6, 2025
@ziggie1984
Collaborator

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a new actor package, which provides a well-designed and robust implementation of the Actor model for structured concurrency in Go. The code is of high quality, well-documented, and thoroughly tested. The abstractions like Actor, ActorRef, Future, and ActorSystem are clear and follow modern Go practices with generics. My review found a couple of minor issues: one in the documentation and a potential panic in the actor system logic under very specific, though unlikely, conditions. Overall, this is an excellent addition to the codebase.


// Build a new slice containing only the references that are not the one
// to be removed.
newRefs := make([]any, 0, len(refs)-1) // Pre-allocate assuming one removal


high

There's a potential for a panic here. If len(refs) is 0, len(refs)-1 will be -1, causing make to panic. While the current logic seems to prevent refs from being an empty slice, this assumption is brittle and could be broken by future changes. Using max(0, ...) would make this more robust.

Suggested change
newRefs := make([]any, 0, len(refs)-1) // Pre-allocate assuming one removal
newRefs := make([]any, 0, max(0, len(refs)-1)) // Pre-allocate assuming one removal

Comment on lines +24 to +27
Concurrency is managed by the actor system, allowing many actors to
This model inherently promotes loose coupling, as actors do not share state
execute concurrently without explicit lock management by the developer for actor
state.


medium

This sentence appears to be garbled, likely due to a copy-paste or merge error. It seems to combine two separate ideas. For clarity, it should be split into two distinct sentences.

Suggested change
Concurrency is managed by the actor system, allowing many actors to
This model inherently promotes loose coupling, as actors do not share state
execute concurrently without explicit lock management by the developer for actor
state.
Concurrency is managed by the actor system, allowing many actors to
execute concurrently without explicit lock management by the developer for
actor state. This model inherently promotes loose coupling, as actors do not
share state.

Collaborator

+1
I think the sentence is unfinished.

Collaborator

@ziggie1984 ziggie1984 left a comment

Looks superb 🤩,

First pass done, implementation is super clean.

Will do a second pass - but I think we can land this quite fast.

@@ -0,0 +1,17 @@
module github.com/lightningnetwork/lnd/actor

go 1.23.6
Collaborator

Nit: Bump to 1.24.9

close(p.fut.done)
success = true
})
return success
Collaborator

Nit: new line before return.

// executed in a new goroutine, so it does not block the completion path of the
// original future.
func (f *futureImpl[T]) OnComplete(ctx context.Context, cFunc func(fn.Result[T])) {
go func() {
Collaborator

does it make sense to add a waitgroup to properly track these goroutines (same in ThenApply)? Or is the context enough as a guard to not have hanging goroutines?

Member Author

What would listen on the wait group done?

result1 := fut1.Await(ctx1)
if !result1.IsErr() ||
!errors.Is(result1.Err(), context.Canceled) {
t.Fatalf("await with immediate cancel: expected "+
Collaborator

use require.ErrorIs() .... + Newline above

Collaborator

use require instead of fatal in all cases here.


// messageMarker implements the unexported method for the Message interface,
// allowing types that embed BaseMessage to satisfy the Message interface.
func (BaseMessage) messageMarker() {}
Collaborator

what is the messageMarker good for and should it normally be implemented from the outside system ?

Member Author

Yeah basically it "seals" the interface/struct so you can only make new instances in this package.

In practice, you mark a struct as an actor message by embedding this interface/struct in it.

mu sync.RWMutex

// ctx is the main context for the actor system.
ctx context.Context
Collaborator

would it make sense to have a method to return the context of the system ? So it can be used maybe for external func calls ?

Member Author

Do you have an example of that? I've used a copy of this in a few other projects since this PR was put up and haven't had such a need yet myself.

// returned, providing type safety.
func FindInReceptionist[M Message, R any](
r *Receptionist, key ServiceKey[M, R]) []ActorRef[M, R] {
r.mu.RLock()
Collaborator

New line above + function gofmt

@gijswijs gijswijs modified the milestones: v0.21.0, v0.20.0 Nov 12, 2025
Contributor

@Abdulkbk Abdulkbk left a comment

This looks really good and the examples provided are very helpful. Reviewed lightly to understand how it could fit in #10089

// messages to the actor using only the "tell" pattern (fire-and-forget),
// without having access to "ask" capabilities.
func (a *Actor[M, R]) TellRef() TellOnlyRef[M] {
return a.ref
Contributor

I think a.ref returns the ActorRef which has access to Ask, contrary to what the godoc says.

Member Author

Yeah that's true, but as we return the TellOnlyRef interface, a caller would need to do a type assert (that might fail) to access the Ask.

// DefaultConfig returns a default configuration for the ActorSystem.
func DefaultConfig() SystemConfig {
return SystemConfig{
MailboxCapacity: 100,
Contributor

Would it make sense to explain what 100 means in terms of resources? For example, in places where we do caching, we usually explain the memory requirements. That way, users know what tweaking the default actually means.

@lightninglabs-deploy

@gijswijs: review reminder
@Roasbeef, remember to re-request review from reviewers when ready
