-
Notifications
You must be signed in to change notification settings - Fork 3.8k
chore(engine): introduce execution capture #19821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| // in the returned record are ordered by timestamp in the direction specified | ||
| // by opts.Direction. | ||
| func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger) *dataobjScan { | ||
| func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger, region *xcap.Region) *dataobjScan { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
region is created once per node. The node owns the region, it has to end the region on Close()
alternatively we could also create it on the first Read() call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably be pulling the current region out of the context so we don't have to pass it around everywhere, maybe an xcap.CurrentRegion(ctx)? (If there isn't a current region, it can make one that does nothing by checking to see if the capture is nil or not)
| └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) | ||
| ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() | ||
| └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() | ||
| ├── DataObjScan location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
targets are now children, not comments
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little worried about that; they're not children nodes and it might give someone an incorrect understanding of the plan DAG.
I saw your comment above about moving them to children since we're using comments for other things, but I feel like comments can still make sense here, especially since they're called out explicitly as @target. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverted target to be a comment. Update the printer to allow nested comments.
What it looks like now:
└── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, ambiguous.request_duration, builtin.timestamp) predic
├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=()
│ └── @max_time_range start=2025-01-01T00:30:00Z end=2025-01-01T01:00:00Z
└── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=()
└── @max_time_range start=2025-01-01T00:00:00Z end=2025-01-01T00:30:00Z
rfratto
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
starting out with a round of comments for things we don't need to address right away, but stood out to me while looking through the code
pkg/xcap/scope.go
Outdated
| // NewScope creates a new Scope with the given data. | ||
| // This function is mainly used for unmarshaling from protobuf. | ||
| func NewScope(name string, startTime, endTime time.Time, observations map[StatisticKey]AggregatedObservation, ended bool, capture *Capture) *Scope { | ||
| return &Scope{ | ||
| capture: capture, | ||
| name: name, | ||
| attributes: nil, // Attributes not available during unmarshaling | ||
| startTime: startTime, | ||
| endTime: endTime, | ||
| observations: observations, | ||
| ended: ended, | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the future: feels weird to expose this just for protobuf, maybe we'll eventually want to have the protobuf conversion as internal logic in the xcap package 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i didn't like adding this either, i'll make that change in a follow-up
pkg/xcap/scope.go
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
future PR: rename to Region to align with what we discussed
pkg/xcap/scope.go
Outdated
| } | ||
|
|
||
| // Create OpenTelemetry span for this scope. | ||
| ctx, span := xcapTracer.Start(ctx, name, trace.WithAttributes(cfg.attributes...)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the future: eventually this will need to be moved to an export step; as it is now any span we create will get considered for exporting even if we want to bubble regions up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah 👍 I left this in intentionally so we have connected spans for a task.
| package xcap | ||
|
|
||
| // AggregatedObservation holds an aggregated value for a statistic within a scope. | ||
| type AggregatedObservation struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the future: I don't think AggregatedObservation should be exposed as part of the xcap API; this feels more like a detail of how we bubble up scopes.
| Name() string | ||
| DataType() DataType | ||
| Aggregation() AggregationType | ||
| Key() StatisticKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the future: I think we should unexport all methods except for Name here; that would also allow us to unexport DataType, AggregationType, and StatisticKey since we probably don't need xcap users to know about those.
(This will be more possible once we do the other change to move protobuf conversion as unexported logic in xcap)
rfratto
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice job! i'm really excited to have this, especially for some of the stats I wanted to make that would have non-sum aggregations (like for start/end times)
this round of comments is stuff I'd like to discuss or see addressed before we merge.
| // ScopeProvider is an optional interface that pipelines can implement | ||
| // to expose their associated xcap scope for statistics collection. | ||
| type ScopeProvider interface { | ||
| // Scope returns the xcap scope associated with this pipeline node, if any. | ||
| // Returns nil if no scope is associated with this pipeline. | ||
| Scope() *xcap.Scope | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm having a hard time telling where the result of Scope() gets called for any reason other than implementing Scope().
Is it possible to handle this with context.Context instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is being used by observedPipeline to capture common stats for all nodes
loki/pkg/engine/internal/executor/pipeline.go
Lines 310 to 339 in e93cdbb
| func newObservedPipeline(inner Pipeline) *observedPipeline { | |
| p := &observedPipeline{ | |
| inner: inner, | |
| } | |
| if provider, ok := inner.(ScopeProvider); ok { | |
| p.scope = provider.Scope() | |
| } | |
| return p | |
| } | |
| // Read implements Pipeline. | |
| func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) { | |
| start := time.Now() | |
| if p.scope != nil { | |
| p.scope.Record(statReadCalls.Observe(1)) | |
| } | |
| rec, err := p.inner.Read(ctx) | |
| if p.scope != nil { | |
| if rec != nil { | |
| p.scope.Record(statRowsOut.Observe(rec.NumRows())) | |
| } | |
| p.scope.Record(statReadDuration.Observe(time.Since(start).Nanoseconds())) | |
| } | |
an alternate could be to store the scope of child node in observedPipeline. On every read call it attaches the scope to the context before passing it to the child node
| └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) | ||
| ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() | ||
| └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() | ||
| ├── DataObjScan location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a little worried about that; they're not children nodes and it might give someone an incorrect understanding of the plan DAG.
I saw your comment above about moving them to children since we're using comments for other things, but I feel like comments can still make sense here, especially since they're called out explicitly as @target. WDYT?
- create capture at start of Exec - Add comment about NodeID traceability
What this PR does / why we need it:
Introduces
xcappkg to capture statistics from query execution.Captures can be serialised which allows merging captures from all the tasks and summarising it in the scheduler.
Following is a physical plan enriched with observations from the capture
Which issue(s) this PR fixes:
Fixes #
Special notes for your reviewer:
Checklist
CONTRIBUTING.mdguide (required)featPRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.docs/sources/setup/upgrade/_index.mddeprecated-config.yamlanddeleted-config.yamlfiles respectively in thetools/deprecated-config-checkerdirectory. Example PR