Log total time that the query spent in the queue (#6537)
* Log how much time the query spent in the queue (in the frontend or query-scheduler).

Signed-off-by: Peter Štibraný <[email protected]>

* Add unit tests.

Signed-off-by: Peter Štibraný <[email protected]>

* CHANGELOG.md entry

Signed-off-by: Peter Štibraný <[email protected]>

* Fix race in the test.

Signed-off-by: Peter Štibraný <[email protected]>

* Fix bugs found during review.

Signed-off-by: Peter Štibraný <[email protected]>

---------

Signed-off-by: Peter Štibraný <[email protected]>
pstibrany authored Nov 3, 2023
1 parent ef26ab6 commit e238310
Showing 19 changed files with 501 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -59,6 +59,7 @@
* [ENHANCEMENT] Add connection-string option, `-<prefix>.azure.connection-string`, for Azure Blob Storage. #6487
* [ENHANCEMENT] Ingester: Add `-ingester.instance-limits.max-inflight-push-requests-bytes`. This limit protects the ingester against requests that together may cause an OOM. #6492
* [ENHANCEMENT] Ingester: add new per-tenant `cortex_ingester_local_limits` metric to expose the calculated local per-tenant limits seen at each ingester. Exports the local per-tenant series limit with label `{limit="max_global_series_per_user"}` #6403
* [ENHANCEMENT] Query-frontend: added "queue_time_seconds" field to the "query stats" log. This is the total time that the query and its subqueries spent in the queue before queriers picked them up. #6537
* [BUGFIX] Ring: Ensure network addresses used for component hash rings are formatted correctly when using IPv6. #6068
* [BUGFIX] Query-scheduler: don't retain connections from queriers that have shut down, leading to gradually increasing enqueue latency over time. #6100 #6145
* [BUGFIX] Ingester: prevent query logic from continuing to execute after queries are canceled. #6085
1 change: 1 addition & 0 deletions pkg/frontend/transport/handler.go
@@ -277,6 +277,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"sharded_queries", stats.LoadShardedQueries(),
"split_queries", stats.LoadSplitQueries(),
"estimated_series_count", stats.GetEstimatedSeriesCount(),
"queue_time_seconds", stats.LoadQueueTime().Seconds(),
}, formatQueryString(queryString)...)

if len(f.cfg.LogQueryRequestHeaders) != 0 {
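
For context, the new key is emitted as part of the same "query stats" line the frontend handler already logs. The following is an abridged, illustrative example only: the values are placeholders, the logfmt layout is assumed, and every field not visible in this diff or its tests is elided with `...`:

```
level=info msg="query stats" component=query-frontend ... sharded_queries=0 split_queries=0 estimated_series_count=0 queue_time_seconds=1.234 ...
```
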
3 changes: 2 additions & 1 deletion pkg/frontend/transport/handler_test.go
@@ -178,7 +178,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.Len(t, logger.logMessages, 1)

msg := logger.logMessages[0]
require.Len(t, msg, 18+len(tt.expectedParams))
require.Len(t, msg, 19+len(tt.expectedParams))
require.Equal(t, level.InfoValue(), msg["level"])
require.Equal(t, "query stats", msg["msg"])
require.Equal(t, "query-frontend", msg["component"])
@@ -197,6 +197,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.EqualValues(t, 0, msg["sharded_queries"])
require.EqualValues(t, 0, msg["split_queries"])
require.EqualValues(t, 0, msg["estimated_series_count"])
require.EqualValues(t, 0, msg["queue_time_seconds"])

for name, values := range tt.expectedParams {
logMessageKey := fmt.Sprintf("param_%v", name)
10 changes: 6 additions & 4 deletions pkg/frontend/v1/frontend.go
@@ -220,7 +220,8 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {

req := reqWrapper.(*request)

f.queueDuration.Observe(time.Since(req.enqueueTime).Seconds())
queueTime := time.Since(req.enqueueTime)
f.queueDuration.Observe(queueTime.Seconds())
req.queueSpan.Finish()

/*
@@ -245,9 +246,10 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
errs := make(chan error, 1)
go func() {
err = server.Send(&frontendv1pb.FrontendToClient{
Type: frontendv1pb.HTTP_REQUEST,
HttpRequest: req.request,
StatsEnabled: stats.IsEnabled(req.originalCtx),
Type: frontendv1pb.HTTP_REQUEST,
HttpRequest: req.request,
StatsEnabled: stats.IsEnabled(req.originalCtx),
QueueTimeNanos: queueTime.Nanoseconds(),
})
if err != nil {
errs <- err
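
The frontend measures the queue time once, feeds it to the existing queueDuration histogram, and now also ships it to the querier via the new QueueTimeNanos field. The querier-side handling lives in changed files that are not rendered in this excerpt; the sketch below is only an assumption of what that side roughly does, reusing the stats API shown later in this diff (stats.FromContext, AddQueueTime). The helper name and package are hypothetical.

```go
package worker

import (
	"context"
	"time"

	"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
	"github.com/grafana/mimir/pkg/querier/stats"
)

// recordQueueTime is a hypothetical helper: it folds the queue time reported by
// the frontend (or query-scheduler) into the per-query stats carried in the
// request context, so it can later be merged and logged as queue_time_seconds.
func recordQueueTime(ctx context.Context, msg *frontendv1pb.FrontendToClient) {
	if msg == nil || msg.QueueTimeNanos <= 0 {
		return
	}
	// AddQueueTime is nil-safe, so this is harmless when stats tracking is disabled.
	stats.FromContext(ctx).AddQueueTime(time.Duration(msg.QueueTimeNanos))
}
```

Sending nanoseconds over the wire and converting back to time.Duration on the receiving side keeps the proto field a plain int64 while preserving precision.
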
73 changes: 72 additions & 1 deletion pkg/frontend/v1/frontend_test.go
@@ -12,6 +12,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"testing"
"time"

@@ -36,6 +37,7 @@

"github.com/grafana/mimir/pkg/frontend/transport"
"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
"github.com/grafana/mimir/pkg/querier/stats"
querier_worker "github.com/grafana/mimir/pkg/querier/worker"
)

@@ -227,6 +229,75 @@ func TestFrontendMetricsCleanup(t *testing.T) {
testFrontend(t, defaultFrontendConfig(), handler, test, nil, reg)
}

func TestFrontendStats(t *testing.T) {
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s := stats.FromContext(r.Context())
s.AddQueueTime(5 * time.Second)
w.WriteHeader(200)
})

tl := testLogger{}

test := func(addr string, fr *Frontend) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/", addr), nil)
require.NoError(t, err)
err = user.InjectOrgIDIntoHTTPRequest(user.InjectOrgID(context.Background(), "1"), req)
require.NoError(t, err)

resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
defer resp.Body.Close()
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)

queryStatsFound := false
for _, le := range tl.getLogMessages() {
if le["msg"] == "query stats" {
require.False(t, queryStatsFound)
queryStatsFound = true
require.GreaterOrEqual(t, le["queue_time_seconds"], 5.0)
}
}
require.True(t, queryStatsFound)
}

testFrontend(t, defaultFrontendConfig(), handler, test, &tl, nil)
}

type testLogger struct {
mu sync.Mutex
logMessages []map[string]interface{}
}

func (t *testLogger) Log(keyvals ...interface{}) error {
if len(keyvals)%2 != 0 {
panic("received uneven number of key/value pairs for log line")
}

entryCount := len(keyvals) / 2
msg := make(map[string]interface{}, entryCount)

for i := 0; i < entryCount; i++ {
name := keyvals[2*i].(string)
value := keyvals[2*i+1]

msg[name] = value
}

t.mu.Lock()
defer t.mu.Unlock()

t.logMessages = append(t.logMessages, msg)
return nil
}

func (t *testLogger) getLogMessages() []map[string]interface{} {
t.mu.Lock()
defer t.mu.Unlock()
return append([]map[string]interface{}(nil), t.logMessages...)
}

func testFrontend(t *testing.T, config Config, handler http.Handler, test func(addr string, frontend *Frontend), l log.Logger, reg prometheus.Registerer) {
logger := log.NewNopLogger()
if l != nil {
@@ -261,7 +332,7 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
frontendv1pb.RegisterFrontendServer(grpcServer, v1)

// Default HTTP handler config.
handlerCfg := transport.HandlerConfig{}
handlerCfg := transport.HandlerConfig{QueryStatsEnabled: true}
flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
105 changes: 74 additions & 31 deletions pkg/frontend/v1/frontendv1pb/frontend.pb.go

Generated file; diff not rendered.

3 changes: 3 additions & 0 deletions pkg/frontend/v1/frontendv1pb/frontend.proto
@@ -39,6 +39,9 @@ message FrontendToClient {
// Whether query statistics tracking should be enabled. The response will include
// statistics only when this option is enabled.
bool statsEnabled = 3;

// How much time the query spent in the queue.
int64 queueTimeNanos = 4;
}

message ClientToFrontend {
17 changes: 17 additions & 0 deletions pkg/querier/stats/stats.go
@@ -171,6 +171,22 @@ func (s *Stats) LoadEstimatedSeriesCount() uint64 {
return atomic.LoadUint64(&s.EstimatedSeriesCount)
}

func (s *Stats) AddQueueTime(t time.Duration) {
if s == nil {
return
}

atomic.AddInt64((*int64)(&s.QueueTime), int64(t))
}

func (s *Stats) LoadQueueTime() time.Duration {
if s == nil {
return 0
}

return time.Duration(atomic.LoadInt64((*int64)(&s.QueueTime)))
}

// Merge the provided Stats into this one.
func (s *Stats) Merge(other *Stats) {
if s == nil || other == nil {
@@ -185,6 +201,7 @@ func (s *Stats) Merge(other *Stats) {
s.AddSplitQueries(other.LoadSplitQueries())
s.AddFetchedIndexBytes(other.LoadFetchedIndexBytes())
s.AddEstimatedSeriesCount(other.LoadEstimatedSeriesCount())
s.AddQueueTime(other.LoadQueueTime())
}

func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool {
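
The two new accessors follow the same nil-safe, atomic pattern as the rest of the Stats API, and Merge adds the other query's queue time into the receiver, which is how the logged value becomes a total across the query and its subqueries. A minimal usage sketch, assuming only the methods visible in this diff and that a zero-value Stats is usable directly (as the atomic accessors suggest):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/mimir/pkg/querier/stats"
)

func main() {
	// Per-subquery stats, as each execution path would record them.
	sub1, sub2 := &stats.Stats{}, &stats.Stats{}
	sub1.AddQueueTime(300 * time.Millisecond)
	sub2.AddQueueTime(700 * time.Millisecond)

	// The parent query's stats accumulate the subqueries' queue time via Merge,
	// so queue_time_seconds in the "query stats" log reflects the sum.
	total := &stats.Stats{}
	total.Merge(sub1)
	total.Merge(sub2)

	fmt.Println(total.LoadQueueTime()) // 1s
}
```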