Skip to content

Commit

Permalink
fix interface conversion mismatch in query-scheduler (#6516)
Browse files Browse the repository at this point in the history
* fix interface conversion mismatch in query-scheduler

* adding test case for re-enqueueing after disconnected querier
  • Loading branch information
francoposa authored Oct 30, 2023
1 parent 3a2302b commit a058d17
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 19 deletions.
25 changes: 14 additions & 11 deletions pkg/scheduler/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,12 @@ func (q *RequestQueue) dispatcherLoop() {
//
// If request is successfully enqueued, successFn is called before any querier can receive the request.
func (q *RequestQueue) enqueueRequestToBroker(broker *queueBroker, r requestToEnqueue) error {
err := broker.enqueueRequestBack(r)
tr := tenantRequest{
tenantID: r.tenantID,
req: r.req,
maxQueriers: r.maxQueriers,
}
err := broker.enqueueRequestBack(&tr)
if err != nil {
if errors.Is(err, ErrTooManyRequests) {
q.discardedRequests.WithLabelValues(string(r.tenantID)).Inc()
Expand Down Expand Up @@ -265,7 +270,7 @@ func (q *RequestQueue) tryDispatchRequestToQuerier(broker *queueBroker, call *ne
}

reqForQuerier := nextRequestForQuerier{
req: req,
req: req.req,
lastUserIndex: call.lastUserIndex,
err: nil,
}
Expand All @@ -274,17 +279,15 @@ func (q *RequestQueue) tryDispatchRequestToQuerier(broker *queueBroker, call *ne
if requestSent {
q.queueLength.WithLabelValues(string(tenantID)).Dec()
} else {
// re-casting to same type it was enqueued as; panic would indicate a bug
reqToEnqueue := req.(requestToEnqueue)
// should never error; any item previously in the queue already passed validation
err := broker.enqueueRequestFront(reqToEnqueue)
level.Error(q.log).Log(
"msg", "failed to re-enqueue query request after dequeue",
"err", err, "tenant", tenantID, "querier", call.querierID,
)

err := broker.enqueueRequestFront(req)
if err != nil {
level.Error(q.log).Log(
"msg", "failed to re-enqueue query request after dequeue",
"err", err, "tenant", tenantID, "querier", call.querierID,
)
}
}

return true
}

Expand Down
40 changes: 40 additions & 0 deletions pkg/scheduler/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,3 +240,43 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldReturnImmediatelyIfQuerierI
_, _, err := queue.GetNextRequestForQuerier(context.Background(), FirstUser(), querierID)
require.EqualError(t, err, "querier has informed the scheduler it is shutting down")
}

func TestRequestQueue_tryDispatchRequestToQuerier_ShouldReEnqueueAfterFailedSendToQuerier(t *testing.T) {
const forgetDelay = 3 * time.Second
const querierID = "querier-1"

queue := NewRequestQueue(log.NewNopLogger(), 1, forgetDelay,
promauto.With(nil).NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
promauto.With(nil).NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
promauto.With(nil).NewHistogram(prometheus.HistogramOpts{}))

// bypassing queue dispatcher loop for direct usage of the queueBroker and
// passing a nextRequestForQuerierCall for a canceled querier connection
queueBroker := newQueueBroker(queue.maxOutstandingPerTenant, queue.forgetDelay)
queueBroker.addQuerierConnection(querierID)

tr := tenantRequest{
tenantID: TenantID("tenant-1"),
req: "request",
maxQueriers: 0, // no sharding
}

require.Nil(t, queueBroker.tenantQueues["tenant-1"])
require.NoError(t, queueBroker.enqueueRequestBack(&tr))
require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1)

ctx, cancel := context.WithCancel(context.Background())
call := &nextRequestForQuerierCall{
ctx: ctx,
querierID: QuerierID(querierID),
lastUserIndex: FirstUser(),
processed: make(chan nextRequestForQuerier),
}
cancel() // ensure querier context done before send is attempted

// send to querier will fail but method returns true,
// indicating not to re-submit a request for nextRequestForQuerierCall for the querier
require.True(t, queue.tryDispatchRequestToQuerier(queueBroker, call))
// assert request was re-enqueued for tenant after failed send
require.Equal(t, queueBroker.tenantQueues["tenant-1"].requests.Len(), 1)
}
26 changes: 18 additions & 8 deletions pkg/scheduler/queue/tenant_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ func (s querierIDSlice) Search(x QuerierID) int {
return sort.Search(len(s), func(i int) bool { return s[i] >= x })
}

type tenantRequest struct {
tenantID TenantID
req Request
maxQueriers int
}

type querierConn struct {
// Number of active connections.
connections int
Expand Down Expand Up @@ -125,8 +131,8 @@ func (qb *queueBroker) len() int {
return len(qb.tenantQueues)
}

func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error {
queue, err := qb.getOrAddTenantQueue(r.tenantID, r.maxQueriers)
func (qb *queueBroker) enqueueRequestBack(request *tenantRequest) error {
queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers)
if err != nil {
return err
}
Expand All @@ -135,7 +141,7 @@ func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error {
return ErrTooManyRequests
}

queue.PushBack(r.req)
queue.PushBack(request)
return nil
}

Expand All @@ -144,13 +150,13 @@ func (qb *queueBroker) enqueueRequestBack(r requestToEnqueue) error {
//
// max tenant queue size checks are skipped even though queue size violations
// are not expected to occur when re-enqueuing a previously dequeued request.
func (qb *queueBroker) enqueueRequestFront(r requestToEnqueue) error {
queue, err := qb.getOrAddTenantQueue(r.tenantID, r.maxQueriers)
func (qb *queueBroker) enqueueRequestFront(request *tenantRequest) error {
queue, err := qb.getOrAddTenantQueue(request.tenantID, request.maxQueriers)
if err != nil {
return err
}

queue.PushFront(r.req)
queue.PushFront(request)
return nil
}

Expand All @@ -175,7 +181,7 @@ func (qb *queueBroker) getOrAddTenantQueue(tenantID TenantID, maxQueriers int) (
return queue.requests, nil
}

func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID QuerierID) (Request, TenantID, int, error) {
func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID QuerierID) (*tenantRequest, TenantID, int, error) {
tenantID, tenantIndex, err := qb.tenantQuerierAssignments.getNextTenantIDForQuerier(lastTenantIndex, querierID)
if err != nil {
return nil, tenantID, tenantIndex, err
Expand All @@ -193,7 +199,11 @@ func (qb *queueBroker) dequeueRequestForQuerier(lastTenantIndex int, querierID Q
if tenantQueue.requests.Len() == 0 {
qb.deleteQueue(tenantID)
}
return queueElement.Value, tenantID, tenantIndex, nil

// re-casting to same type it was enqueued as; panic would indicate a bug
request := queueElement.Value.(*tenantRequest)

return request, tenantID, tenantIndex, nil

}

Expand Down

0 comments on commit a058d17

Please sign in to comment.