diff --git a/pkg/pgmodel/ingestor/metric_batcher.go b/pkg/pgmodel/ingestor/metric_batcher.go
index f6b3280b8f..2f16fa11f9 100644
--- a/pkg/pgmodel/ingestor/metric_batcher.go
+++ b/pkg/pgmodel/ingestor/metric_batcher.go
@@ -257,7 +257,21 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 		numSamples, numExemplars := pending.batch.Count()
 		select {
-		//try to send first, if not then keep batching
+		//try to batch as much as possible before sending
+		case req, ok := <-recvCh:
+			if !ok {
+				if !pending.IsEmpty() {
+					span.AddEvent("Sending last non-empty batch")
+					copySender <- copyRequest{pending, info}
+					metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
+					metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
+				}
+				span.AddEvent("Exiting metric batcher batch loop")
+				span.SetAttributes(attribute.Int("num_series", numSeries))
+				span.End()
+				return
+			}
+			addReq(req, pending)
 		case copySender <- copyRequest{pending, info}:
 			metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
 			metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars))
@@ -274,20 +288,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 			pending = NewPendingBuffer()
 			pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches")
 			span.SetAttributes(attribute.String("metric", info.TableName))
-		case req, ok := <-recvCh:
-			if !ok {
-				if !pending.IsEmpty() {
-					span.AddEvent("Sending last non-empty batch")
-					copySender <- copyRequest{pending, info}
-					metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
-					metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
-				}
-				span.AddEvent("Exiting metric batcher batch loop")
-				span.SetAttributes(attribute.Int("num_series", numSeries))
-				span.End()
-				return
-			}
-			addReq(req, pending)
 		}
 	}
 }
diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go
index c75024403e..ffee442c65 100644
--- a/pkg/pgmodel/ingestor/reservation.go
+++ b/pkg/pgmodel/ingestor/reservation.go
@@ -59,6 +59,14 @@ func newReservationQueueInternal() *reservationQueueInternal {
 func (res reservationQueueInternal) Len() int { return len(res) }
 
 func (res reservationQueueInternal) Less(i, j int) bool {
+	startTimeI := res[i].GetStartTime()
+	startTimeJ := res[j].GetStartTime()
+	if startTimeI.Equal(startTimeJ) {
+		itemsI := atomic.LoadInt64(&res[i].items)
+		itemsJ := atomic.LoadInt64(&res[j].items)
+		//prefer metrics with more items because they probably hold up more work
+		return itemsI > itemsJ
+	}
 	return res[i].GetStartTime().Before(res[j].GetStartTime())
 }
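
For reference, a minimal, self-contained sketch of the loop shape the metric_batcher.go hunks rearrange: a select that keeps pulling requests into the pending batch while also offering that batch to the copier, and that flushes the last non-empty batch before exiting once the input channel is closed. The names here (request, batch, sendBatches, copySender) and the nil-channel guard are illustrative simplifications, not the actual Promscale types; metrics and tracing are omitted.

// sketch.go - illustrative only, not the Promscale implementation
package main

import "fmt"

type request struct{ samples int }

type batch struct{ samples int }

func (b batch) isEmpty() bool { return b.samples == 0 }

func sendBatches(input <-chan request, copySender chan<- batch) {
	// In this sketch the batcher is the only sender, so it closes the channel
	// on exit; the real code shares the copier channel and does not do this.
	defer close(copySender)
	pending := batch{}
	for {
		// Disable the send case while there is nothing to flush (a nil channel
		// is never ready in a select) - a simplification for this sketch.
		out := copySender
		if pending.isEmpty() {
			out = nil
		}
		select {
		// Keep batching while requests arrive; flush the last non-empty batch
		// and exit once the input channel is closed.
		case req, ok := <-input:
			if !ok {
				if !pending.isEmpty() {
					copySender <- pending
				}
				return
			}
			pending.samples += req.samples
		// The copier accepted the current batch; start a fresh one.
		case out <- pending:
			pending = batch{}
		}
	}
}

func main() {
	input := make(chan request)
	copySender := make(chan batch)
	go func() {
		for i := 0; i < 5; i++ {
			input <- request{samples: 10}
		}
		close(input)
	}()
	go sendBatches(input, copySender)
	// How the 50 samples split into batches depends on scheduling, since a
	// select with several ready cases picks one at random.
	for b := range copySender {
		fmt.Println("flushed batch with", b.samples, "samples")
	}
}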
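
And a similarly hedged sketch of the tie-break the reservation.go hunk adds to the priority queue: order reservations by start time, and when two start times are equal, pop the one with more pending items first. The reservation and queue types below are invented stand-ins for reservationQueueInternal, wired into container/heap only to make the ordering observable.

// tiebreak.go - illustrative only, not the Promscale implementation
package main

import (
	"container/heap"
	"fmt"
	"sync/atomic"
	"time"
)

type reservation struct {
	name      string
	startTime time.Time
	items     int64 // updated concurrently in the real code, hence the atomic load
}

type queue []*reservation

func (q queue) Len() int { return len(q) }

func (q queue) Less(i, j int) bool {
	if q[i].startTime.Equal(q[j].startTime) {
		// Same start time: the reservation with more items likely holds up
		// more work downstream, so it gets priority.
		return atomic.LoadInt64(&q[i].items) > atomic.LoadInt64(&q[j].items)
	}
	return q[i].startTime.Before(q[j].startTime)
}

func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }

func (q *queue) Push(x interface{}) { *q = append(*q, x.(*reservation)) }

func (q *queue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	*q = old[:n-1]
	return item
}

func main() {
	t := time.Now()
	q := &queue{
		{name: "few_items", startTime: t, items: 3},
		{name: "many_items", startTime: t, items: 500},
		{name: "older", startTime: t.Add(-time.Second), items: 1},
	}
	heap.Init(q)
	for q.Len() > 0 {
		// Prints: older, many_items, few_items
		fmt.Println(heap.Pop(q).(*reservation).name)
	}
}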