Skip to content

Commit

Permalink
Expose node component management
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik committed Dec 11, 2024
1 parent 4c8ac17 commit 8b0dff9
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 87 deletions.
64 changes: 50 additions & 14 deletions cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,53 @@ type Node interface {
// The Run function starts all the components, and is blocked until either a termination
// signal is received or a irrecoverable error is encountered.
type FlowNodeImp struct {
component.Component
NodeImp
*NodeConfig
}

// NodeImp can be used to create a node instance from:
// - a logger: to be used during startup and shutdown
// - a component: that will be started with Run
// - a cleanup function: that will be called after the component has been stopped
// - a fatal error handler: to handle any error received from the component
type NodeImp struct {
component.Component
logger zerolog.Logger
postShutdown func() error
fatalHandler func(error)
}

// NewNode returns a new node instance
func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logger, cleanup func() error, handleFatal func(error)) Node {
func NewNode(
component component.Component,
cfg *NodeConfig,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) Node {
return &FlowNodeImp{
NodeConfig: cfg,
NodeImp: NewBaseNode(
component,
logger.With().
Str("node_role", cfg.BaseConfig.NodeRole).
Hex("spork_id", logging.ID(cfg.SporkID)).
Logger(),
cleanup,
handleFatal,
),
}
}

// NewBaseNode returns a new base node instance
func NewBaseNode(
component component.Component,
logger zerolog.Logger,
cleanup func() error,
handleFatal func(error),
) NodeImp {
return NodeImp{
Component: component,
NodeConfig: cfg,
logger: logger,
postShutdown: cleanup,
fatalHandler: handleFatal,
Expand All @@ -51,13 +86,11 @@ func NewNode(component component.Component, cfg *NodeConfig, logger zerolog.Logg
// which point it gracefully shuts down.
// Any unhandled irrecoverable errors thrown in child components will propagate up to here and
// result in a fatal error.
func (node *FlowNodeImp) Run() {
// Cancelling this context notifies all child components that it's time to shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func (node *NodeImp) Run() {
ctx := context.Background()

// Block until node is shutting down
err := node.run(ctx, cancel)
err := node.run(ctx)

// Any error received is considered fatal.
if err != nil {
Expand All @@ -73,14 +106,18 @@ func (node *FlowNodeImp) Run() {
node.logger.Error().Err(err).Msg("error encountered during cleanup")
}

node.logger.Info().Msgf("%s node shutdown complete", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutdown complete")
}

// run starts the node and blocks until a SIGINT/SIGTERM is received or an error is encountered.
// It returns:
// - nil if a termination signal is received, and all components have been gracefully stopped.
// - error if a irrecoverable error is received
func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) error {
// - error if an irrecoverable error is received
func (node *NodeImp) run(ctx context.Context) error {
// Cancelling this context notifies all child components that it's time to shut down
ctx, shutdown := context.WithCancel(ctx)
defer shutdown()

// Components will pass unhandled irrecoverable errors to this channel via signalerCtx (or a
// child context). Any errors received on this channel should halt the node.
signalerCtx, errChan := irrecoverable.WithSignaler(ctx)
Expand All @@ -97,8 +134,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e
select {
case <-node.Ready():
node.logger.Info().
Hex("spork_id", logging.ID(node.SporkID)).
Msgf("%s node startup complete", node.BaseConfig.NodeRole)
Msg("node startup complete")
case <-ctx.Done():
}
}()
Expand All @@ -118,7 +154,7 @@ func (node *FlowNodeImp) run(ctx context.Context, shutdown context.CancelFunc) e

// 3: Shut down
// Send shutdown signal to components
node.logger.Info().Msgf("%s node shutting down", node.BaseConfig.NodeRole)
node.logger.Info().Msg("node shutting down")
shutdown()

// Block here until all components have stopped or an irrecoverable error is received.
Expand Down
17 changes: 10 additions & 7 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ import (
const NotSet = "not set"

type BuilderFunc func(nodeConfig *NodeConfig) error
type ReadyDoneFactory func(node *NodeConfig) (module.ReadyDoneAware, error)

// ReadyDoneFactory is a function that returns a ReadyDoneAware component or an error if
// the factory cannot create the component
type ReadyDoneFactory[Input any] func(input Input) (module.ReadyDoneAware, error)

// NodeBuilder declares the initialization methods needed to bootstrap up a Flow node
type NodeBuilder interface {
Expand Down Expand Up @@ -73,7 +76,7 @@ type NodeBuilder interface {
// The ReadyDoneFactory may return either a `Component` or `ReadyDoneAware` instance.
// In both cases, the object is started according to its interface when the node is run,
// and the node will wait for the component to exit gracefully.
Component(name string, f ReadyDoneFactory) NodeBuilder
Component(name string, f ReadyDoneFactory[*NodeConfig]) NodeBuilder

// DependableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface. The builder will wait until all of the components in the dependencies list are ready
Expand All @@ -86,15 +89,15 @@ type NodeBuilder interface {
// IMPORTANT: Dependable components are started in parallel with no guaranteed run order, so all
// dependencies must be initialized outside of the ReadyDoneFactory, and their `Ready()` method
// MUST be idempotent.
DependableComponent(name string, f ReadyDoneFactory, dependencies *DependencyList) NodeBuilder
DependableComponent(name string, f ReadyDoneFactory[*NodeConfig], dependencies *DependencyList) NodeBuilder

// RestartableComponent adds a new component to the node that conforms to the ReadyDoneAware
// interface, and calls the provided error handler when an irrecoverable error is encountered.
// Use RestartableComponent if the component is not critical to the node's safe operation and
// can/should be independently restarted when an irrecoverable error is encountered.
//
// Any irrecoverable errors thrown by the component will be passed to the provided error handler.
RestartableComponent(name string, f ReadyDoneFactory, errorHandler component.OnError) NodeBuilder
RestartableComponent(name string, f ReadyDoneFactory[*NodeConfig], errorHandler component.OnError) NodeBuilder

// ShutdownFunc adds a callback function that is called after all components have exited.
// All shutdown functions are called regardless of errors returned by previous callbacks. Any
Expand Down Expand Up @@ -299,16 +302,16 @@ func DefaultBaseConfig() *BaseConfig {
// DependencyList is a slice of ReadyDoneAware implementations that are used by DependableComponent
// to define the list of dependencies that must be ready before starting the component.
type DependencyList struct {
components []module.ReadyDoneAware
Components []module.ReadyDoneAware
}

func NewDependencyList(components ...module.ReadyDoneAware) *DependencyList {
return &DependencyList{
components: components,
Components: components,
}
}

// Add adds a new ReadyDoneAware implementation to the list of dependencies.
func (d *DependencyList) Add(component module.ReadyDoneAware) {
d.components = append(d.components, component)
d.Components = append(d.Components, component)
}
Loading

0 comments on commit 8b0dff9

Please sign in to comment.