diff --git a/pkg/engine/internal/scheduler/wire/frame.go b/pkg/engine/internal/scheduler/wire/frame.go
new file mode 100644
index 0000000000000..1687780a4169f
--- /dev/null
+++ b/pkg/engine/internal/scheduler/wire/frame.go
@@ -0,0 +1,98 @@
+package wire
+
+import (
+	"fmt"
+)
+
+// FrameKind identifies the concrete type of a [Frame].
+type FrameKind int
+
+const (
+	// FrameKindInvalid is the zero value and represents an invalid frame.
+	FrameKindInvalid FrameKind = iota
+
+	FrameKindMessage // FrameKindMessage is used for [MessageFrame].
+	FrameKindAck     // FrameKindAck is used for [AckFrame].
+	FrameKindNack    // FrameKindNack is used for [NackFrame].
+	FrameKindDiscard // FrameKindDiscard is used for [DiscardFrame].
+)
+
+var frameKindNames = [...]string{
+	FrameKindInvalid: "FrameKindInvalid",
+	FrameKindMessage: "FrameKindMessage",
+	FrameKindAck:     "FrameKindAck",
+	FrameKindNack:    "FrameKindNack",
+	FrameKindDiscard: "FrameKindDiscard",
+}
+
+// String returns the name of k, or "FrameKind(<n>)" if k is not a known kind.
+func (k FrameKind) String() string {
+	if k < 0 || int(k) >= len(frameKindNames) {
+		return fmt.Sprintf("FrameKind(%d)", k)
+	}
+	return frameKindNames[k]
+}
+
+// A Frame is the lowest level of communication between two peers. Frames are an
+// envelope for messages between peers.
+type Frame interface {
+	isFrame() // Unexported, so only types in this package can be Frames.
+	FrameKind() FrameKind
+}
+
+// MessageFrame is a Frame that sends a [Message] to the peer. MessageFrames are
+// paired with an [AckFrame] to acknowledge that the message has been
+// successfully processed, or [NackFrame] in case of failure.
+type MessageFrame struct {
+	// ID uniquely identifies the message. It is up to the sender to ensure that
+	// IDs are unique within a stream.
+	ID uint64
+
+	// Message being sent to the peer.
+	Message Message
+}
+
+// FrameKind returns [FrameKindMessage].
+func (m MessageFrame) FrameKind() FrameKind { return FrameKindMessage }
+
+// AckFrame is a Frame that acknowledges a [MessageFrame] was processed
+// successfully.
+type AckFrame struct {
+	// ID of the [MessageFrame] being acknowledged.
+	ID uint64
+}
+
+// FrameKind returns [FrameKindAck].
+func (a AckFrame) FrameKind() FrameKind { return FrameKindAck }
+
+// NackFrame is a Frame that notifies that a [MessageFrame] could not be
+// processed.
+type NackFrame struct {
+	// ID of the [MessageFrame] being negatively acknowledged.
+	ID uint64
+
+	// Error is the error that occurred while processing the message.
+	Error error
+}
+
+// FrameKind returns [FrameKindNack].
+func (n NackFrame) FrameKind() FrameKind { return FrameKindNack }
+
+// DiscardFrame is a Frame that informs the peer that a [MessageFrame] has
+// been discarded and an acknowledgement is no longer needed.
+//
+// A peer receiving a DiscardFrame should produce no acknowledgement.
+type DiscardFrame struct {
+	// ID of the [MessageFrame] being discarded.
+	ID uint64
+}
+
+// FrameKind returns [FrameKindDiscard].
+func (d DiscardFrame) FrameKind() FrameKind { return FrameKindDiscard }
+
+// Marker implementations.
+
+func (m MessageFrame) isFrame() {}
+func (a AckFrame) isFrame()     {}
+func (n NackFrame) isFrame()    {}
+func (d DiscardFrame) isFrame() {}
diff --git a/pkg/engine/internal/scheduler/wire/message.go b/pkg/engine/internal/scheduler/wire/message.go
new file mode 100644
index 0000000000000..6128adb94fd50
--- /dev/null
+++ b/pkg/engine/internal/scheduler/wire/message.go
@@ -0,0 +1,185 @@
+package wire
+
+import (
+	"fmt"
+	"net"
+
+	"github.com/apache/arrow-go/v18/arrow"
+	"github.com/oklog/ulid/v2"
+
+	"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
+)
+
+// MessageKind represents the type of a message.
+type MessageKind int
+
+const (
+	// MessageKindInvalid is the zero value and represents an invalid message.
+	MessageKindInvalid MessageKind = iota
+
+	MessageKindWorkerReady  // MessageKindWorkerReady represents [WorkerReadyMessage].
+	MessageKindTaskAssign   // MessageKindTaskAssign represents [TaskAssignMessage].
+	MessageKindTaskCancel   // MessageKindTaskCancel represents [TaskCancelMessage].
+	MessageKindTaskFlag     // MessageKindTaskFlag represents [TaskFlagMessage].
+	MessageKindTaskStatus   // MessageKindTaskStatus represents [TaskStatusMessage].
+	MessageKindStreamBind   // MessageKindStreamBind represents [StreamBindMessage].
+	MessageKindStreamData   // MessageKindStreamData represents [StreamDataMessage].
+	MessageKindStreamStatus // MessageKindStreamStatus represents [StreamStatusMessage].
+)
+
+var kindNames = [...]string{
+	MessageKindInvalid:      "Invalid",
+	MessageKindWorkerReady:  "WorkerReady",
+	MessageKindTaskAssign:   "TaskAssign",
+	MessageKindTaskCancel:   "TaskCancel",
+	MessageKindTaskFlag:     "TaskFlag",
+	MessageKindTaskStatus:   "TaskStatus",
+	MessageKindStreamBind:   "StreamBind",
+	MessageKindStreamData:   "StreamData",
+	MessageKindStreamStatus: "StreamStatus",
+}
+
+// String returns the name of k, or "Kind(<n>)" if k has no known name.
+func (k MessageKind) String() string {
+	if k < MessageKindInvalid || int(k) >= len(kindNames) {
+		return fmt.Sprintf("Kind(%d)", k)
+	}
+
+	name := kindNames[k]
+	if name == "" {
+		return fmt.Sprintf("Kind(%d)", k)
+	}
+	return name
+}
+
+// A Message is a message exchanged between peers.
+type Message interface {
+	isMessage() // Unexported, so only types in this package can be Messages.
+
+	// Kind returns the kind of message.
+	Kind() MessageKind
+}
+
+// Messages about workers.
+type (
+	// WorkerReadyMessage is sent by a worker to the scheduler to request a new
+	// task to run. Ready workers are eventually assigned a task via
+	// [TaskAssignMessage].
+	//
+	// Workers may send multiple WorkerReadyMessage messages to request more
+	// tasks. Workers are automatically unmarked as ready once each
+	// [WorkerReadyMessage] has been responded to with a [TaskAssignMessage].
+	WorkerReadyMessage struct {
+		// No fields.
+	}
+)
+
+// Messages about tasks.
+type (
+	// TaskAssignMessage is sent by the scheduler to a worker when there is a
+	// task to run. TaskAssignMessage is only sent to workers for which there is
+	// still at least one [WorkerReadyMessage].
+	TaskAssignMessage struct {
+		Task *workflow.Task // Task to run.
+
+		// StreamStates holds the most recent state of each stream that the task
+		// reads from.
+		//
+		// StreamStates does not have any entries for streams that the task
+		// writes to.
+		StreamStates map[ulid.ULID]workflow.StreamState
+	}
+
+	// TaskCancelMessage is sent by the scheduler to a worker when a task is no
+	// longer needed, and running that task should be aborted.
+	TaskCancelMessage struct {
+		ID ulid.ULID // ID of the Task to cancel.
+	}
+
+	// TaskFlagMessage is sent by the scheduler to update the runtime flags of a task.
+	TaskFlagMessage struct {
+		ID ulid.ULID // ID of the Task to update.
+
+		// Interruptible indicates that tasks blocked on writing to or reading
+		// from a [Stream] can be paused, and that the worker can accept new
+		// tasks to run. Tasks are not interruptible by default.
+		Interruptible bool
+	}
+
+	// TaskStatusMessage is sent by the worker to the scheduler to inform the
+	// scheduler of the current status of a task.
+	TaskStatusMessage struct {
+		ID     ulid.ULID           // ID of the Task to update.
+		Status workflow.TaskStatus // Current status of the task.
+	}
+)
+
+// Messages about streams.
+type (
+	// StreamBindMessage is sent by the scheduler to a worker to inform the
+	// worker about the location of a stream receiver.
+	StreamBindMessage struct {
+		StreamID ulid.ULID // ID of the stream.
+		Receiver net.Addr  // Address of the stream receiver.
+	}
+
+	// StreamDataMessage is sent by a worker to a stream receiver to provide
+	// payload data for a stream.
+	StreamDataMessage struct {
+		StreamID ulid.ULID    // ID of the stream.
+		Data     arrow.Record // Payload data for the stream.
+	}
+
+	// StreamStatusMessage communicates the status of the sending side of a
+	// stream. It is sent in two cases:
+	//
+	// - By the sender of the stream, to inform the scheduler about the status
+	//   of that stream.
+	//
+	// - By the scheduler, to inform the stream reader about the status of the
+	//   stream.
+	//
+	// The scheduler is responsible for informing stream receivers about stream
+	// status to avoid keeping streams alive if the sender disconnects.
+	StreamStatusMessage struct {
+		StreamID ulid.ULID            // ID of the stream.
+		State    workflow.StreamState // State of the stream.
+	}
+)
+
+// Marker implementations
+
+func (WorkerReadyMessage) isMessage()  {}
+func (TaskAssignMessage) isMessage()   {}
+func (TaskCancelMessage) isMessage()   {}
+func (TaskFlagMessage) isMessage()     {}
+func (TaskStatusMessage) isMessage()   {}
+func (StreamBindMessage) isMessage()   {}
+func (StreamDataMessage) isMessage()   {}
+func (StreamStatusMessage) isMessage() {}
+
+// Kinds
+
+// Kind returns [MessageKindWorkerReady].
+func (WorkerReadyMessage) Kind() MessageKind { return MessageKindWorkerReady }
+
+// Kind returns [MessageKindTaskAssign].
+func (TaskAssignMessage) Kind() MessageKind { return MessageKindTaskAssign }
+
+// Kind returns [MessageKindTaskCancel].
+func (TaskCancelMessage) Kind() MessageKind { return MessageKindTaskCancel }
+
+// Kind returns [MessageKindTaskFlag].
+func (TaskFlagMessage) Kind() MessageKind { return MessageKindTaskFlag }
+
+// Kind returns [MessageKindTaskStatus].
+func (TaskStatusMessage) Kind() MessageKind { return MessageKindTaskStatus }
+
+// Kind returns [MessageKindStreamBind].
+func (StreamBindMessage) Kind() MessageKind { return MessageKindStreamBind }
+
+// Kind returns [MessageKindStreamData].
+func (StreamDataMessage) Kind() MessageKind { return MessageKindStreamData }
+
+// Kind returns [MessageKindStreamStatus].
+func (StreamStatusMessage) Kind() MessageKind { return MessageKindStreamStatus }
diff --git a/pkg/engine/internal/scheduler/wire/peer.go b/pkg/engine/internal/scheduler/wire/peer.go
new file mode 100644
index 0000000000000..d1a7485754121
--- /dev/null
+++ b/pkg/engine/internal/scheduler/wire/peer.go
@@ -0,0 +1,264 @@
+package wire
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+	"reflect"
+	"sync"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"go.uber.org/atomic"
+	"golang.org/x/sync/errgroup"
+)
+
+// Peer wraps a [Conn] into a synchronous API that acts as both a
+// server and a client.
+//
+// Callers must call [Peer.Serve] to run the peer.
+type Peer struct {
+	Logger  log.Logger // Logger used to report warnings.
+	Conn    Conn       // Connection to use for communication.
+	Handler Handler    // Handler for incoming messages from the remote peer.
+	Buffer  int        // Buffer size for incoming and outgoing messages.
+
+	incoming chan MessageFrame // Buffered channel of received message frames.
+	outgoing chan Frame        // Buffered channel of frames waiting to be sent.
+	initOnce sync.Once         // Guards one-time allocation of the channels above.
+
+	requestID    atomic.Uint64 // Source of IDs for outgoing message frames.
+	sentRequests sync.Map      // map[uint64]*request
+}
+
+// Handler is a function that handles a message received from the peer. The
+// local peer is passed as an argument to allow using the same Handler for
+// multiple Peers.
+//
+// Handlers are invoked in a dedicated goroutine. Slow handlers cause
+// backpressure on the connection.
+//
+// Once Handler returns, the sending peer will be informed about the
+// message delivery status. If Handler returns an error, the error
+// message will be sent to the peer.
+type Handler func(ctx context.Context, peer *Peer, message Message) error
+
+// Serve runs the peer, blocking until the provided context is canceled or
+// receiving from the connection fails.
+//
+// Serve returns nil when ctx is canceled, and the wrapped receive error
+// otherwise.
+func (p *Peer) Serve(ctx context.Context) error {
+	p.lazyInit()
+
+	g, ctx := errgroup.WithContext(ctx)
+
+	g.Go(func() error { return p.recvMessages(ctx) })
+	g.Go(func() error { return p.handleIncoming(ctx) })
+	g.Go(func() error { return p.handleOutgoing(ctx) })
+
+	return g.Wait()
+}
+
+// lazyInit allocates the incoming and outgoing channels on first use.
+//
+// Initialization goes through initOnce unconditionally: checking p.outgoing
+// before calling Do would be an unsynchronized read racing with the write
+// made by a concurrent first call (for example Serve and SendMessage starting
+// at the same time).
+func (p *Peer) lazyInit() {
+	p.initOnce.Do(func() {
+		p.incoming = make(chan MessageFrame, p.Buffer)
+		p.outgoing = make(chan Frame, p.Buffer)
+	})
+}
+
+// recvMessages reads frames from the connection until ctx is canceled or
+// Recv fails. Message frames are queued for the handler; Ack and Nack frames
+// resolve the matching pending SendMessage call.
+func (p *Peer) recvMessages(ctx context.Context) error {
+	for {
+		frame, err := p.Conn.Recv(ctx)
+		if err != nil {
+			if ctx.Err() != nil {
+				// Context got canceled; shut down
+				return nil
+			}
+			return fmt.Errorf("recv: %w", err)
+		}
+
+		switch frame := frame.(type) {
+		case MessageFrame:
+			// Queue the message for processing.
+			select {
+			case p.incoming <- frame:
+			case <-ctx.Done():
+				return nil
+			}
+
+		case AckFrame:
+			// If there's still a listener for this request, inform them of
+			// the success.
+			p.resolveRequest(frame.ID, nil)
+
+		case NackFrame:
+			// If there's still a listener for this request, inform them of
+			// the error.
+			p.resolveRequest(frame.ID, frame.Error)
+
+		case DiscardFrame:
+			// TODO(rfratto): cancel handleMessage goroutine
+
+		default:
+			level.Warn(p.Logger).Log("msg", "unknown frame type", "type", reflect.TypeOf(frame).String())
+		}
+	}
+}
+
+// handleIncoming passes queued message frames to processMessage, one at a
+// time, until ctx is canceled.
+func (p *Peer) handleIncoming(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case frame := <-p.incoming:
+			p.processMessage(ctx, frame.ID, frame.Message)
+		}
+	}
+}
+
+// handleOutgoing writes queued frames to the connection until ctx is
+// canceled. A failed send is logged and reported to the SendMessage call
+// waiting on that frame, if any; the loop then carries on.
+func (p *Peer) handleOutgoing(ctx context.Context) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case frame := <-p.outgoing:
+			if err := p.Conn.Send(ctx, frame); err != nil && ctx.Err() == nil {
+				level.Warn(p.Logger).Log("msg", "failed to send message", "error", err)
+				p.notifyError(frame, err)
+			}
+		}
+	}
+}
+
+// notifyError notifies any request listeners of a frame that an error occurred
+// during delivery.
+func (p *Peer) notifyError(frame Frame, err error) {
+	switch frame := frame.(type) {
+	case MessageFrame:
+		p.resolveRequest(frame.ID, err)
+
+	default:
+		// Other frame types don't have listeners (at the moment) so there's nobody
+		// to notify.
+	}
+}
+
+// resolveRequest delivers result to the SendMessage call waiting on request
+// id. It does nothing if no call is waiting, and logs a warning if that call
+// has already been given a result.
+func (p *Peer) resolveRequest(id uint64, result error) {
+	val, found := p.sentRequests.Load(id)
+	if !found {
+		return
+	}
+	req := val.(*request)
+
+	select {
+	case req.result <- result:
+	default:
+		level.Warn(p.Logger).Log("msg", "ignoring duplicate acknowledgement")
+	}
+}
+
+// processMessage handles a message received from the peer. It runs the
+// Handler and queues an AckFrame on success or a NackFrame carrying the
+// handler's error on failure. Without a Handler, every message is nacked
+// with "not implemented".
+func (p *Peer) processMessage(ctx context.Context, id uint64, message Message) {
+	if p.Handler == nil {
+		_ = p.enqueueFrame(ctx, NackFrame{ID: id, Error: errors.New("not implemented")})
+		return
+	}
+
+	switch err := p.Handler(ctx, p, message); err {
+	case nil:
+		// TODO(rfratto): What should we do if this fails? Logs? Metrics?
+		_ = p.enqueueFrame(ctx, AckFrame{ID: id})
+	default:
+		// TODO(rfratto): What should we do if this fails? Logs? Metrics?
+		_ = p.enqueueFrame(ctx, NackFrame{ID: id, Error: err})
+	}
+}
+
+// request tracks a message sent by SendMessage that is awaiting a reply.
+type request struct {
+	// result receives the outcome of the request. SendMessage creates it with
+	// room for one value, so the first reply never blocks its sender.
+	result chan error
+}
+
+// SendMessage sends a message to the remote peer. SendMessage blocks until the
+// provided context is canceled or the remote peer positively or negatively
+// acknowledges the message.
+//
+// [Peer.Serve] must be running when SendMessage is called, otherwise it blocks
+// until the context is canceled.
+func (p *Peer) SendMessage(ctx context.Context, message Message) error {
+	p.lazyInit()
+
+	reqID := p.requestID.Inc()
+	req := &request{
+		result: make(chan error, 1),
+	}
+	p.sentRequests.Store(reqID, req)
+	defer p.sentRequests.Delete(reqID)
+
+	if err := p.enqueueFrame(ctx, MessageFrame{ID: reqID, Message: message}); err != nil {
+		return err
+	}
+
+	select {
+	case <-ctx.Done():
+		// TODO(rfratto): queue a DiscardFrame
+		return ctx.Err()
+	case err := <-req.result:
+		return err
+	}
+}
+
+// SendMessageAsync sends a message to the remote peer asynchronously.
+// SendMessageAsync blocks until the message has been queued for sending; it
+// does not wait for the write to the connection, nor for an acknowledgement
+// or response. A failed write is only logged.
+//
+// [Peer.Serve] must be running before SendMessageAsync is called, otherwise it
+// blocks until the context is canceled.
+func (p *Peer) SendMessageAsync(ctx context.Context, message Message) error {
+	p.lazyInit()
+
+	reqID := p.requestID.Inc()
+	return p.enqueueFrame(ctx, MessageFrame{ID: reqID, Message: message})
+}
+
+// enqueueFrame enqueues a frame to be sent to the remote peer. It returns
+// ctx.Err() if ctx is canceled before the frame is queued.
+func (p *Peer) enqueueFrame(ctx context.Context, frame Frame) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case p.outgoing <- frame:
+		return nil
+	}
+}
+
+// LocalAddr returns the address of the local peer.
+func (p *Peer) LocalAddr() net.Addr { return p.Conn.LocalAddr() }
+
+// RemoteAddr returns the address of the remote peer.
+func (p *Peer) RemoteAddr() net.Addr { return p.Conn.RemoteAddr() }
diff --git a/pkg/engine/internal/scheduler/wire/wire.go b/pkg/engine/internal/scheduler/wire/wire.go
new file mode 100644
index 0000000000000..7e2dbf80b3bd2
--- /dev/null
+++ b/pkg/engine/internal/scheduler/wire/wire.go
@@ -0,0 +1,57 @@
+// Package wire provides the wire protocol that scheduler peers use to
+// communicate.
+package wire
+
+import (
+	"context"
+	"errors"
+	"net"
+)
+
+// ErrConnClosed indicates a closed connection between peers.
+var ErrConnClosed = errors.New("connection closed")
+
+// Listener waits for incoming connections from scheduler peers.
+type Listener interface {
+	// Accept waits for and returns the next connection to the listener. Accept
+	// returns an error if the context is canceled or if the listener is closed.
+	Accept(ctx context.Context) (Conn, error)
+
+	// Close closes the listener. Any blocked Accept operations will be
+	// unblocked and return errors.
+	Close(ctx context.Context) error
+
+	// Addr returns the listener's advertised network address. Peers use this
+	// address to connect to the listener.
+	Addr() net.Addr
+}
+
+// Conn is a communication stream between two peers.
+type Conn interface {
+	// Send sends the provided Frame to the peer. Send blocks until the Frame
+	// has been sent to the peer, but does not wait for the peer to acknowledge
+	// receipt of the Frame.
+	//
+	// Send returns an error if the context is canceled or if the connection is
+	// closed.
+	Send(context.Context, Frame) error
+
+	// Recv receives the next Frame from the peer. Recv blocks until a Frame is
+	// available. Recv returns an error if the context is canceled or if the
+	// connection is closed.
+	//
+	// Callers should take care to avoid long periods where there is no active
+	// call to Recv, as that can block the peer's Send call.
+	Recv(context.Context) (Frame, error)
+
+	// Close closes the Conn. Close may be called by either side of the
+	// connection. After the connection has been closed, calls to Send or Recv
+	// return [ErrConnClosed].
+	Close() error
+
+	// LocalAddr returns the address of the local side of the connection.
+	LocalAddr() net.Addr
+
+	// RemoteAddr returns the address of the remote side of the connection.
+	RemoteAddr() net.Addr
+}
diff --git a/pkg/engine/internal/scheduler/wire/wire_local.go b/pkg/engine/internal/scheduler/wire/wire_local.go
new file mode 100644
index 0000000000000..c01ba67136024
--- /dev/null
+++ b/pkg/engine/internal/scheduler/wire/wire_local.go
@@ -0,0 +1,196 @@
+package wire
+
+import (
+	"context"
+	"net"
+	"sync"
+)
+
+var (
+	// LocalScheduler is the address of the local scheduler when using the
+	// [Local] listener.
+	LocalScheduler net.Addr = localAddr("scheduler")
+
+	// LocalWorker is the address of the local worker when using the
+	// [Local] listener.
+	LocalWorker net.Addr = localAddr("worker")
+)
+
+// localAddr is a [net.Addr] on the in-process "local" network.
+type localAddr string
+
+var _ net.Addr = localAddr("")
+
+func (addr localAddr) Network() string { return "local" }
+func (addr localAddr) String() string  { return string(addr) }
+
+// Local is a [Listener] that accepts connections from the local process without
+// using the network.
+//
+// The zero value of Local is ready to use.
+type Local struct {
+	// Address to broadcast when connecting to peers. Should be [LocalScheduler]
+	// for the scheduler and [LocalWorker] for the worker.
+	Address net.Addr
+
+	incoming chan *localConn    // Incoming connections to this Local
+	alive    context.Context    // Canceled when the listener is closed.
+	close    context.CancelFunc // Cancels alive.
+
+	initOnce  sync.Once // Guards lazyInit.
+	closeOnce sync.Once // Guards Close.
+}
+
+var _ Listener = (*Local)(nil)
+
+// DialFrom dials the local listener, blocking until the connection is accepted
+// or the context is canceled. Returns the caller's connection.
+//
+// DialFrom returns [net.ErrClosed] if the listener is closed before the
+// connection is accepted.
+func (l *Local) DialFrom(ctx context.Context, from net.Addr) (Conn, error) {
+	l.lazyInit()
+
+	localToRemote := make(chan Frame)
+	remoteToLocal := make(chan Frame)
+
+	// Generate a context to share between the streams for whether the
+	// connection is still open.
+	//
+	// This is attached to the listener's context which closes all connections
+	// when the listener is closed.
+	alive, closeConn := context.WithCancel(l.alive)
+
+	localStream := &localConn{
+		alive: alive,
+		close: closeConn,
+
+		localAddr:  from,
+		remoteAddr: l.Address,
+
+		write: localToRemote,
+		read:  remoteToLocal,
+	}
+
+	remoteStream := &localConn{
+		alive: alive,
+		close: closeConn,
+
+		localAddr:  l.Address,
+		remoteAddr: from,
+
+		write: remoteToLocal,
+		read:  localToRemote,
+	}
+
+	select {
+	case <-ctx.Done():
+		// The connection was never handed out; cancel its context so it is
+		// detached from l.alive instead of lingering until the listener closes.
+		closeConn()
+		return nil, ctx.Err()
+	case <-alive.Done():
+		return nil, net.ErrClosed
+	case l.incoming <- remoteStream:
+		return localStream, nil
+	}
+}
+
+// lazyInit sets up the listener's channel and lifetime context on first use,
+// so that a zero-value Local works without a constructor.
+func (l *Local) lazyInit() {
+	l.initOnce.Do(func() {
+		l.incoming = make(chan *localConn)
+		l.alive, l.close = context.WithCancel(context.Background())
+	})
+}
+
+// Accept waits for and returns the next connection to the listener. Accept
+// returns [net.ErrClosed] if the listener is closed, or the context's error if
+// ctx is canceled first.
+func (l *Local) Accept(ctx context.Context) (Conn, error) {
+	l.lazyInit()
+
+	select {
+	case <-l.alive.Done():
+		return nil, net.ErrClosed
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case conn := <-l.incoming:
+		return conn, nil
+	}
+}
+
+// Close closes the listener. Any blocked Accept or DialFrom operations will be
+// unblocked and return errors. Connections created through the listener are
+// closed as well. Close is safe to call more than once.
+func (l *Local) Close(_ context.Context) error {
+	l.lazyInit()
+
+	// Only cancel the context. l.incoming is deliberately left open: DialFrom
+	// sends on it, and a send on a closed channel panics, while a receive in
+	// Accept would yield a nil connection together with a nil error.
+	l.closeOnce.Do(l.close)
+
+	return nil
+}
+
+// Addr returns the listener's advertised address.
+func (l *Local) Addr() net.Addr { return l.Address }
+
+// localConn is one side of an in-process connection made by [Local.DialFrom].
+// Both sides share alive and close, so closing either side closes both.
+type localConn struct {
+	alive context.Context    // Done once the connection or its listener is closed.
+	close context.CancelFunc // Cancels alive.
+
+	localAddr, remoteAddr net.Addr
+
+	write chan<- Frame // Frames sent to the other side.
+	read  <-chan Frame // Frames sent by the other side.
+}
+
+var _ Conn = (*localConn)(nil)
+
+// Send delivers frame to the other side, blocking until the other side
+// receives it, ctx is canceled, or the connection is closed.
+func (c *localConn) Send(ctx context.Context, frame Frame) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-c.alive.Done(): // Conn closed
+		return ErrConnClosed
+	case c.write <- frame:
+		return nil
+	}
+}
+
+// Recv returns the next frame from the other side, blocking until one is
+// available, ctx is canceled, or the connection is closed.
+func (c *localConn) Recv(ctx context.Context) (Frame, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-c.alive.Done(): // Conn closed
+		return nil, ErrConnClosed
+	case frame, ok := <-c.read:
+		if !ok {
+			// Close our end, in case it's not already closed.
+			_ = c.Close()
+			return nil, ErrConnClosed
+		}
+		return frame, nil
+	}
+}
+
+// Close closes both sides of the connection; repeated calls are no-ops.
+func (c *localConn) Close() error {
+	c.close()
+	return nil
+}
+
+// LocalAddr returns the address of the local side of the connection.
+func (c *localConn) LocalAddr() net.Addr { return c.localAddr }
+
+// RemoteAddr returns the address of the remote side of the connection.
+func (c *localConn) RemoteAddr() net.Addr { return c.remoteAddr }