From 70b39e7d9ac4891c5eb3b82d640d4da9f50b1731 Mon Sep 17 00:00:00 2001
From: Matvey Arye
Date: Tue, 1 Nov 2022 12:19:52 -0400
Subject: [PATCH] Prevent flushing a batch with less than one request

Previously, when a new set of requests came in, we often flushed a very
small first batch, because Peek flushes as soon as any data becomes
available. Since small batches are inefficient, we fix this by waiting
until the entire first request has been batched before flushing (with a
timeout as a safety measure).
---
 pkg/pgmodel/ingestor/dispatcher.go          |  5 ++-
 pkg/pgmodel/ingestor/metric_batcher.go      |  3 +-
 pkg/pgmodel/ingestor/metric_batcher_test.go |  4 +-
 pkg/pgmodel/ingestor/reservation.go         | 42 ++++++++++++++++-----
 4 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index e80211f81b..b3d95a3551 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -227,8 +227,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 		maxt         int64
 		rows         = dataTS.Rows
 		workFinished = new(sync.WaitGroup)
+		batched      = new(sync.WaitGroup)
 	)
 	workFinished.Add(len(rows))
+	batched.Add(len(rows))
 	// we only allocate enough space for a single error message here as we only
 	// report one error back upstream. The inserter should not block on this
 	// channel, but only insert if it's empty, anything else can deadlock.
@@ -241,7 +243,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 				maxt = ts
 			}
 		}
-		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan}
+		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
 	}
 	span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
 	span.SetAttributes(attribute.Int("num_metrics", len(rows)))
@@ -339,6 +341,7 @@ type insertDataRequest struct {
 	spanCtx  trace.SpanContext
 	metric   string
 	finished *sync.WaitGroup
+	batched  *sync.WaitGroup
 	data     []model.Insertable
 	errChan  chan error
 }
diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go
index 2f16fa11f9..29c98e9adc 100644
--- a/pkg/pgmodel/ingestor/metric_batcher.go
+++ b/pkg/pgmodel/ingestor/metric_batcher.go
@@ -211,6 +211,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 		}
 		metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
 		reservation.Update(reservationQ, t, len(req.data))
+		req.batched.Done()
 		addSpan.End()
 	}
 	//This channel in synchronous (no buffering). This provides backpressure
@@ -225,7 +226,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 			t = time.Time{}
 		}
 		metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
-		reservation = reservationQ.Add(copySender, t)
+		reservation = reservationQ.Add(copySender, req.batched, t)
 	}

 	pending := NewPendingBuffer()
diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go
index 45fd772ebc..3b64919f27 100644
--- a/pkg/pgmodel/ingestor/metric_batcher_test.go
+++ b/pkg/pgmodel/ingestor/metric_batcher_test.go
@@ -143,6 +143,8 @@ func TestSendBatches(t *testing.T) {
 		return l
 	}
 	var workFinished sync.WaitGroup
+	var batched sync.WaitGroup
+	batched.Add(1)
 	errChan := make(chan error, 1)
 	data := []model.Insertable{
 		model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)),
@@ -150,7 +152,7 @@ func TestSendBatches(t *testing.T) {
 		model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)),
 	}
 	spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour))
-	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan}
+	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan}
 	reservationQ := NewReservationQueue()
 	go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ)
 	resos := make([]readRequest, 0, 1)
diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go
index ffee442c65..a6a12c423a 100644
--- a/pkg/pgmodel/ingestor/reservation.go
+++ b/pkg/pgmodel/ingestor/reservation.go
@@ -8,8 +8,9 @@ import (
 )

 type reservation struct {
-	copySender <-chan copyRequest
-	index      int
+	copySender          <-chan copyRequest
+	firstRequestBatched *sync.WaitGroup
+	index               int

 	lock      sync.Mutex
 	startTime time.Time
@@ -17,8 +18,8 @@ type reservation struct {
 	items int64
 }

-func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation {
-	return &reservation{cs, -1, sync.Mutex{}, startTime, 1}
+func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation {
+	return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1}
 }

 func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) {
@@ -110,8 +111,8 @@ func NewReservationQueue() *ReservationQueue {
 	return res
 }

-func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation {
-	si := newReservation(cs, startTime)
+func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation {
+	si := newReservation(cs, startTime, batched)

 	rq.lock.Lock()
 	defer rq.lock.Unlock()
@@ -146,19 +147,42 @@ func (rq *ReservationQueue) Close() {
 // Peek gives the first startTime as well as if the queue is not closed.
 // It blocks until there is an element in the queue or it has been closed.
 func (rq *ReservationQueue) Peek() (time.Time, bool) {
+	reservation, waited, ok := rq.peek()
+	if !ok {
+		return time.Time{}, false
+	}
+	if waited {
+		/* If this is the first reservation in the queue, wait for the entire request to be batched, with a timeout.
+		 * (the timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely) */
+		waitch := make(chan struct{})
+		go func() {
+			reservation.firstRequestBatched.Wait()
+			close(waitch)
+		}()
+		select {
+		case <-waitch:
+		case <-time.After(250 * time.Millisecond):
+		}
+	}
+	return reservation.GetStartTime(), ok
+}
+
+func (rq *ReservationQueue) peek() (*reservation, bool, bool) {
 	rq.lock.Lock()
 	defer rq.lock.Unlock()

+	waited := false
 	for !rq.closed && rq.q.Len() == 0 {
+		waited = true
 		rq.cond.Wait()
 	}

 	if rq.q.Len() > 0 {
-		first := (*rq.q)[0]
-		return first.GetStartTime(), true
+		firstReservation := (*rq.q)[0]
+		return firstReservation, waited, true
 	}
 	//must be closed
-	return time.Time{}, false
+	return nil, false, false
 }

 // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty.
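
Note on the mechanism (illustration only, not part of the patch): the change
threads one sync.WaitGroup through the whole write path. The dispatcher calls
batched.Add(len(rows)), i.e. one count per metric in the incoming request;
each metric batcher calls req.batched.Done() once it has folded its share of
the request into a pending batch; and Peek, when it finds it had to block on
an empty queue, waits on that WaitGroup with a 250ms ceiling before reporting
a start time, so the first flush usually carries the full request. Below is a
minimal, self-contained Go sketch of that pattern; the names (waitBatched,
the metric list, the delays) are hypothetical stand-ins, not promscale code.

    // Sketch of "wait for a WaitGroup, but never longer than a timeout".
    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // waitBatched blocks until wg completes or the timeout fires, mirroring
    // the select in the patched Peek. The helper goroutine leaks if the
    // timeout wins and wg never finishes; the bound is only a safety valve
    // for that unlikely case (e.g. a full metric batcher channel).
    func waitBatched(wg *sync.WaitGroup, timeout time.Duration) bool {
        done := make(chan struct{})
        go func() {
            wg.Wait()
            close(done)
        }()
        select {
        case <-done:
            return true
        case <-time.After(timeout):
            return false
        }
    }

    func main() {
        metrics := []string{"cpu", "mem", "disk"}

        // Dispatcher side: one count per metric, like batched.Add(len(rows)).
        var batched sync.WaitGroup
        batched.Add(len(metrics))

        // Batcher side: each batcher marks its part of the request batched.
        for i, m := range metrics {
            go func(m string, delay time.Duration) {
                time.Sleep(delay) // stand-in for appending to a pending batch
                fmt.Println(m, "batched")
                batched.Done()
            }(m, time.Duration(i*20)*time.Millisecond)
        }

        // Flusher side: prefer a full first batch, but never wait forever.
        if waitBatched(&batched, 250*time.Millisecond) {
            fmt.Println("first request fully batched; flushing a full batch")
        } else {
            fmt.Println("timed out waiting; flushing a partial batch")
        }
    }

The design choice worth noting is that the bounded wait runs only on the
waited path, i.e. when Peek had to block on an empty queue, so steady-state
throughput pays no extra latency; only the first reservation after an idle
period is held back, and at most by the 250ms ceiling.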