diff --git a/pkg/log/log.go b/pkg/log/log.go index cb2c1421d7..5466ccd998 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -95,6 +95,7 @@ func Init(cfg Config) error { // NOTE: we add a level of indirection with our logging functions, // so we need additional caller depth logger = log.With(l, "ts", timestampFormat, "caller", log.Caller(4)) + traceRequestEnabled = isTraceRequestEnabled() return nil } @@ -205,3 +206,28 @@ func WarnRateLimited(keyvals ...interface{}) { func DebugRateLimited(keyvals ...interface{}) { rateLimit(debug, keyvals...) } + +var traceRequestEnabled bool + +func isTraceRequestEnabled() bool { + value := os.Getenv("PROMSCALE_TRACE_REQUEST") + if value == "" { + return false + } + enabled, err := strconv.ParseBool(value) + if err != nil || !enabled { + //assume off + return false + } + return true +} + +func TraceRequestEnabled() bool { + return traceRequestEnabled +} + +func TraceRequest(keyvals ...interface{}) { + if TraceRequestEnabled() { + Debug(keyvals...) + } +} diff --git a/pkg/pgmodel/ingestor/dispatcher.go b/pkg/pgmodel/ingestor/dispatcher.go index b3d95a3551..74c38472eb 100644 --- a/pkg/pgmodel/ingestor/dispatcher.go +++ b/pkg/pgmodel/ingestor/dispatcher.go @@ -21,6 +21,7 @@ import ( "github.com/timescale/promscale/pkg/pgmodel/metrics" "github.com/timescale/promscale/pkg/pgmodel/model" "github.com/timescale/promscale/pkg/pgxconn" + "github.com/timescale/promscale/pkg/psctx" "github.com/timescale/promscale/pkg/tracer" tput "github.com/timescale/promscale/pkg/util/throughput" ) @@ -245,6 +246,16 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 } p.getMetricBatcher(metricName) <- &insertDataRequest{requestCtx: ctx, spanCtx: span.SpanContext(), metric: metricName, data: data, finished: workFinished, batched: batched, errChan: errChan} } + + var startTime time.Time + if log.TraceRequestEnabled() { + t, err := psctx.StartTime(ctx) + if err != nil { + log.TraceRequest("component", "dispatcher", "err", err) + } + startTime = t + log.TraceRequest("component", "dispatcher", "event", "start", "metrics", len(rows), "samples", numRows, "start_time", startTime.UnixNano()) + } span.SetAttributes(attribute.Int64("num_rows", int64(numRows))) span.SetAttributes(attribute.Int("num_metrics", len(rows))) reportIncomingBatch(numRows) @@ -261,6 +272,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 case err = <-errChan: default: } + log.TraceRequest("component", "dispatcher", "event", "ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) close(errChan) } else { @@ -275,6 +287,7 @@ func (p *pgxDispatcher) InsertTs(ctx context.Context, dataTS model.Data) (uint64 if err != nil { log.Error("msg", fmt.Sprintf("error on async send, dropping %d datapoints", numRows), "err", err) } + log.TraceRequest("component", "dispatcher", "event", "async_ack", "start_time", startTime.UnixNano(), "took", time.Since(startTime)) reportMetricsTelemetry(maxt, numRows, 0) }() } diff --git a/pkg/pgmodel/ingestor/reservation.go b/pkg/pgmodel/ingestor/reservation.go index a6a12c423a..5d1f769037 100644 --- a/pkg/pgmodel/ingestor/reservation.go +++ b/pkg/pgmodel/ingestor/reservation.go @@ -5,6 +5,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/timescale/promscale/pkg/log" ) type reservation struct { @@ -163,6 +165,7 @@ func (rq *ReservationQueue) Peek() (time.Time, bool) { case <-waitch: case <-time.After(250 * time.Millisecond): } + log.TraceRequest("component", "reservation", "event", "peek", "batched_metrics", rq.q.Len(), "waited", waited, "took", time.Since((*rq.q)[0].GetStartTime())) } return reservation.GetStartTime(), ok } @@ -213,6 +216,7 @@ func (rq *ReservationQueue) PopOntoBatch(batch []readRequest) ([]readRequest, in } else if !(len(batch) == 0 || items+total_items < 20000) { reason = "size_samples" } + log.TraceRequest("component", "reservation", "event", "pop", "reason", reason, "metrics", count, "items", total_items, "remaining_metrics", rq.q.Len()) return batch, count, reason }