This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Improvements to batching #1739

Draft · wants to merge 6 commits into base: master
3 changes: 3 additions & 0 deletions pkg/api/router.go
@@ -24,6 +24,7 @@ import (
"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgclient"
pgMetrics "github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/psctx"
"github.com/timescale/promscale/pkg/query"
"github.com/timescale/promscale/pkg/telemetry"
)
@@ -167,6 +168,8 @@ func withWarnLog(msg string, handler http.Handler) http.HandlerFunc {
func timeHandler(histogramVec prometheus.ObserverVec, path string, handler http.Handler) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
ctx := psctx.WithStartTime(r.Context(), start)
r = r.WithContext(ctx)
handler.ServeHTTP(w, r)
elapsedMs := time.Since(start).Milliseconds()
histogramVec.WithLabelValues(path).Observe(float64(elapsedMs))
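Note: the hunk above stamps the request start time into the context through a new psctx package that is not visible in this diff view. Below is a minimal sketch of what such helpers could look like, assuming an unexported context key; only the names WithStartTime and StartTime are taken from the calls above, the rest is an assumption.

package psctx

import (
	"context"
	"fmt"
	"time"
)

// startTimeKey is an unexported key type, so other packages cannot collide
// with the value stored here.
type startTimeKey struct{}

// WithStartTime returns a copy of ctx that carries the request start time.
func WithStartTime(ctx context.Context, t time.Time) context.Context {
	return context.WithValue(ctx, startTimeKey{}, t)
}

// StartTime returns the start time previously stored with WithStartTime.
func StartTime(ctx context.Context) (time.Time, error) {
	t, ok := ctx.Value(startTimeKey{}).(time.Time)
	if !ok {
		return time.Time{}, fmt.Errorf("no request start time in context")
	}
	return t, nil
}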
26 changes: 26 additions & 0 deletions pkg/log/log.go
@@ -95,6 +95,7 @@ func Init(cfg Config) error {
// NOTE: we add a level of indirection with our logging functions,
// so we need additional caller depth
logger = log.With(l, "ts", timestampFormat, "caller", log.Caller(4))
traceRequestEnabled = isTraceRequestEnabled()
return nil
}

@@ -205,3 +206,28 @@ func WarnRateLimited(keyvals ...interface{}) {
func DebugRateLimited(keyvals ...interface{}) {
rateLimit(debug, keyvals...)
}

var traceRequestEnabled bool

func isTraceRequestEnabled() bool {
value := os.Getenv("PROMSCALE_TRACE_REQUEST")
if value == "" {
return false
}
enabled, err := strconv.ParseBool(value)
if err != nil || !enabled {
// assume off if the value cannot be parsed as true
return false
}
return true
}

func TraceRequestEnabled() bool {
return traceRequestEnabled
}

func TraceRequest(keyvals ...interface{}) {
if TraceRequestEnabled() {
Debug(keyvals...)
}
}
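Usage note for the tracing hooks added above (a sketch; the call sites and the helper below are illustrative, not part of the PR): the environment variable is read once in Init, so it must be set before the process starts, and TraceRequest calls become no-ops when tracing is disabled.

package example

import "github.com/timescale/promscale/pkg/log"

// Enable request tracing before starting the process, for example:
//   PROMSCALE_TRACE_REQUEST=true ./promscale ...
func traceHotPath(rows int) {
	// Cheap call sites can stay unguarded; they are dropped when tracing is off.
	log.TraceRequest("component", "copier", "event", "insert", "rows", rows)

	// Guard call sites whose arguments are expensive to build.
	if log.TraceRequestEnabled() {
		log.TraceRequest("component", "copier", "event", "batch", "summary", buildSummary())
	}
}

// buildSummary is a stand-in for costly argument construction.
func buildSummary() string { return "..." }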
6 changes: 5 additions & 1 deletion pkg/pgmodel/ingestor/buffer.go
@@ -7,6 +7,7 @@ package ingestor
import (
"context"
"sync"
"time"

"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgmodel/model"
@@ -34,6 +35,7 @@ type pendingBuffer struct {
spanCtx context.Context
needsResponse []insertDataTask
batch model.Batch
Start time.Time
}

var pendingBuffers = sync.Pool{
@@ -46,7 +48,9 @@ var pendingBuffers = sync.Pool{
}

func NewPendingBuffer() *pendingBuffer {
return pendingBuffers.Get().(*pendingBuffer)
pb := pendingBuffers.Get().(*pendingBuffer)
pb.Start = time.Now()
return pb
}

func (p *pendingBuffer) IsFull() bool {
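Design note on the hunk above: pendingBuffer instances are recycled through a sync.Pool, so the Start timestamp has to be refreshed on every Get in NewPendingBuffer; setting it only in the pool's New function would leave reused buffers with a stale value. A hedged sketch of how the flush path might consume it (the flush/batcher code is not shown in this view; the helper and its reuse of labelsCopier are illustrative only):

// Hypothetical helper in the same package: observe how long this buffer spent
// accumulating data before being flushed, mirroring the IngestorPipelineTime
// observation the copier makes on its side.
func observeBufferAge(pb *pendingBuffer) {
	metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(pb.Start).Seconds())
}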
70 changes: 46 additions & 24 deletions pkg/pgmodel/ingestor/copier.go
@@ -40,6 +40,7 @@ type copyRequest struct {

var (
getBatchMutex = &sync.Mutex{}
lastGetBatch = time.Time{}
handleDecompression = retryAfterDecompression
)

@@ -72,9 +73,13 @@ func (reqs copyBatch) VisitExemplar(callBack func(info *pgmodel.MetricInfo, s *p
return nil
}

type readRequest struct {
copySender <-chan copyRequest
}

// Handles actual insertion into the DB.
// We have one of these per connection reserved for insertion.
func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf *ExemplarLabelFormatter) {
func runCopier(conn pgxconn.PgxConn, sw *seriesWriter, elf *ExemplarLabelFormatter, reservationQ *ReservationQueue) {
requestBatch := make([]readRequest, 0, metrics.MaxInsertStmtPerTxn)
insertBatch := make([]copyRequest, 0, cap(requestBatch))
for {
@@ -90,7 +95,7 @@ func runCopier(conn pgxconn.PgxConn, in chan readRequest, sw *seriesWriter, elf
// the fact that we fetch the entire batch before executing any of the
// reads. This guarantees that we never need to batch the same metrics
// together in the copier.
requestBatch, ok = copierGetBatch(ctx, requestBatch, in)
requestBatch, ok = copierGetBatch(ctx, requestBatch, reservationQ)
if !ok {
span.End()
return
@@ -157,7 +162,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e
return nil
}

func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequest) ([]readRequest, bool) {
func copierGetBatch(ctx context.Context, batch []readRequest, reservationQ *ReservationQueue) ([]readRequest, bool) {
_, span := tracer.Default().Start(ctx, "get-batch")
defer span.End()
//This mutex is not for safety, but rather for better batching.
@@ -173,30 +178,45 @@ func copierGetBatch(ctx context.Context, batch []readRequest, in <-chan readRequ
span.AddEvent("Unlocking")
}(span)

req, ok := <-in
//note this metric logic does depend on the lock
now := time.Now()
if !lastGetBatch.IsZero() {
timeBetweenGetBatch := now.Sub(lastGetBatch)
metrics.IngestorWaitForCopierSeconds.With(labelsCopier).Observe(timeBetweenGetBatch.Seconds())

}
lastGetBatch = now

startTime, ok := reservationQ.Peek()
if !ok {
return batch, false
}
span.AddEvent("Appending first batch")
batch = append(batch, req)

// We use a small timeout to prevent low-pressure systems from using up too many
// txns and putting pressure on the system.
timeout := time.After(20 * time.Millisecond)
hot_gather:
for len(batch) < cap(batch) {
select {
case r2 := <-in:
span.AddEvent("Appending batch")
batch = append(batch, r2)
case <-timeout:
span.AddEvent("Timeout appending batches")
break hot_gather
}
}
if len(batch) == cap(batch) {
span.AddEvent("Batch is full")
since := time.Since(startTime)
//TODO: make configurable in CLI
minDuration := 0 * time.Millisecond

// Having a minimum batching duration can be useful if the system is using up too many txns or mxids.
// The Prometheus remote-write dynamic sharding strategy should auto-adjust to slow down writes
// in low-pressure environments, but having a CLI-settable backstop can also be useful in certain scenarios.
// Values that have previously been tested with good results: 50ms-250ms.
if since < minDuration {
span.AddEvent("Sleep waiting to batch")
sleepDuration := minDuration - since
metrics.IngestorWaitForBatchSleepSeconds.With(labelsCopier).Add(sleepDuration.Seconds())
metrics.IngestorWaitForBatchSleepTotal.With(labelsCopier).Inc()
time.Sleep(sleepDuration)
}

metrics.IngestorPipelineTime.With(labelsCopier).Observe(time.Since(startTime).Seconds())
span.AddEvent("After sleep")

var reason string
batch, _, reason = reservationQ.PopOntoBatch(batch)
metrics.IngestorBatchFlushTotal.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "reason": reason}).Inc()

span.AddEvent("Flushed due to " + reason)

metrics.IngestorBatchRemainingAfterFlushTotal.With(labelsCopier).Observe(float64(reservationQ.Len()))
span.SetAttributes(attribute.Int("num_batches", len(batch)))
return batch, true
}
@@ -501,7 +521,9 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
metrics.IngestorItems.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "exemplar"}).Add(float64(totalExemplars))

tput.ReportDuplicateMetrics(duplicateSamples, duplicateMetrics)
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(time.Since(insertStart).Seconds())
duration := time.Since(insertStart).Seconds()
metrics.IngestorInsertDuration.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration)
metrics.IngestorInsertDurationPerRow.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Observe(duration / (float64(totalExemplars + totalSamples)))
return nil, lowestMinTime
}

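The copier now pulls work from a ReservationQueue rather than a plain channel. The queue implementation belongs to this PR but is not included in this view, so the following is only an illustrative sketch of the surface copierGetBatch relies on above (Peek, PopOntoBatch, Len, Close); the reservation shape, the Add method, and the flush-reason strings are assumptions.

package ingestor

import (
	"sync"
	"time"
)

// reservation is the assumed unit of work: one queued metric batch plus the
// start time of the oldest request it contains (what Peek reports above).
type reservation struct {
	req   readRequest
	start time.Time
}

// ReservationQueue is an illustrative stand-in for the queue added by this PR.
type ReservationQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	queue  []reservation
	closed bool
}

func NewReservationQueue() *ReservationQueue {
	q := &ReservationQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add enqueues work; presumably called by the metric batchers.
func (q *ReservationQueue) Add(req readRequest, start time.Time) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.queue = append(q.queue, reservation{req: req, start: start})
	q.cond.Signal()
}

// Peek blocks until work is queued (or the queue is closed) and returns the
// start time of the oldest reservation without removing it.
func (q *ReservationQueue) Peek() (time.Time, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.queue) == 0 && !q.closed {
		q.cond.Wait()
	}
	if len(q.queue) == 0 {
		return time.Time{}, false
	}
	return q.queue[0].start, true
}

// PopOntoBatch moves queued reservations onto batch until batch is full or the
// queue is drained, returning the grown batch, the count taken, and a reason
// label for the flush metric.
func (q *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, int, string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := 0
	for len(batch) < cap(batch) && n < len(q.queue) {
		batch = append(batch, q.queue[n].req)
		n++
	}
	q.queue = q.queue[n:]
	reason := "queue_drained"
	if len(batch) == cap(batch) {
		reason = "batch_full"
	}
	return batch, n, reason
}

func (q *ReservationQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.queue)
}

// Close wakes any copier blocked in Peek so it can shut down.
func (q *ReservationQueue) Close() {
	q.mu.Lock()
	q.closed = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

Compared with the old buffered channel, a queue of this shape lets the copier inspect the age of the oldest pending work (Peek) before deciding whether to sleep for the minimum batching duration, and lets it drain many reservations in a single lock acquisition.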
62 changes: 42 additions & 20 deletions pkg/pgmodel/ingestor/dispatcher.go
@@ -21,6 +21,7 @@ import (
"github.com/timescale/promscale/pkg/pgmodel/metrics"
"github.com/timescale/promscale/pkg/pgmodel/model"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/psctx"
"github.com/timescale/promscale/pkg/tracer"
tput "github.com/timescale/promscale/pkg/util/throughput"
)
@@ -41,13 +42,14 @@ type pgxDispatcher struct {
invertedLabelsCache *cache.InvertedLabelsCache
exemplarKeyPosCache cache.PositionCache
batchers sync.Map
batchersWG sync.WaitGroup
completeMetricCreation chan struct{}
asyncAcks bool
copierReadRequestCh chan<- readRequest
seriesEpochRefresh *time.Ticker
doneChannel chan struct{}
closed *uber_atomic.Bool
doneWG sync.WaitGroup
reservationQ *ReservationQueue
}

var _ model.Dispatcher = &pgxDispatcher{}
@@ -59,13 +61,6 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
numCopiers = 1
}

// the copier read request channel retains the queue order between metrics
maxMetrics := 10000
copierReadRequestCh := make(chan readRequest, maxMetrics)

metrics.IngestorChannelCap.With(prometheus.Labels{"type": "metric", "subsystem": "copier", "kind": "sample"}).Set(float64(cap(copierReadRequestCh)))
metrics.RegisterCopierChannelLenMetric(func() float64 { return float64(len(copierReadRequestCh)) })

if cfg.IgnoreCompressedChunks {
// Handle decompression to not decompress anything.
handleDecompression = skipDecompression
@@ -82,9 +77,9 @@ }
}
sw := NewSeriesWriter(conn, labelArrayOID, labelsCache)
elf := NewExamplarLabelFormatter(conn, eCache)

reservationQ := NewReservationQueue()
for i := 0; i < numCopiers; i++ {
go runCopier(conn, copierReadRequestCh, sw, elf)
go runCopier(conn, sw, elf, reservationQ)
}

inserter := &pgxDispatcher{
@@ -95,11 +90,11 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
exemplarKeyPosCache: eCache,
completeMetricCreation: make(chan struct{}, 1),
asyncAcks: cfg.MetricsAsyncAcks,
copierReadRequestCh: copierReadRequestCh,
// set to run at half our deletion interval
seriesEpochRefresh: time.NewTicker(30 * time.Minute),
doneChannel: make(chan struct{}),
closed: uber_atomic.NewBool(false),
reservationQ: reservationQ,
}
inserter.closed.Store(false)
runBatchWatcher(inserter.doneChannel)
@@ -110,7 +105,13 @@ func newPgxDispatcher(conn pgxconn.PgxConn, mCache cache.MetricCache, scache cac
return nil, err
}

go inserter.runCompleteMetricCreationWorker()
if !cfg.DisableMetricCreation {
inserter.doneWG.Add(1)
go func() {
defer inserter.doneWG.Done()
inserter.runCompleteMetricCreationWorker()
}()
}

if !cfg.DisableEpochSync {
inserter.doneWG.Add(1)
@@ -204,7 +205,8 @@ func (p *pgxDispatcher) Close() {
return true
})

close(p.copierReadRequestCh)
p.batchersWG.Wait()
p.reservationQ.Close()
close(p.doneChannel)
p.doneWG.Wait()
}
@@ -226,8 +228,10 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
maxt int64
rows = dataTS.Rows
workFinished = new(sync.WaitGroup)
batched = new(sync.WaitGroup)
)
workFinished.Add(len(rows))
batched.Add(len(rows))
// we only allocate enough space for a single error message here as we only
// report one error back upstream. The inserter should not block on this
// channel, but only insert if it's empty, anything else can deadlock.
@@ -240,7 +244,17 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
maxt = ts
}
}
p.getMetricBatcher(metricName) <- &insertDataRequest{spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, errChan: errChan}
p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan}
}

var startTime time.Time
if log.TraceRequestEnabled() {
t, err := psctx.StartTime(ctx)
if err != nil {
log.TraceRequest("component", "dispatcher", "err", err)
}
startTime = t
log.TraceRequest("component", "dispatcher", "event", "start", "metrics", len(rows), "samples", numRows, "start_time", startTime.UnixNano())
}
span.SetAttributes(attribute.Int64("num_rows", int64(numRows)))
span.SetAttributes(attribute.Int("num_metrics", len(rows)))
@@ -258,6 +272,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
case err = <-errChan:
default:
}
log.TraceRequest("component", "dispatcher", "event", "ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime))
reportMetricsTelemetry(maxt, numRows, 0)
close(errChan)
} else {
@@ -272,6 +287,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64
if err != nil {
log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err)
}
log.TraceRequest("component", "dispatcher", "event", "async_ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime))
reportMetricsTelemetry(maxt, numRows, 0)
}()
}
@@ -321,7 +337,11 @@ func (p *pgxDispatcher) getMetricBatcher(metric string) chan<- *insertDataReques
actual, old := p.batchers.LoadOrStore(metric, c)
batcher = actual
if !old {
go runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.copierReadRequestCh)
p.batchersWG.Add(1)
go func() {
defer p.batchersWG.Done()
runMetricBatcher(p.conn, c, metric, p.completeMetricCreation, p.metricTableNames, p.reservationQ)
}()
}
}
ch := batcher.(chan *insertDataRequest)
@@ -330,11 +350,13 @@
}

type insertDataRequest struct {
spanCtx trace.SpanContext
metric string
finished *sync.WaitGroup
data []model.Insertable
errChan chan error
requestCtx context.Context
spanCtx trace.SpanContext
metric string
finished *sync.WaitGroup
batched *sync.WaitGroup
data []model.Insertable
errChan chan error
}

func (idr *insertDataRequest) reportResult(err error) {
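insertDataRequest now carries a second WaitGroup, batched, next to finished, plus the originating request context. The metric-batcher code that consumes these requests is not part of this diff view, so the following is only a hedged sketch of the two-phase acknowledgement the split suggests; the helper name and the exact call sites are assumptions.

// Hypothetical consumer-side handling in the metric batcher (same package;
// not the PR's actual batcher code).
func acknowledgeBatched(req *insertDataRequest) {
	// Phase 1: the request's data has just been copied into a pending buffer.
	// Anything that only needs to know "my samples were queued" can wait on this.
	req.batched.Done()

	// Phase 2 happens later, in the copier: req.reportResult(err) calls
	// req.finished.Done() once the batch has actually been written to the database.
}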
1 change: 1 addition & 0 deletions pkg/pgmodel/ingestor/ingestor.go
@@ -28,6 +28,7 @@ type Cfg struct {
TracesAsyncAcks bool
NumCopiers int
DisableEpochSync bool
DisableMetricCreation bool
IgnoreCompressedChunks bool
InvertedLabelsCacheSize uint64
TracesBatchTimeout time.Duration
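The new DisableMetricCreation flag pairs with the dispatcher change above, which starts the metric-creation worker only when the flag is unset. A small usage sketch (the concrete values and the wrapping function are illustrative, not taken from this PR):

package main

import "github.com/timescale/promscale/pkg/pgmodel/ingestor"

// exampleCfg shows the new knob next to existing fields from this struct.
func exampleCfg() ingestor.Cfg {
	return ingestor.Cfg{
		NumCopiers:            4,
		DisableEpochSync:      false,
		DisableMetricCreation: true, // skip the background completeMetricCreation worker
	}
}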