Skip to content

Commit

Permalink
chore: log slower requests till recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Feb 4, 2025
1 parent 97d1aad commit 89ff670
Showing 1 changed file with 27 additions and 2 deletions.
29 changes: 27 additions & 2 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff"
Expand All @@ -27,6 +28,7 @@ import (
transformerclient "github.com/rudderlabs/rudder-server/internal/transformer-client"
"github.com/rudderlabs/rudder-server/processor/integrations"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down Expand Up @@ -253,6 +255,8 @@ func NewTransformer(conf *config.Config, log logger.Logger, stat stats.Stats, op
for _, opt := range opts {
opt(&trans)
}
logDurations = new(atomic.Bool)
recoveryCount = new(atomic.Int32)

return &trans
}
Expand Down Expand Up @@ -479,6 +483,9 @@ func (trans *handle) request(ctx context.Context, url, stage string, data []Tran
return transformerResponses
}

var logDurations *atomic.Bool
var recoveryCount *atomic.Int32

func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage string, tags stats.Tags) ([]byte, int) {
var (
retryCount int
Expand Down Expand Up @@ -509,7 +516,25 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri

resp, reqErr = trans.httpClient.Do(req)
})
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(time.Since(requestStartTime))
duration := time.Since(requestStartTime)
if duration >= time.Second && !logDurations.Load() {
logDurations.Store(true)
}
if logDurations.Load() {
if duration < time.Second {
recoveryCount.Add(1)
} else {
trans.logger.Infow("duration > 1s", time.Now().Format(misc.RFC3339Milli), duration)
}
if recoveryCount.Load() > 0 && recoveryCount.Load()%10 == 0 {
trans.logger.Infow("getting back to < 1s", time.Now().Format(misc.RFC3339Milli), duration)
}
if recoveryCount.Load() > 100 {
logDurations.Store(false)
recoveryCount.Store(0)
}
}
trans.stat.NewTaggedStat("processor.transformer_request_time", stats.TimerType, tags).SendTiming(duration)
if reqErr != nil {
return reqErr
}
Expand All @@ -520,7 +545,7 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri
newTags := lo.Assign(tags)
newTags["instanceWorker"] = instanceWorker
trans.stat.NewTaggedStat("processor_transformer_instance_event_count", stats.CountType, newTags).Count(numEvents)
dur := time.Since(requestStartTime).Milliseconds()
dur := duration.Milliseconds()
headerTime, err := strconv.ParseFloat(strings.TrimSuffix(headerResponseTime, "ms"), 64)
if err == nil {
diff := float64(dur) - headerTime
Expand Down

0 comments on commit 89ff670

Please sign in to comment.