This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Experiment with different batch criteria
cevian committed Nov 3, 2022
1 parent c348455 commit b1847ce
Showing 5 changed files with 37 additions and 16 deletions.
12 changes: 5 additions & 7 deletions pkg/pgmodel/ingestor/copier.go
@@ -213,14 +213,12 @@ func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *Rese
     metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(startTime).Seconds())
     span.AddEvent("After sleep")
 
-    batch, _ = reservationQ.PopOntoBatch(batch)
+    var reason string
+    batch, _, reason = reservationQ.PopOntoBatch(batch)
+    metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": reason}).Inc()
+
+    span.AddEvent("Flushed due to" + reason)
 
-    if len(batch) == cap(batch) {
-        span.AddEvent("Batch is full")
-        metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "size"}).Inc()
-    } else {
-        metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": "timeout"}).Inc()
-    }
     metrics.IngestorBatchRemainingAfterFlushTotal.With(labelsCopier).Observe(float64(reservationQ.Len()))
     span.SetAttributes(attribute.Int("num_batches", len(batch)))
     return batch, true
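Note on the copier-side change above: PopOntoBatch now reports why it stopped filling the batch, so the copier attaches that reason to IngestorBatchFlushTotal instead of deriving "size"/"timeout" from whether the batch filled up. A minimal, self-contained sketch of the caller pattern; the types below are toy stand-ins, not the real ingestor types.

package main

import "fmt"

// readRequest and reservationQueue are toy stand-ins for the real types in
// pkg/pgmodel/ingestor; only the shape of PopOntoBatch matters here.
type readRequest struct{ id int }

type reservationQueue struct{ pending []readRequest }

// PopOntoBatch mirrors the new three-value signature: the filled batch, the
// number of requests popped, and a string naming why popping stopped.
func (q *reservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int, string) {
    count := 0
    for len(batch) < cap(batch) && len(q.pending) > 0 {
        batch = append(batch, q.pending[0])
        q.pending = q.pending[1:]
        count++
    }
    reason := "timeout"
    if len(batch) == cap(batch) {
        reason = "size_metrics"
    }
    return batch, count, reason
}

func main() {
    q := &reservationQueue{pending: []readRequest{{1}, {2}, {3}}}
    batch := make([]readRequest, 0, 2)

    // Caller-side pattern from copierGetBatch: the returned reason becomes a
    // metric label and a span event instead of being inferred afterwards.
    batch, n, reason := q.PopOntoBatch(batch)
    fmt.Printf("popped %d requests, batch len %d, flush reason %q\n", n, len(batch), reason)
}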
4 changes: 3 additions & 1 deletion pkg/pgmodel/ingestor/metric_batcher.go
@@ -210,7 +210,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
             t = time.Time{}
         }
         metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
-        reservation.Update(reservationQ, t)
+        reservation.Update(reservationQ, t, len(req.data))
         addSpan.End()
     }
     //This channel in synchronous (no buffering). This provides backpressure
@@ -254,11 +254,13 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
     }
 
     numSeries := pending.batch.CountSeries()
+    numSamples, numExemplars := pending.batch.Count()
 
     select {
     //try to send first, if not then keep batching
     case copySender <- copyRequest{pending, info}:
         metrics.IngestorFlushSeries.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSeries))
+        metrics.IngestorFlushInsertables.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(float64(numSamples + numExemplars))
         metrics.IngestorBatchDuration.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(pending.Start).Seconds())
         if pending.IsFull() {
             metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher", "reason": "size"}).Inc()
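For context on the Update call above: each incoming request now reports how many insertables it carries (len(req.data)), and the new IngestorFlushInsertables observation records samples plus exemplars per flush. Below is a rough sketch of just the per-request accounting, assuming (as reservation.go below does) that the counter is read from another goroutine and is therefore updated atomically; the real reservation also lives in a start-time-ordered queue, which is omitted here, and toyReservation is an illustrative name.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// toyReservation is an illustrative stand-in; the real reservation is also
// re-ordered inside the ReservationQueue when an earlier start time arrives.
type toyReservation struct {
    startTime time.Time
    items     int64 // pending insertables (samples + exemplars)
}

// Update mirrors the new interface shape Update(queue, time, numInsertables):
// it accumulates the request's insertable count for the batching decision.
func (r *toyReservation) Update(t time.Time, numInsertables int) {
    atomic.AddInt64(&r.items, int64(numInsertables))
    if t.Before(r.startTime) {
        r.startTime = t // an earlier request moves the reservation forward
    }
}

func main() {
    res := &toyReservation{startTime: time.Now(), items: 1}

    // The metric batcher would call this once per incoming request,
    // passing len(req.data).
    res.Update(time.Now(), 250)
    res.Update(time.Now(), 500)

    fmt.Println("pending insertables:", atomic.LoadInt64(&res.items)) // 751
}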
2 changes: 1 addition & 1 deletion pkg/pgmodel/ingestor/metric_batcher_test.go
@@ -155,7 +155,7 @@ func TestSendBatches(t *testing.T) {
     go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ)
     resos := make([]readRequest, 0, 1)
     reservationQ.Peek()
-    resos, cnt := reservationQ.PopOntoBatch(resos)
+    resos, cnt, _ := reservationQ.PopOntoBatch(resos)
     require.Equal(t, 1, cnt)
     require.Equal(t, 1, len(resos))
     batch := <-(resos[0].copySender)
32 changes: 26 additions & 6 deletions pkg/pgmodel/ingestor/reservation.go
@@ -3,6 +3,7 @@ package ingestor
 import (
     "container/heap"
     "sync"
+    "sync/atomic"
     "time"
 )
 
@@ -12,14 +13,17 @@ type reservation struct {
 
     lock sync.Mutex
     startTime time.Time
+
+    items int64
 }
 
 func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation {
-    return &reservation{cs, -1, sync.Mutex{}, startTime}
+    return &reservation{cs, -1, sync.Mutex{}, startTime, 1}
 }
 
-func (res *reservation) Update(rq *ReservationQueue, t time.Time) {
+func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) {
     rest := res.GetStartTime()
+    atomic.AddInt64(&res.items, int64(num_insertables))
 
     if t.Before(rest) {
         //this should happen rarely
@@ -82,7 +86,7 @@ func (res *reservationQueueInternal) Pop() interface{} {
 }
 
 type Reservation interface {
-    Update(*ReservationQueue, time.Time)
+    Update(*ReservationQueue, time.Time, int)
 }
 
 type ReservationQueue struct {
@@ -151,17 +155,33 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) {
 
 // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty.
 // never blocks. Returns number of requests pop'ed.
-func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int) {
+func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int, string) {
     rq.lock.Lock()
     defer rq.lock.Unlock()
 
     count := 0
-    for len(batch) < cap(batch) && rq.q.Len() > 0 {
+    items := int64(0)
+    if rq.q.Len() > 0 {
+        items = atomic.LoadInt64(&(*rq.q)[0].items)
+    }
+    total_items := int64(0)
+    for len(batch) < cap(batch) && rq.q.Len() > 0 && (len(batch) == 0 || items+total_items < 20000) {
         res := heap.Pop(rq.q).(*reservation)
         batch = append(batch, readRequest{res.copySender})
         count++
+        total_items += items
+        items = 0
+        if rq.q.Len() > 0 {
+            items = atomic.LoadInt64(&(*rq.q)[0].items)
+        }
     }
+    reason := "timeout"
+    if !(len(batch) < cap(batch)) {
+        reason = "size_metrics"
+    } else if !(len(batch) == 0 || items+total_items < 20000) {
+        reason = "size_samples"
+    }
-    return batch, count
+    return batch, count, reason
 }
 
 func (rq *ReservationQueue) update(res *reservation) {
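The new stopping rule above reads as: keep popping reservations while the batch has free slots and, once at least one reservation has been taken, while the running total of pending insertables stays below 20000; the reported reason is "size_metrics" when the slots run out, "size_samples" when the insertable threshold is hit, and "timeout" when the queue simply drains. A self-contained sketch of just that decision, with the heap, locking, and metrics stripped away (20000 is the hard-coded cutoff from this commit; popOntoBatch here is an illustrative helper, not the repository function):

package main

import "fmt"

// popOntoBatch reproduces the stop condition in isolation: queued holds the
// pending insertable count of each reservation in pop order, and slots plays
// the role of cap(batch).
func popOntoBatch(queued []int64, slots int) (popped int, totalItems int64, reason string) {
    i := 0
    items := int64(0)
    if len(queued) > 0 {
        items = queued[0] // insertables of the next reservation to pop
    }
    for popped < slots && i < len(queued) && (popped == 0 || items+totalItems < 20000) {
        totalItems += items
        popped++
        i++
        items = 0
        if i < len(queued) {
            items = queued[i]
        }
    }
    reason = "timeout"
    if popped == slots {
        reason = "size_metrics"
    } else if !(popped == 0 || items+totalItems < 20000) {
        reason = "size_samples"
    }
    return popped, totalItems, reason
}

func main() {
    // Plenty of slots, but the third reservation would push the total past
    // 20000, so only two are taken.
    fmt.Println(popOntoBatch([]int64{9000, 9000, 9000}, 1000)) // 2 18000 size_samples

    // Only two slots: the batch fills up first.
    fmt.Println(popOntoBatch([]int64{100, 100, 100}, 2)) // 2 200 size_metrics

    // Queue drains before either limit, which is reported as a timeout flush.
    fmt.Println(popOntoBatch([]int64{100, 100}, 1000)) // 2 200 timeout
}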
3 changes: 2 additions & 1 deletion pkg/pgmodel/metrics/ingest.go
@@ -14,7 +14,7 @@ const (
     MetricBatcherChannelCap = 1000
     // FlushSize defines the batch size. It is the maximum number of samples/exemplars per insert batch.
     // This translates to the max array size that we pass into `insert_metric_row`
-    FlushSize = 2000
+    FlushSize = 10000
     MaxInsertStmtPerTxn = 100
 )
 
@@ -279,6 +279,7 @@ func init() {
     IngestorChannelCap,
     IngestorChannelLenBatcher,
     IngestorFlushSeries,
+    IngestorFlushInsertables,
     IngestorInsertsPerBatch,
     IngestorRowsPerBatch,
     IngestorRowsPerInsert,
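The diff only adds IngestorFlushInsertables to the collector registration list; its definition is not part of this commit. A hypothetical definition consistent with how it is used in metric_batcher.go above (a histogram observed with "type" and "subsystem" labels) might look like the following; the metric name, help text, and buckets are illustrative guesses, not the repository's actual code.

package main

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical stand-in for the IngestorFlushInsertables metric registered in
// the diff above; only the label names ("type", "subsystem") are taken from
// the visible usage, everything else is a guess.
var IngestorFlushInsertables = prometheus.NewHistogramVec(
    prometheus.HistogramOpts{
        Name: "ingestor_flush_insertables",
        Help: "Number of samples and exemplars flushed in one metric-batcher batch.",
    },
    []string{"type", "subsystem"},
)

func main() {
    prometheus.MustRegister(IngestorFlushInsertables)

    // Mirrors the call added in metric_batcher.go: observe samples + exemplars
    // for each flushed batch.
    IngestorFlushInsertables.
        With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).
        Observe(1234)
}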
