diff --git a/pkg/convert/jfr/parser.go b/pkg/convert/jfr/parser.go index 938f1223e5..8fa4b4ab0e 100644 --- a/pkg/convert/jfr/parser.go +++ b/pkg/convert/jfr/parser.go @@ -14,8 +14,8 @@ import ( "github.com/pyroscope-io/pyroscope/pkg/storage/tree" ) -func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.PutInput) (err error) { - chunks, err := parser.Parse(r) +func ParseJFR(ctx context.Context, s storage.Putter, body io.Reader, pi *storage.PutInput) (err error) { + chunks, err := parser.Parse(body) if err != nil { return fmt.Errorf("unable to parse JFR format: %w", err) } @@ -24,11 +24,10 @@ func ParseJFR(ctx context.Context, r io.Reader, s storage.Putter, pi *storage.Pu err = multierror.Append(err, pErr) } } - pi.Val = nil return err } -func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.PutInput) (err error) { +func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *storage.PutInput) (err error) { var event, alloc, lock string cpu := tree.New() wall := tree.New() @@ -84,83 +83,50 @@ func parse(ctx context.Context, c parser.Chunk, s storage.Putter, pi *storage.Pu } } } - labels := pi.Key.Labels() - prefix := labels["__name__"] + + labelsOriginal := piOriginal.Key.Labels() + prefix := labelsOriginal["__name__"] + + cb := func(n string, t *tree.Tree, u metadata.Units) { + labels := map[string]string{} + for k, v := range labelsOriginal { + labels[k] = v + } + labels["__name__"] = prefix + "." + n + pi := &storage.PutInput{ + StartTime: piOriginal.StartTime, + EndTime: piOriginal.EndTime, + Key: segment.NewKey(labels), + Val: t, + SpyName: piOriginal.SpyName, + SampleRate: piOriginal.SampleRate, + Units: u, + AggregationType: metadata.SumAggregationType, + } + if putErr := s.Put(ctx, pi); putErr != nil { + err = multierror.Append(err, putErr) + } + } + if event == "cpu" || event == "itimer" || event == "wall" { profile := event if event == "wall" { profile = "cpu" } - labels["__name__"] = prefix + "." + profile - pi.Key = segment.NewKey(labels) - pi.Val = cpu - pi.Units = metadata.SamplesUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } + cb(profile, cpu, metadata.SamplesUnits) } if event == "wall" { - labels["__name__"] = prefix + "." + event - pi.Key = segment.NewKey(labels) - pi.Val = wall - pi.Units = metadata.SamplesUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } + cb(event, wall, metadata.SamplesUnits) } if alloc != "" { - labels["__name__"] = prefix + ".alloc_in_new_tlab_objects" - pi.Key = segment.NewKey(labels) - pi.Val = inTLABObjects - pi.Units = metadata.ObjectsUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } - labels["__name__"] = prefix + ".alloc_in_new_tlab_bytes" - pi.Key = segment.NewKey(labels) - pi.Val = inTLABBytes - pi.Units = metadata.BytesUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } - labels["__name__"] = prefix + ".alloc_outside_tlab_objects" - pi.Key = segment.NewKey(labels) - pi.Val = outTLABObjects - pi.Units = metadata.ObjectsUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } - labels["__name__"] = prefix + ".alloc_outside_tlab_bytes" - pi.Key = segment.NewKey(labels) - pi.Val = outTLABBytes - pi.Units = metadata.BytesUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } + cb("alloc_in_new_tlab_objects", inTLABObjects, metadata.ObjectsUnits) + cb("alloc_in_new_tlab_bytes", inTLABBytes, metadata.BytesUnits) + cb("alloc_outside_tlab_objects", outTLABObjects, metadata.ObjectsUnits) + cb("alloc_outside_tlab_bytes", outTLABBytes, metadata.BytesUnits) } if lock != "" { - labels["__name__"] = prefix + ".lock_count" - pi.Key = segment.NewKey(labels) - pi.Val = lockSamples - pi.Units = metadata.LockSamplesUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } - labels["__name__"] = prefix + ".lock_duration" - pi.Key = segment.NewKey(labels) - pi.Val = lockDuration - pi.Units = metadata.LockNanosecondsUnits - pi.AggregationType = metadata.SumAggregationType - if putErr := s.Put(ctx, pi); putErr != nil { - err = multierror.Append(err, putErr) - } + cb("lock_count", lockSamples, metadata.LockSamplesUnits) + cb("lock_duration", lockDuration, metadata.LockNanosecondsUnits) } return err } diff --git a/pkg/parser/parser.go b/pkg/parser/parser.go index 15cb81d5d7..1acd36de6e 100644 --- a/pkg/parser/parser.go +++ b/pkg/parser/parser.go @@ -93,7 +93,7 @@ func (p *Parser) Put(ctx context.Context, in *PutInput) error { err = convert.ParseIndividualLines(in.Body, cb) // with some formats we write directly to storage, hence the early return case in.Format == "jfr": - return jfr.ParseJFR(ctx, in.Body, p.putter, pi) + return jfr.ParseJFR(ctx, p.putter, in.Body, pi) case in.Format == "pprof": return writePprofFromBody(ctx, p.putter, in) case strings.Contains(in.ContentType, "multipart/form-data"): diff --git a/pkg/server/ingest_test.go b/pkg/server/ingest_test.go index 35aa5d9ff8..80a01c68f7 100644 --- a/pkg/server/ingest_test.go +++ b/pkg/server/ingest_test.go @@ -91,13 +91,18 @@ var _ = Describe("server", func() { done := make(chan interface{}) go func() { defer GinkgoRecover() - s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), prometheus.NewRegistry(), new(health.Controller)) + + reg := prometheus.NewRegistry() + + s, err := storage.New(storage.NewConfig(&(*cfg).Server), logrus.StandardLogger(), reg, new(health.Controller)) + queue := storage.NewIngestionQueue(logrus.StandardLogger(), s, reg, 4, 4) + Expect(err).ToNot(HaveOccurred()) e, _ := exporter.NewExporter(nil, nil) c, _ := New(Config{ Configuration: &(*cfg).Server, Storage: s, - Putter: s, + Putter: queue, MetricsExporter: e, Logger: logrus.New(), MetricsRegisterer: prometheus.NewRegistry(), @@ -145,6 +150,7 @@ var _ = Describe("server", func() { expectedKey = name } sk, _ := segment.ParseKey(expectedKey) + time.Sleep(10 * time.Millisecond) time.Sleep(sleepDur) gOut, err := s.Get(context.TODO(), &storage.GetInput{ StartTime: st,