Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 85 additions & 85 deletions pkg/engine/internal/proto/wirepb/wirepb.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/engine/internal/proto/wirepb/wirepb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import "github.com/grafana/dskit/httpgrpc/httpgrpc.proto";
import "gogoproto/gogo.proto";
import "pkg/engine/internal/proto/physicalpb/physicalpb.proto";
import "pkg/engine/internal/proto/ulid/ulid.proto";
import "pkg/xcap/proto/xcap.proto";
import "pkg/xcap/xcap.proto";

option go_package = "github.com/grafana/loki/v3/pkg/engine/internal/proto/wirepb";

Expand Down Expand Up @@ -177,7 +177,7 @@ message TaskStatus {
TaskError error = 2;

// Capture is the capture data for this task, if available.
loki.xcap.Capture capture = 3;
loki.xcap.ProtoCapture capture = 3;
}

message TaskError {
Expand Down
6 changes: 3 additions & 3 deletions pkg/engine/internal/scheduler/wire/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
protoUlid "github.com/grafana/loki/v3/pkg/engine/internal/proto/ulid"
"github.com/grafana/loki/v3/pkg/engine/internal/proto/wirepb"
"github.com/grafana/loki/v3/pkg/engine/internal/workflow"
xcapProto "github.com/grafana/loki/v3/pkg/xcap/proto"
"github.com/grafana/loki/v3/pkg/xcap"
)

var defaultFrameCodec = &protobufCodec{
Expand Down Expand Up @@ -301,7 +301,7 @@ func (c *protobufCodec) taskStatusFromPbTaskStatus(ts *wirepb.TaskStatus) (workf
}

if pbCapture := ts.GetCapture(); pbCapture != nil {
capture, err := xcapProto.FromPbCapture(pbCapture)
capture, err := xcap.FromProtoCapture(pbCapture)
if err != nil {
return workflow.TaskStatus{}, fmt.Errorf("failed to unmarshal capture: %w", err)
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func (c *protobufCodec) taskStatusToPbTaskStatus(from workflow.TaskStatus) (*wir
}

if from.Capture != nil {
capture, err := xcapProto.ToPbCapture(from.Capture)
capture, err := xcap.ToProtoCapture(from.Capture)
if err != nil {
return nil, err
}
Expand Down
97 changes: 71 additions & 26 deletions pkg/xcap/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewCapture(ctx context.Context, attributes []attribute.KeyValue) (context.C
regions: make([]*Region, 0),
}

ctx = WithCapture(ctx, capture)
ctx = contextWithCapture(ctx, capture)
return ctx, capture
}

Expand All @@ -57,19 +57,9 @@ func (c *Capture) End() {
c.ended = true
}

// Attributes returns the attributes associated with this capture.
func (c *Capture) Attributes() []attribute.KeyValue {
c.mu.RLock()
defer c.mu.RUnlock()

attrs := make([]attribute.KeyValue, len(c.attributes))
copy(attrs, c.attributes)
return attrs
}

// AddRegion adds a region to this capture. This is called by Region
// addRegion adds a region to this capture. This is called by Region
// when it is created.
func (c *Capture) AddRegion(r *Region) {
func (c *Capture) addRegion(r *Region) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -80,19 +70,9 @@ func (c *Capture) AddRegion(r *Region) {
c.regions = append(c.regions, r)
}

// Regions returns all regions in this capture.
func (c *Capture) Regions() []*Region {
c.mu.RLock()
defer c.mu.RUnlock()

regions := make([]*Region, len(c.regions))
copy(regions, c.regions)
return regions
}

// GetAllStatistics returns statistics used across all regions
// in this capture.
func (c *Capture) GetAllStatistics() []Statistic {
func (c *Capture) getAllStatistics() []Statistic {
c.mu.RLock()
defer c.mu.RUnlock()

Expand All @@ -116,7 +96,11 @@ func (c *Capture) GetAllStatistics() []Statistic {
return result
}

// Merge appends all regions from other into this capture.
// Merge appends all regions from other into this capture and establishes
// parent-child relationships. Leaf regions (regions with no children) in other
// are linked to root regions (regions with no parent) in this capture via parentID.
// This is used to link regions from different tasks in a workflow DAG.
// If the parent capture has multiple root regions, the first one (by order) is used.
func (c *Capture) Merge(other *Capture) {
if other == nil {
return
Expand All @@ -127,5 +111,66 @@ func (c *Capture) Merge(other *Capture) {
c.mu.Lock()
defer c.mu.Unlock()

c.regions = append(c.regions, other.Regions()...)
other.mu.RLock()
otherRegions := make([]*Region, len(other.regions))
copy(otherRegions, other.regions)
other.mu.RUnlock()

if len(otherRegions) == 0 {
return
}

// Find root regions (regions with no parent) in this capture.
rootRegions := make([]*Region, 0)
for _, r := range c.regions {
r.mu.RLock()
isRoot := r.parentID.IsZero()
r.mu.RUnlock()
if isRoot {
rootRegions = append(rootRegions, r)
}
}

// Find leaf regions (regions with no children) in other capture.
// A region is a leaf if no other region has it as a parent.
parentIDs := make(map[ID]bool)
for _, r := range otherRegions {
r.mu.RLock()
parentID := r.parentID
r.mu.RUnlock()
if !parentID.IsZero() {
parentIDs[parentID] = true
}
}

leafRegions := make([]*Region, 0)
for _, r := range otherRegions {
r.mu.RLock()
regionID := r.id
r.mu.RUnlock()
if !parentIDs[regionID] {
leafRegions = append(leafRegions, r)
}
}

// Link leaf regions from other to root regions in this capture via parentID.
// If there are multiple root regions, use the first one (deterministic choice).
var parentRootID ID
if len(rootRegions) > 0 {
rootRegions[0].mu.RLock()
parentRootID = rootRegions[0].id
rootRegions[0].mu.RUnlock()
}

// Update parentID of leaf regions to point to the parent root region.
for _, leaf := range leafRegions {
if !parentRootID.IsZero() {
leaf.mu.Lock()
leaf.parentID = parentRootID
leaf.mu.Unlock()
}
}

// Append all regions from other capture.
c.regions = append(c.regions, otherRegions...)
}
30 changes: 23 additions & 7 deletions pkg/xcap/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,36 @@ import (
type ctxKeyType string

const (
xcapKey ctxKeyType = "xcap"
captureKey ctxKeyType = "capture"
regionKey ctxKeyType = "region"
)

// FromContext returns the Capture from the context, or nil if no Capture
// CaptureFromContext returns the Capture from the context, or nil if no Capture
// is present.
func FromContext(ctx context.Context) *Capture {
v, ok := ctx.Value(xcapKey).(*Capture)
func CaptureFromContext(ctx context.Context) *Capture {
v, ok := ctx.Value(captureKey).(*Capture)
if !ok {
return nil
}
return v
}

// WithCapture returns a new context with the given Capture.
func WithCapture(ctx context.Context, capture *Capture) context.Context {
return context.WithValue(ctx, xcapKey, capture)
// contextWithCapture returns a new context with the given Capture.
func contextWithCapture(ctx context.Context, capture *Capture) context.Context {
return context.WithValue(ctx, captureKey, capture)
}

// regionFromContext returns the current Region from the context, or nil if no Region
// is present.
func regionFromContext(ctx context.Context) *Region {
v, ok := ctx.Value(regionKey).(*Region)
if !ok {
return nil
}
return v
}

// contextWithRegion returns a new context with the given Region.
func contextWithRegion(ctx context.Context, region *Region) context.Context {
return context.WithValue(ctx, regionKey, region)
}
50 changes: 50 additions & 0 deletions pkg/xcap/id_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package xcap

import (
"encoding/binary"
"encoding/hex"
"math/rand/v2"
)

// identifier is a unique identifier for captures and regions.
type identifier [8]byte

// ID is an exported alias for identifier for use in other packages.
type ID = identifier

var (
zeroID identifier
)

// IsValid reports whether the ID is valid.
func (id identifier) IsValid() bool {
return id != zeroID
}

// IsZero reports whether the ID is zero (all zeros).
func (id identifier) IsZero() bool {
return id == zeroID
}

// String returns the hex string representation of the ID.
func (id identifier) String() string {
return hex.EncodeToString(id[:])
}

// newID returns a new random ID. The ID is guaranteed to be non-zero.
func newID() identifier {
var id identifier
for {
binary.NativeEndian.PutUint64(id[:], rand.Uint64())
if id.IsValid() {
break
}
}
return id
}

// NewID returns a new random ID. The ID is guaranteed to be non-zero.
// This is the exported version of newID for use in other packages.
func NewID() ID {
return newID()
}
Loading
Loading