diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 606e4bcfa29..1cb438f5fa5 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -229,7 +229,12 @@ func (q *RequestQueue) dispatcherLoop() { // // If request is successfully enqueued, successFn is called before any querier can receive the request. func (q *RequestQueue) enqueueRequestToBroker(broker *queueBroker, r requestToEnqueue) error { - err := broker.enqueueRequestBack(r) + tr := tenantRequest{ + tenantID: r.tenantID, + req: r.req, + maxQueriers: r.maxQueriers, + } + err := broker.enqueueRequestBack(&tr) if err != nil { if errors.Is(err, ErrTooManyRequests) { q.discardedRequests.WithLabelValues(string(r.tenantID)).Inc() @@ -265,7 +270,7 @@ func (q *RequestQueue) tryDispatchRequestToQuerier(broker *queueBroker, call *ne } reqForQuerier := nextRequestForQuerier{ - req: req, + req: req.req, lastUserIndex: call.lastUserIndex, err: nil, } @@ -274,17 +279,15 @@ func (q *RequestQueue) tryDispatchRequestToQuerier(broker *queueBroker, call *ne if requestSent { q.queueLength.WithLabelValues(string(tenantID)).Dec() } else { - // re-casting to same type it was enqueued as; panic would indicate a bug - reqToEnqueue := req.(requestToEnqueue) // should never error; any item previously in the queue already passed validation - err := broker.enqueueRequestFront(reqToEnqueue) - level.Error(q.log).Log( - "msg", "failed to re-enqueue query request after dequeue", - "err", err, "tenant", tenantID, "querier", call.querierID, - ) - + err := broker.enqueueRequestFront(req) + if err != nil { + level.Error(q.log).Log( + "msg", "failed to re-enqueue query request after dequeue", + "err", err, "tenant", tenantID, "querier", call.querierID, + ) + } } - return true } diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index dcdba9442d5..0057008b295 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -240,3 +240,43 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI _, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID) require.EqualError(t, err, "querier has informed the scheduler it is shutting down") } + +func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSendToQuerier(t *testing.T) { + const forgetDelay = 3 * time.Second + const querierID = "querier-1" + + queue := NewRequestQueue(log.NewNopLogger(), 1, forgetDelay, + promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), + promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + promauto.With(nil).NewHistogram(prometheus.HistogramOpts{})) + + // bypassing queue dispatcher loop for direct usage of the queueBroker and + // passing a nextRequestForQuerierCall for a canceled querier connection + queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, queue.forgetDelay) + queueBroker.addQuerierConnection(querierID) + + tr := tenantRequest{ + tenantID: TenantID("tenant-1"), + req: "request", + maxQueriers: 0, // no sharding + } + + require.Nil(t, queueBroker.tenantQueues["tenant-1"]) + require.NoError(t, queueBroker.enqueueRequestBack(&tr)) + require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1) + + ctx, cancel := context.WithCancel(context.Background()) + call := &nextRequestForQuerierCall{ + ctx: ctx, + querierID: QuerierID(querierID), + lastUserIndex: FirstUser(), + processed: make(chan nextRequestForQuerier), + } + cancel() // ensure querier context done before send is attempted + + // send to querier will fail but method returns true, + // indicating not to re-submit a request for nextRequestForQuerierCall for the querier + require.True(t, queue.tryDispatchRequestToQuerier(queueBroker, call)) + // assert request was re-enqueued for tenant after failed send + require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1) +} diff --git a/pkg/scheduler/queue/tenant_queues.go b/pkg/scheduler/queue/tenant_queues.go index 0a07aaa9f4c..97605d9b43a 100644 --- a/pkg/scheduler/queue/tenant_queues.go +++ b/pkg/scheduler/queue/tenant_queues.go @@ -36,6 +36,12 @@ func (s querierIDSlice) Search(x QuerierID) int { return sort.Search(len(s), func(i int) bool { return s[i] >= x }) } +type tenantRequest struct { + tenantID TenantID + req Request + maxQueriers int +} + type querierConn struct { // Number of active connections. connections int @@ -125,8 +131,8 @@ func (qb *queueBroker) len() int { return len(qb.tenantQueues) } -func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error { - queue, err := qb.getOrAddTenantQueue(r.tenantID, r.maxQueriers) +func (qb *queueBroker) enqueueRequestBack(request *tenantRequest) error { + queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers) if err != nil { return err } @@ -135,7 +141,7 @@ func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error { return ErrTooManyRequests } - queue.PushBack(r.req) + queue.PushBack(request) return nil } @@ -144,13 +150,13 @@ func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error { // // max tenant queue size checks are skipped even though queue size violations // are not expected to occur when re-enqueuing a previously dequeued request. -func (qb *queueBroker) enqueueRequestFront(r requestToEnqueue) error { - queue, err := qb.getOrAddTenantQueue(r.tenantID, r.maxQueriers) +func (qb *queueBroker) enqueueRequestFront(request *tenantRequest) error { + queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers) if err != nil { return err } - queue.PushFront(r.req) + queue.PushFront(request) return nil } @@ -175,7 +181,7 @@ func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) ( return queue.requests, nil } -func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID QuerierID) (Request, TenantID, int, error) { +func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID QuerierID) (*tenantRequest, TenantID, int, error) { tenantID, tenantIndex, err := qb.tenantQuerierAssignments.getNextTenantIDForQuerier(lastTenantIndex, querierID) if err != nil { return nil, tenantID, tenantIndex, err @@ -193,7 +199,11 @@ func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID Q if tenantQueue.requests.Len() == 0 { qb.deleteQueue(tenantID) } - return queueElement.Value, tenantID, tenantIndex, nil + + // re-casting to same type it was enqueued as; panic would indicate a bug + request := queueElement.Value.(*tenantRequest) + + return request, tenantID, tenantIndex, nil }