diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go
index e80211f81b..b3d95a3551 100644
--- a/pkg/pgmodel/ingestor/dispatcher.go
+++ b/pkg/pgmodel/ingestor/dispatcher.go
@@ -227,8 +227,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 		maxt         int64
 		rows         = dataTS.Rows
 		workFinished = new(sync.WaitGroup)
+		batched      = new(sync.WaitGroup)
 	)
 	workFinished.Add(len(rows))
+	batched.Add(len(rows))
 	// we only allocate enough space for a single error message here as we only
 	// report one error back upstream. The inserter should not block on this
 	// channel, but only insert if it's empty, anything else can deadlock.
@@ -241,7 +243,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 				maxt = ts
 			}
 		}
-		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan}
+		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
 	}
 	span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
 	span.SetAttributes(attribute.Int("num_metrics", len(rows)))
@@ -339,6 +341,7 @@ type insertDataRequest struct {
 	spanCtx  trace.SpanContext
 	metric   string
 	finished *sync.WaitGroup
+	batched  *sync.WaitGroup
 	data     []model.Insertable
 	errChan  chan error
 }
diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go
index 2f16fa11f9..29c98e9adc 100644
--- a/pkg/pgmodel/ingestor/metric_batcher.go
+++ b/pkg/pgmodel/ingestor/metric_batcher.go
@@ -211,6 +211,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 		}
 		metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
 		reservation.Update(reservationQ, t, len(req.data))
+		req.batched.Done()
 		addSpan.End()
 	}
 	//This channel in synchronous (no buffering). This provides backpressure
@@ -225,7 +226,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 			}
 			t = time.Time{}
 		}
 		metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
-		reservation = reservationQ.Add(copySender, t)
+		reservation = reservationQ.Add(copySender, req.batched, t)
 	}
 	pending := NewPendingBuffer()
diff --git a/pkg/pgmodel/ingestor/metric_batcher_test.go b/pkg/pgmodel/ingestor/metric_batcher_test.go
index 45fd772ebc..3b64919f27 100644
--- a/pkg/pgmodel/ingestor/metric_batcher_test.go
+++ b/pkg/pgmodel/ingestor/metric_batcher_test.go
@@ -143,6 +143,8 @@ func TestSendBatches(t *testing.T) {
 		return l
 	}
 	var workFinished sync.WaitGroup
+	var batched sync.WaitGroup
+	batched.Add(1)
 	errChan := make(chan error, 1)
 	data := []model.Insertable{
 		model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)),
@@ -150,7 +152,7 @@ func TestSendBatches(t *testing.T) {
 		model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)),
 	}
 	spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour))
-	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan}
+	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan}
 	reservationQ := NewReservationQueue()
 	go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ)
 	resos := make([]readRequest, 0, 1)
diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go
index ffee442c65..a6a12c423a 100644
--- a/pkg/pgmodel/ingestor/reservation.go
+++ b/pkg/pgmodel/ingestor/reservation.go
@@ -8,8 +8,9 @@ import (
 )
 
 type reservation struct {
-	copySender <-chan copyRequest
-	index      int
+	copySender          <-chan copyRequest
+	firstRequestBatched *sync.WaitGroup
+	index               int
 
 	lock      sync.Mutex
 	startTime time.Time
@@ -17,8 +18,8 @@ type reservation struct {
 	items     int64
 }
 
-func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation {
-	return &reservation{cs, -1, sync.Mutex{}, startTime, 1}
+func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation {
+	return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1}
 }
 
 func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) {
@@ -110,8 +111,8 @@ func NewReservationQueue() *ReservationQueue {
 	return res
 }
 
-func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation {
-	si := newReservation(cs, startTime)
+func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation {
+	si := newReservation(cs, startTime, batched)
 
 	rq.lock.Lock()
 	defer rq.lock.Unlock()
@@ -146,19 +147,42 @@ func (rq *ReservationQueue) Close() {
 // Peek gives the first startTime as well as if the queue is not closed.
 // It blocks until there is an element in the queue or it has been closed.
 func (rq *ReservationQueue) Peek() (time.Time, bool) {
+	reservation, waited, ok := rq.peek()
+	if !ok {
+		return time.Time{}, false
+	}
+	if waited {
+		/* If this is the first reservation in the queue, wait for the entire request to be batched with a timeout.
+		 * (timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely)*/
+		waitch := make(chan struct{})
+		go func() {
+			reservation.firstRequestBatched.Wait()
+			close(waitch)
+		}()
+		select {
+		case <-waitch:
+		case <-time.After(250 * time.Millisecond):
+		}
+	}
+	return reservation.GetStartTime(), ok
+}
+
+func (rq *ReservationQueue) peek() (*reservation, bool, bool) {
 	rq.lock.Lock()
 	defer rq.lock.Unlock()
+	waited := false
 	for !rq.closed && rq.q.Len() == 0 {
+		waited = true
 		rq.cond.Wait()
 	}
 
 	if rq.q.Len() > 0 {
-		first := (*rq.q)[0]
-		return first.GetStartTime(), true
+		firstReservation := (*rq.q)[0]
+		return firstReservation, waited, true
 	}
 
 	//must be closed
-	return time.Time{}, false
+	return nil, false, false
 }
 
 // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty.
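Summary of the mechanism: InsertTs now creates a second WaitGroup, `batched`, with one unit per metric in the request. Each metric batcher calls `req.batched.Done()` once it has folded its share of the request into a pending buffer, and the WaitGroup travels into the reservation via `ReservationQueue.Add`. `Peek()` then waits (bounded by the 250ms safety timeout) for the whole request to be batched before exposing its start time, so the copier does not start draining a request that is still being fanned out.

Below is a minimal, self-contained sketch of the wait-with-timeout idiom that `Peek()` uses: a goroutine closes a channel when the WaitGroup drains, and a select races that against a timer. The helper name `waitWithTimeout` and the simulated batchers are illustrative assumptions, not part of this diff; only the channel-close/select pattern and the 250ms bound mirror the code above.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout blocks until wg.Wait() returns or the timeout elapses,
// reporting which happened first. (Hypothetical helper, not in this PR.)
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done) // signal that the whole request has been batched
	}()
	select {
	case <-done:
		return true // fully batched
	case <-time.After(timeout):
		return false // safety valve: stop waiting, proceed anyway
	}
}

func main() {
	var batched sync.WaitGroup
	batched.Add(2) // e.g. the request fans out to two metric batchers

	go func() {
		// Simulate both batchers acknowledging their share of the request.
		time.Sleep(50 * time.Millisecond)
		batched.Done()
		batched.Done()
	}()

	fmt.Println("fully batched:", waitWithTimeout(&batched, 250*time.Millisecond))
}

As in `Peek()`, if the timer fires first the inner goroutine stays blocked in `Wait()` until the WaitGroup eventually drains; that lingering goroutine is transient here because every request is ultimately batched.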