Commit 80eec93

Prevent flushing a batch with less than one full request

Previously, when a new set of requests came in, we often flushed a very small first batch because we flush as soon as any data becomes available in Peek. Since small batches are inefficient, we fix this by waiting (with a timeout) until the entire first request is batched before flushing.
Parent: b93ea2b
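
The mechanism this commit leans on is waiting on a sync.WaitGroup with a timeout, which Go's standard library does not provide directly. A minimal, self-contained sketch of that pattern, assuming nothing from the repository (the helper name waitWithTimeout is invented for illustration):

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout reports whether wg.Wait() returned before the timeout.
// It uses the same select-over-a-signal-channel shape as the new Peek code.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		// The helper goroutine lingers until wg is eventually released,
		// which is also true of the goroutine spawned in Peek below.
		return false
	}
}

func main() {
	var batched sync.WaitGroup
	batched.Add(2)
	go func() {
		// Simulate two metric batchers buffering their share of a request.
		time.Sleep(50 * time.Millisecond)
		batched.Done()
		batched.Done()
	}()
	fmt.Println("fully batched:", waitWithTimeout(&batched, 250*time.Millisecond))
}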

4 files changed: +42 −12 lines

pkg/pgmodel/ingestor/dispatcher.go

Lines changed: 4 additions & 1 deletion
@@ -227,8 +227,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 		maxt         int64
 		rows         = dataTS.Rows
 		workFinished = new(sync.WaitGroup)
+		batched      = new(sync.WaitGroup)
 	)
 	workFinished.Add(len(rows))
+	batched.Add(len(rows))
 	// we only allocate enough space for a single error message here as we only
 	// report one error back upstream. The inserter should not block on this
 	// channel, but only insert if it's empty, anything else can deadlock.
@@ -241,7 +243,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
 				maxt = ts
 			}
 		}
-		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan}
+		p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
 	}
 	span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
 	span.SetAttributes(attribute.Int("num_metrics", len(rows)))
@@ -339,6 +341,7 @@ type insertDataRequest struct {
 	spanCtx  trace.SpanContext
 	metric   string
 	finished *sync.WaitGroup
+	batched  *sync.WaitGroup
 	data     []model.Insertable
 	errChan  chan error
 }
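
To see the dispatcher-side wiring in isolation, here is a deliberately simplified sketch; the request struct and the map arguments are stand-ins invented for illustration, not the connector's real types. One WaitGroup is released when data is persisted, the new one as soon as each per-metric batcher has buffered its share:

package sketch

import "sync"

// request is a simplified stand-in for insertDataRequest.
type request struct {
	metric   string
	data     []string        // stands in for []model.Insertable
	finished *sync.WaitGroup // released once the data is written out
	batched  *sync.WaitGroup // released as soon as a batcher has buffered the data
}

// dispatch fans one ingest request out to per-metric batcher channels,
// counting both WaitGroups once per metric, as the diff above does per row group.
func dispatch(rowsByMetric map[string][]string, batchers map[string]chan *request) (*sync.WaitGroup, *sync.WaitGroup) {
	workFinished := new(sync.WaitGroup)
	batched := new(sync.WaitGroup)
	workFinished.Add(len(rowsByMetric))
	batched.Add(len(rowsByMetric))

	for metric, data := range rowsByMetric {
		batchers[metric] <- &request{metric: metric, data: data, finished: workFinished, batched: batched}
	}
	// The caller waits on workFinished for durability; batched is handed to the
	// reservation queue so Peek can delay flushing until the request is buffered.
	return workFinished, batched
}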

pkg/pgmodel/ingestor/metric_batcher.go

Lines changed: 2 additions & 1 deletion
@@ -211,6 +211,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 			}
 			metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
 			reservation.Update(reservationQ, t, len(req.data))
+			req.batched.Done()
 			addSpan.End()
 		}
 		//This channel in synchronous (no buffering). This provides backpressure
@@ -225,7 +226,7 @@ func sendBatches(firstReq *insertDataRequest, input chan *insertDataRequest, con
 				t = time.Time{}
 			}
 			metrics.IngestorPipelineTime.With(prometheus.Labels{"type": "metric", "subsystem": "metric_batcher"}).Observe(time.Since(t).Seconds())
-			reservation = reservationQ.Add(copySender, t)
+			reservation = reservationQ.Add(copySender, req.batched, t)
 		}

 	pending := NewPendingBuffer()
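
On the batcher side the only new obligation is to call batched.Done() once a request's data has been moved into the pending batch. A hedged sketch of that step, with pendingBuffer and absorb invented for illustration:

package sketch

import "sync"

type request struct {
	data    []string // stands in for []model.Insertable
	batched *sync.WaitGroup
}

type pendingBuffer struct {
	rows []string
}

// absorb copies a request's rows into the pending batch and then signals that
// the request has been batched, which is what lets Peek stop waiting early.
func (p *pendingBuffer) absorb(req *request) {
	p.rows = append(p.rows, req.data...)
	req.batched.Done() // mirrors the req.batched.Done() added to sendBatches
}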

pkg/pgmodel/ingestor/metric_batcher_test.go

Lines changed: 3 additions & 1 deletion
@@ -143,14 +143,16 @@ func TestSendBatches(t *testing.T) {
 		return l
 	}
 	var workFinished sync.WaitGroup
+	var batched sync.WaitGroup
+	batched.Add(1)
 	errChan := make(chan error, 1)
 	data := []model.Insertable{
 		model.NewPromSamples(makeSeries(1), make([]prompb.Sample, 1)),
 		model.NewPromSamples(makeSeries(2), make([]prompb.Sample, 1)),
 		model.NewPromSamples(makeSeries(3), make([]prompb.Sample, 1)),
 	}
 	spanCtx := psctx.WithStartTime(context.Background(), time.Now().Add(-time.Hour))
-	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, errChan: errChan}
+	firstReq := &insertDataRequest{metric: "test", requestCtx: spanCtx, data: data, finished: &workFinished, batched: &batched, errChan: errChan}
 	reservationQ := NewReservationQueue()
 	go sendBatches(firstReq, nil, nil, &pgmodel.MetricInfo{MetricID: 1, TableName: "test"}, reservationQ)
 	resos := make([]readRequest, 0, 1)
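
The test above only threads the new WaitGroup through; if one also wanted to assert that it is actually released (not something this commit does), a guarded wait keeps a regression from hanging the test. A hypothetical sketch:

package sketch

import (
	"sync"
	"testing"
	"time"
)

// TestBatchedSignal is a hypothetical example, not part of the commit.
func TestBatchedSignal(t *testing.T) {
	var batched sync.WaitGroup
	batched.Add(1)

	go batched.Done() // stands in for sendBatches buffering the first request

	done := make(chan struct{})
	go func() {
		batched.Wait()
		close(done)
	}()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("first request was never marked as batched")
	}
}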

pkg/pgmodel/ingestor/reservation.go

Lines changed: 33 additions & 9 deletions
@@ -8,17 +8,18 @@ import (
 )

 type reservation struct {
-	copySender <-chan copyRequest
-	index      int
+	copySender          <-chan copyRequest
+	firstRequestBatched *sync.WaitGroup
+	index               int

 	lock      sync.Mutex
 	startTime time.Time

 	items int64
 }

-func newReservation(cs <-chan copyRequest, startTime time.Time) *reservation {
-	return &reservation{cs, -1, sync.Mutex{}, startTime, 1}
+func newReservation(cs <-chan copyRequest, startTime time.Time, batched *sync.WaitGroup) *reservation {
+	return &reservation{cs, batched, -1, sync.Mutex{}, startTime, 1}
 }

 func (res *reservation) Update(rq *ReservationQueue, t time.Time, num_insertables int) {
@@ -110,8 +111,8 @@ func NewReservationQueue() *ReservationQueue {
 	return res
 }

-func (rq *ReservationQueue) Add(cs <-chan copyRequest, startTime time.Time) Reservation {
-	si := newReservation(cs, startTime)
+func (rq *ReservationQueue) Add(cs <-chan copyRequest, batched *sync.WaitGroup, startTime time.Time) Reservation {
+	si := newReservation(cs, startTime, batched)

 	rq.lock.Lock()
 	defer rq.lock.Unlock()
@@ -146,19 +147,42 @@ func (rq *ReservationQueue) Close() {
 // Peek gives the first startTime as well as if the queue is not closed.
 // It blocks until there is an element in the queue or it has been closed.
 func (rq *ReservationQueue) Peek() (time.Time, bool) {
+	reservation, waited, ok := rq.peek()
+	if !ok {
+		return time.Time{}, false
+	}
+	if waited {
+		/* If this is the first reservation in the queue, wait for the entire request to be batched with a timeout.
+		 * (timeout is really a safety measure to prevent deadlocks if some metric batcher is full, which is unlikely) */
+		waitch := make(chan struct{})
+		go func() {
+			reservation.firstRequestBatched.Wait()
+			close(waitch)
+		}()
+		select {
+		case <-waitch:
+		case <-time.After(250 * time.Millisecond):
+		}
+	}
+	return reservation.GetStartTime(), ok
+}
+
+func (rq *ReservationQueue) peek() (*reservation, bool, bool) {
 	rq.lock.Lock()
 	defer rq.lock.Unlock()
+	waited := false
 	for !rq.closed && rq.q.Len() == 0 {
+		waited = true
 		rq.cond.Wait()
 	}

 	if rq.q.Len() > 0 {
-		first := (*rq.q)[0]
-		return first.GetStartTime(), true
+		firstReservation := (*rq.q)[0]
+		return firstReservation, waited, true
 	}

 	//must be closed
-	return time.Time{}, false
+	return nil, false, false
 }

 // PopBatch pops from the queue to populate the batch until either batch is full or the queue is empty.
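
The private peek() uses a sync.Cond to block until the queue is non-empty and reports, via waited, whether it actually had to block; that is how Peek distinguishes the first reservation of a fresh burst (worth waiting for) from an already-populated queue. A generic sketch of that cond-guarded blocking peek, with plain ints standing in for reservations:

package sketch

import "sync"

type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	items  []int
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// peek blocks until an item exists or the queue is closed. The second result
// reports whether the caller had to wait, the third whether an item was found.
func (q *blockingQueue) peek() (int, bool, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	waited := false
	for !q.closed && len(q.items) == 0 {
		waited = true
		q.cond.Wait()
	}
	if len(q.items) > 0 {
		return q.items[0], waited, true
	}
	// Must be closed and empty.
	return 0, false, false
}

// push appends an item and wakes one blocked peek.
func (q *blockingQueue) push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal()
}

The 250 ms cap in Peek then bounds how long a fresh reservation can hold up the flush if some metric batcher's channel is full, per the comment in the diff.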
