Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Improve batch sizes
Browse files Browse the repository at this point in the history
- Try to batch more before flushing
- Flush bigger batches first (hoping smaller batches get filled more)
  • Loading branch information
cevian committed Nov 3, 2022
1 parent baa0c84 commit 237392f
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
31 changes: 16 additions & 15 deletions pkg/pgmodel/ingestor/metric_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,21 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
numSamples, numExemplars := pending.batch.Count()

select {
//try to send first, if not then keep batching
//try to batch as much as possible before sending
case req, ok := <-recvCh:
if !ok {
if !pending.IsEmpty() {
span.AddEvent("Sending last non-empty batch")
copySender <- copyRequest{pending, info}
metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
}
span.AddEvent("Exiting metric batcher batch loop")
span.SetAttributes(attribute.Int("num_series", numSeries))
span.End()
return
}
addReq(req, pending)
case copySender <- copyRequest{pending, info}:
metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars))
Expand All @@ -274,20 +288,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
pending = NewPendingBuffer()
pending.spanCtx, span = tracer.Default().Start(context.Background(), "send-batches")
span.SetAttributes(attribute.String("metric", info.TableName))
case req, ok := <-recvCh:
if !ok {
if !pending.IsEmpty() {
span.AddEvent("Sending last non-empty batch")
copySender <- copyRequest{pending, info}
metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
}
span.AddEvent("Exiting metric batcher batch loop")
span.SetAttributes(attribute.Int("num_series", numSeries))
span.End()
return
}
addReq(req, pending)

}
}
}
8 changes: 8 additions & 0 deletions pkg/pgmodel/ingestor/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ func newReservationQueueInternal() *reservationQueueInternal {
func (res reservationQueueInternal) Len() int { return len(res) }

// Less reports whether reservation i should sort before reservation j.
// Primary order is by start time (oldest first). When two reservations
// share the same start time, the one holding more items sorts first,
// since a larger pending batch probably holds up more downstream work.
func (res reservationQueueInternal) Less(i, j int) bool {
	startTimeI := res[i].GetStartTime()
	startTimeJ := res[j].GetStartTime()
	if startTimeI.Equal(startTimeJ) {
		itemsI := atomic.LoadInt64(&res[i].items)
		itemsJ := atomic.LoadInt64(&res[j].items)
		// Prefer metrics with more items because they probably hold up more stuff.
		return itemsI > itemsJ
	}
	// Reuse the start times computed above instead of calling
	// GetStartTime() again.
	return startTimeI.Before(startTimeJ)
}

Expand Down

0 comments on commit 237392f

Please sign in to comment.