forked from influxdata/flux
-
Notifications
You must be signed in to change notification settings - Fork 1
/
query.go
173 lines (147 loc) · 5.9 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package flux
import (
"time"
"github.com/influxdata/flux/metadata"
)
// Query represents an active query.
//
// The comments on the individual methods describe the contract that
// implementations are expected to honor; the interface itself cannot
// enforce it.
type Query interface {
// Results returns a receive-only channel that will deliver the query results.
// It's possible that the channel is closed before any results arrive,
// in which case the query should be inspected for an error using Err().
Results() <-chan Result
// Done must always be called to free resources. It is safe to call Done
// multiple times.
Done()
// Cancel will signal that query execution should stop.
// Done must still be called to free resources.
// It is safe to call Cancel multiple times.
Cancel()
// Err reports any error the query may have encountered.
Err() error
// Statistics reports the statistics for the query.
// The statistics are not complete until Done is called.
Statistics() Statistics
// ProfilerResults returns the profiling results for the query as a
// ResultIterator, or an error.
ProfilerResults() (ResultIterator, error)
}
// Statistics is a collection of statistics about the processing of a query.
//
// Two values can be combined with Add, which returns a new value, or with
// Merge, which accumulates into the receiver.
type Statistics struct {
// TotalDuration is the total amount of time in nanoseconds spent.
TotalDuration time.Duration `json:"total_duration"`
// CompileDuration is the amount of time in nanoseconds spent compiling the query.
CompileDuration time.Duration `json:"compile_duration"`
// QueueDuration is the amount of time in nanoseconds spent queueing.
QueueDuration time.Duration `json:"queue_duration"`
// PlanDuration is the amount of time in nanoseconds spent planning the query.
PlanDuration time.Duration `json:"plan_duration"`
// RequeueDuration is the amount of time in nanoseconds spent requeueing.
RequeueDuration time.Duration `json:"requeue_duration"`
// ExecuteDuration is the amount of time in nanoseconds spent executing the query.
ExecuteDuration time.Duration `json:"execute_duration"`
// Concurrency is the number of goroutines allocated to process the query.
Concurrency int `json:"concurrency"`
// MaxAllocated is the maximum number of bytes the query allocated.
MaxAllocated int64 `json:"max_allocated"`
// TotalAllocated is the total number of bytes allocated.
// The number includes memory that was freed and then used again.
TotalAllocated int64 `json:"total_allocated"`
// Profiles holds the profiles for each transport (source/transformation) in this query.
Profiles []TransportProfile `json:"profiles"`
// RuntimeErrors contains error messages that happened during the execution of the query.
RuntimeErrors []string `json:"runtime_errors"`
// Metadata contains metadata key/value pairs that have been attached during execution.
Metadata metadata.Metadata `json:"metadata"`
}
// Add returns the sum of s and other as a new Statistics value.
//
// The Profiles and RuntimeErrors slices of the result are freshly allocated
// (never nil) and hold the entries of s followed by those of other, so the
// slices of the operands are not written to. Metadata is a newly made value
// filled through AddAll, first from s and then from other. Every numeric
// field, MaxAllocated included, is added field by field.
func (s Statistics) Add(other Statistics) Statistics {
	sum := Statistics{
		TotalDuration:   s.TotalDuration + other.TotalDuration,
		CompileDuration: s.CompileDuration + other.CompileDuration,
		QueueDuration:   s.QueueDuration + other.QueueDuration,
		PlanDuration:    s.PlanDuration + other.PlanDuration,
		RequeueDuration: s.RequeueDuration + other.RequeueDuration,
		ExecuteDuration: s.ExecuteDuration + other.ExecuteDuration,
		Concurrency:     s.Concurrency + other.Concurrency,
		MaxAllocated:    s.MaxAllocated + other.MaxAllocated,
		TotalAllocated:  s.TotalAllocated + other.TotalAllocated,
		Profiles:        make([]TransportProfile, 0, len(s.Profiles)+len(other.Profiles)),
		RuntimeErrors:   make([]string, 0, len(s.RuntimeErrors)+len(other.RuntimeErrors)),
		Metadata:        make(metadata.Metadata),
	}

	// Receiver entries go first so the ordering matches the operand order.
	sum.Profiles = append(sum.Profiles, s.Profiles...)
	sum.Profiles = append(sum.Profiles, other.Profiles...)

	sum.RuntimeErrors = append(sum.RuntimeErrors, s.RuntimeErrors...)
	sum.RuntimeErrors = append(sum.RuntimeErrors, other.RuntimeErrors...)

	sum.Metadata.AddAll(s.Metadata)
	sum.Metadata.AddAll(other.Metadata)

	return sum
}
// Merge accumulates the values from other into s.
//
// Numeric fields are added to the corresponding fields of s, the Profiles and
// RuntimeErrors of other are appended to those of s, and the metadata of
// other is added to s.Metadata through AddAll.
//
// Merge is safe to call on a zero-value Statistics: the Metadata of s is
// allocated on demand when other has metadata to contribute.
func (s *Statistics) Merge(other Statistics) {
	s.TotalDuration += other.TotalDuration
	s.CompileDuration += other.CompileDuration
	s.QueueDuration += other.QueueDuration
	s.PlanDuration += other.PlanDuration
	s.RequeueDuration += other.RequeueDuration
	s.ExecuteDuration += other.ExecuteDuration
	s.Concurrency += other.Concurrency
	s.MaxAllocated += other.MaxAllocated
	s.TotalAllocated += other.TotalAllocated
	s.Profiles = append(s.Profiles, other.Profiles...)
	s.RuntimeErrors = append(s.RuntimeErrors, other.RuntimeErrors...)

	// A Statistics that was never given metadata has a nil Metadata, and
	// inserting entries into a nil map panics. Allocate it first, but only
	// when there is something to add, so an empty merge leaves s.Metadata
	// exactly as it was.
	if s.Metadata == nil && len(other.Metadata) > 0 {
		s.Metadata = make(metadata.Metadata)
	}
	s.Metadata.AddAll(other.Metadata)
}
// TransportProfile holds the profile for transport statistics.
//
// Spans are recorded by calling StartSpan and then Finish (or FinishWithTime)
// on the returned span. A zero-value TransportProfile is an empty profile.
type TransportProfile struct {
	// NodeType holds the node type which is a string representation
	// of the underlying transformation.
	NodeType string `json:"node_type"`

	// Label holds the plan node label.
	Label string `json:"label"`

	// Count holds the number of spans in this profile.
	Count int64 `json:"count"`

	// Min holds the minimum span time of this profile, in nanoseconds.
	Min int64 `json:"min"`

	// Max holds the maximum span time of this profile, in nanoseconds.
	Max int64 `json:"max"`

	// Sum holds the sum of all span times for this profile, in nanoseconds.
	Sum int64 `json:"sum"`

	// Mean is the mean span time of this profile, i.e. Sum divided by Count.
	Mean float64 `json:"mean"`
}

// StartSpan will start a profile span to be recorded.
//
// When at least one time is passed, the first one is used as the start of
// the span and any further values are ignored; otherwise the span starts at
// time.Now(). Nothing is recorded in p until the span is finished.
func (p *TransportProfile) StartSpan(now ...time.Time) TransportProfileSpan {
	span := TransportProfileSpan{p: p}
	if len(now) > 0 {
		span.start = now[0]
	} else {
		span.start = time.Now()
	}
	return span
}

// TransportProfileSpan is a span that tracks the lifetime of a transport operation.
type TransportProfileSpan struct {
	// p is the profile that receives the measurement when the span finishes.
	p *TransportProfile
	// start is the instant at which the span began.
	start time.Time
}

// Finish finishes the span at the current time and records the metrics for
// that operation.
func (span *TransportProfileSpan) Finish() {
	span.FinishWithTime(time.Now())
}

// FinishWithTime finishes the span using now as its end time and records the
// elapsed time, in nanoseconds, in the profile the span was started from:
// Count and Sum are advanced, Min and Max are widened as needed, and Mean is
// recomputed from Sum and Count.
//
// Each call records one measurement; the span itself is not modified.
func (span *TransportProfileSpan) FinishWithTime(now time.Time) {
	p := span.p
	d := now.Sub(span.start).Nanoseconds()

	// The first measurement seeds both extremes. Without this, Max would
	// keep its zero value when the first duration is negative (possible
	// when the caller supplies both endpoints), reporting a maximum that
	// no span ever had.
	first := p.Count == 0
	if first || d < p.Min {
		p.Min = d
	}
	if first || d > p.Max {
		p.Max = d
	}

	p.Count++
	p.Sum += d
	p.Mean = float64(p.Sum) / float64(p.Count)
}