diff --git a/x-pack/apm-server/sampling/eventstorage/storage.go b/x-pack/apm-server/sampling/eventstorage/storage.go
index 816f22a2d1b..84caed3302b 100644
--- a/x-pack/apm-server/sampling/eventstorage/storage.go
+++ b/x-pack/apm-server/sampling/eventstorage/storage.go
@@ -263,7 +263,23 @@ func (rw *ReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error
 	rw.readKeyBuf = append(append(rw.readKeyBuf[:0], traceID...), ':')
 	opts.Prefix = rw.readKeyBuf
+	// 1st pass: check whether there exist keys matching the prefix.
+	// Do not prefetch values so that the check is done in memory.
+	// This optimizes for the case where the read is a miss.
+	opts.PrefetchValues = false
 	iter := rw.txn.NewIterator(opts)
+	iter.Rewind()
+	if !iter.Valid() {
+		iter.Close()
+		return nil
+	}
+	iter.Close()
+
+	// 2nd pass: only done when there exist keys matching the prefix.
+	// Fetch the events with PrefetchValues enabled for performance.
+	// This optimizes for the case where the read is a hit.
+	opts.PrefetchValues = true
+	iter = rw.txn.NewIterator(opts)
 	defer iter.Close()
 	for iter.Rewind(); iter.Valid(); iter.Next() {
 		item := iter.Item()
diff --git a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
index 33a42208800..3c5bafcf17a 100644
--- a/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
+++ b/x-pack/apm-server/sampling/eventstorage/storage_bench_test.go
@@ -161,6 +161,76 @@ func BenchmarkReadEvents(b *testing.B) {
 	}
 }
 
+func BenchmarkReadEventsHit(b *testing.B) {
+	// This benchmark may take a long time to run because setup time >> run time.
+	// The next estimated b.N may be a very large number due to the short run time,
+	// which makes the setup for the next iteration take a very long time.
+	const txnCountInTrace = 5
+
+	test := func(b *testing.B, codec eventstorage.Codec, bigTX bool) {
+		for _, hit := range []bool{false, true} {
+			b.Run(fmt.Sprintf("hit=%v", hit), func(b *testing.B) {
+				db := newBadgerDB(b, badgerOptions)
+				store := eventstorage.New(db, codec)
+				readWriter := store.NewReadWriter()
+				defer readWriter.Close()
+				wOpts := eventstorage.WriterOpts{
+					TTL:                 time.Hour,
+					StorageLimitInBytes: 0,
+				}
+
+				traceIDs := make([]string, b.N)
+
+				for i := 0; i < b.N; i++ {
+					traceID := uuid.Must(uuid.NewV4()).String()
+					traceIDs[i] = traceID
+					for j := 0; j < txnCountInTrace; j++ {
+						transactionID := uuid.Must(uuid.NewV4()).String()
+						var transaction *modelpb.APMEvent
+						if bigTX {
+							transaction = makeTransaction(transactionID, traceID)
+						} else {
+							transaction = &modelpb.APMEvent{
+								Transaction: &modelpb.Transaction{
+									Id: transactionID,
+								},
+							}
+						}
+						if err := readWriter.WriteTraceEvent(traceID, transactionID, transaction, wOpts); err != nil {
+							b.Fatal(err)
+						}
+					}
+				}
+				if err := readWriter.Flush(); err != nil {
+					b.Fatal(err)
+				}
+
+				b.ResetTimer()
+				var batch modelpb.Batch
+				for i := 0; i < b.N; i++ {
+					batch = batch[:0]
+
+					traceID := traceIDs[i]
+					if !hit {
+						// replace the last character to generate a non-existent traceID
+						traceID = traceID[:len(traceID)-1] + "-"
+					}
+
+					if err := readWriter.ReadTraceEvents(traceID, &batch); err != nil {
+						b.Fatal(err)
+					}
+				}
+			})
+		}
+	}
+
+	for _, bigTX := range []bool{true, false} {
+		b.Run(fmt.Sprintf("bigTX=%v", bigTX), func(b *testing.B) {
+			test(b, eventstorage.ProtobufCodec{}, bigTX)
+		})
+	}
+}
+
 func BenchmarkIsTraceSampled(b *testing.B) {
 	sampledTraceUUID := uuid.Must(uuid.NewV4())
 	unsampledTraceUUID := uuid.Must(uuid.NewV4())
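
For reference, below is a minimal standalone sketch of the two-pass prefix scan introduced in storage.go, written against the badger v2 API (assuming the github.com/dgraph-io/badger/v2 import path this package uses). The in-memory DB setup, the "traceID:spanID" sample keys, and the readByPrefix helper are illustrative assumptions for this sketch, not apm-server code; only the pattern of a keys-only existence check followed by a prefetching second iterator mirrors the diff.

package main

import (
	"fmt"
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

// readByPrefix returns the values of all keys starting with prefix.
// Pass 1 runs with PrefetchValues=false so a miss never touches the value log;
// pass 2 re-creates the iterator with prefetching enabled only when keys exist.
func readByPrefix(txn *badger.Txn, prefix []byte) ([][]byte, error) {
	opts := badger.DefaultIteratorOptions
	opts.Prefix = prefix

	// 1st pass: existence check, keys only.
	opts.PrefetchValues = false
	iter := txn.NewIterator(opts)
	iter.Rewind()
	if !iter.Valid() {
		iter.Close()
		return nil, nil // miss: no second pass, no value reads
	}
	iter.Close()

	// 2nd pass: hit, so prefetch values while iterating.
	opts.PrefetchValues = true
	iter = txn.NewIterator(opts)
	defer iter.Close()

	var out [][]byte
	for iter.Rewind(); iter.Valid(); iter.Next() {
		val, err := iter.Item().ValueCopy(nil)
		if err != nil {
			return nil, err
		}
		out = append(out, val)
	}
	return out, nil
}

func main() {
	db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Write a couple of events under one trace ID.
	err = db.Update(func(txn *badger.Txn) error {
		if err := txn.Set([]byte("trace-1:span-1"), []byte("event-1")); err != nil {
			return err
		}
		return txn.Set([]byte("trace-1:span-2"), []byte("event-2"))
	})
	if err != nil {
		log.Fatal(err)
	}

	_ = db.View(func(txn *badger.Txn) error {
		hit, _ := readByPrefix(txn, []byte("trace-1:"))  // 2 events
		miss, _ := readByPrefix(txn, []byte("trace-2:")) // 0 events, no value reads
		fmt.Printf("hit=%d events, miss=%d events\n", len(hit), len(miss))
		return nil
	})
}

The new benchmark can be run with the usual Go tooling, e.g. go test -run=^$ -bench=BenchmarkReadEventsHit -benchmem ./x-pack/apm-server/sampling/eventstorage/ (the exact flags are a suggestion, not part of the diff).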