Zero copy sketch views #18633
Open
davecromberge wants to merge 4 commits into
Introduces ForwardIndexReader.getBytesView(int, T) returning a ByteBuffer view of the BYTES single-value at the given doc id. The interface default wraps the existing getBytes() byte[] for source compatibility. V3 (VarByteChunkSVForwardIndexReader) and V4 (VarByteChunkForwardIndexReaderV4) override with true zero-copy paths: uncompressed slices into the underlying PinotDataBuffer via toDirectByteBuffer, compressed slices into the per-context decompression scratch buffer. V5/V6 inherit V4. Huge compressed values fall back to wrapping the freshly allocated byte[]. The Javadoc documents the single-row consumption contract: the returned buffer is valid only until the next reader call on the same context. JMH benchmark BenchmarkRawForwardIndexReader gains readV3View / readV4View alongside the byte[] baselines and adds PASS_THROUGH to the compression axis. Quick run shows ~55% improvement on PASS_THROUGH and ~22% on LZ4.
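The default-plus-override shape described in this commit can be sketched with plain java.nio, under stated assumptions. `BytesReader` and `PassThroughReader` are hypothetical stand-ins, not Pinot's `ForwardIndexReader` or its var-byte chunk readers; the reader context parameter is omitted, and a direct `ByteBuffer` with an offsets array stands in for `PinotDataBuffer`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesViewSketch {
  /** Hypothetical stand-in for a forward-index reader over single-value BYTES. */
  public interface BytesReader {
    byte[] getBytes(int docId);

    /** Default: wrap the copied byte[] so existing implementations keep compiling. */
    default ByteBuffer getBytesView(int docId) {
      return ByteBuffer.wrap(getBytes(docId));
    }
  }

  /** Uncompressed layout: values stored back to back in one shared buffer. */
  public static final class PassThroughReader implements BytesReader {
    private final ByteBuffer _data;
    private final int[] _offsets; // value i occupies [_offsets[i], _offsets[i + 1])

    public PassThroughReader(ByteBuffer data, int[] offsets) {
      _data = data;
      _offsets = offsets;
    }

    @Override
    public byte[] getBytes(int docId) {
      // Copying path: one allocation plus one copy per row.
      byte[] copy = new byte[_offsets[docId + 1] - _offsets[docId]];
      ByteBuffer cursor = _data.duplicate();
      cursor.position(_offsets[docId]);
      cursor.get(copy);
      return copy;
    }

    @Override
    public ByteBuffer getBytesView(int docId) {
      // View path: a slice that shares the underlying memory, no copy.
      ByteBuffer cursor = _data.duplicate();
      cursor.position(_offsets[docId]);
      cursor.limit(_offsets[docId + 1]);
      return cursor.slice();
    }
  }

  /** Builds a reader over three ASCII values: "alpha", "beta", "gamma". */
  public static PassThroughReader sampleReader() {
    byte[] raw = "alphabetagamma".getBytes(StandardCharsets.US_ASCII);
    ByteBuffer data = ByteBuffer.allocateDirect(raw.length);
    data.put(raw);
    data.flip();
    return new PassThroughReader(data, new int[] {0, 5, 9, 14});
  }

  /** Decodes a view without moving the caller's position. */
  public static String ascii(ByteBuffer view) {
    return StandardCharsets.US_ASCII.decode(view.duplicate()).toString();
  }

  public static void main(String[] args) {
    PassThroughReader reader = sampleReader();
    System.out.println(ascii(reader.getBytesView(1)));
    System.out.println(new String(reader.getBytes(1), StandardCharsets.US_ASCII));
  }
}
```

Both calls in `main` yield the same bytes; only the second one allocates a per-row array.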
Wire the theta/CPC/Integer-Tuple query-time aggregation read path to consume
zero-copy ByteBuffer views from the forward index instead of allocating a
byte[] per row, gated by a codec-safe capability flag.
- ForwardIndexReader.isBufferViewStableAcrossReads() (default false; true for
PASS_THROUGH var-byte readers, where getBytesView returns stable mmap slices).
Compressed readers return false so batched callers fall back to getBytes.
- BlockValSet.getBytesValueViewsSV() + isBytesViewStableAcrossReads() defaults;
ProjectionBlockValSet answers from the underlying reader. DataBlockCache /
DataFetcher gain a non-cached view accessor that calls getBytesView per row.
- DistinctCount{Theta,CPC}Sketch and IntegerTupleSketch aggregation functions
branch on the flag and deserialize from ByteBuffer views. Theta is true
zero-copy (Sketch.wrap); CPC/Tuple skip the byte[] alloc but still heapify.
Theta's advanced multi-argument (FilterEvaluator) form stays on the byte[]
path. All view deserializers force LITTLE_ENDIAN to match the byte[] path.
This also covers star-tree queries, which reuse the same aggregation machinery
over PASS_THROUGH pre-aggregated metric columns. Additive and query-side only:
no segment format change; all SPI additions are defaults.
SketchViewPathParityTest proves the view path (PASS_THROUGH) and byte[] path
(LZ4) produce identical theta/CPC/tuple distinct-count results.
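Why batched callers need the stability flag can be illustrated with a small model. This is a sketch under assumptions: `ScratchReader` and `readBatch` are hypothetical names, the reader imitates a compressed reader by reusing one scratch buffer, and the safe branch copies out of the view, whereas the commit above falls back to `getBytes`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class StableViewGate {
  /** Hypothetical reader whose views all alias one reusable scratch buffer. */
  public static final class ScratchReader {
    private final String[] _values;
    private final ByteBuffer _scratch = ByteBuffer.allocate(64);

    public ScratchReader(String... values) {
      _values = values;
    }

    /** Views are overwritten by the next read, so they are not stable. */
    public boolean isViewStableAcrossReads() {
      return false;
    }

    public ByteBuffer getBytesView(int docId) {
      _scratch.clear();
      _scratch.put(_values[docId].getBytes(StandardCharsets.US_ASCII));
      _scratch.flip();
      return _scratch.duplicate(); // shares content with the scratch buffer
    }
  }

  /** Reads all rows first and decodes afterwards, like a batched caller. */
  public static List<String> readBatch(ScratchReader reader, int numDocs, boolean honorFlag) {
    List<ByteBuffer> batch = new ArrayList<>();
    for (int docId = 0; docId < numDocs; docId++) {
      ByteBuffer view = reader.getBytesView(docId);
      if (honorFlag && !reader.isViewStableAcrossReads()) {
        // Unstable view: take a private copy before the next read clobbers it.
        ByteBuffer copy = ByteBuffer.allocate(view.remaining());
        copy.put(view);
        copy.flip();
        batch.add(copy);
      } else {
        // Retain the view as-is; only safe when the reader reports stability.
        batch.add(view);
      }
    }
    List<String> decoded = new ArrayList<>();
    for (ByteBuffer buffer : batch) {
      decoded.add(StandardCharsets.US_ASCII.decode(buffer).toString());
    }
    return decoded;
  }

  public static void main(String[] args) {
    ScratchReader reader = new ScratchReader("aa", "bb", "cc");
    System.out.println(readBatch(reader, 3, true));  // [aa, bb, cc]
    System.out.println(readBatch(reader, 3, false)); // [cc, cc, cc]
  }
}
```

Ignoring the flag makes every retained view show the last row read, which is the failure mode the gate is there to prevent.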
Wire the star-tree builder to read serialized sketch metric values as zero-copy ByteBuffer views from the source forward index, gated on the new codec-safe stability flag.
- ValueAggregator gains applyRawValueFromBuffer / applyAggregatedValueFromBuffer default methods; defaults drain the buffer to byte[] and delegate, so existing implementors are unaffected.
- Theta, CPC, and Integer-Tuple ValueAggregators override to consume the buffer via datasketches Memory.wrap. Theta is true zero-copy (Sketch.wrap); CPC and Tuple still heapify but skip the upstream byte[] allocation.
- PinotSegmentColumnReader.getValueAsBuffer(int) delegates to the underlying reader's getBytesView for SV BYTES columns.
- BaseSingleTreeBuilder computes a per-metric _metricUsesBufferPath[] flag once at construction (BYTES source + reader reports stable views). getSegmentRecord dispatches on the flag; mergeSegmentRecord dispatches on instanceof ByteBuffer so the buffer path safely composes with the read-all-then-sort batching that PASS_THROUGH mmap views survive.
The path is only taken when the source column is PASS_THROUGH BYTES; compressed sources fall back to the existing byte[] path unchanged.
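The drain-and-delegate default and the instanceof dispatch could look roughly like the following. It is an illustrative model only: `Aggregator`, `LegacySum`, `ViewSum`, and `apply` are hypothetical names, and summing little-endian longs stands in for sketch deserialization, which needs the datasketches library.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class BufferAwareAggregator {
  /** Hypothetical aggregator SPI; a serialized value is a run of little-endian longs. */
  public interface Aggregator {
    long applyRawValue(byte[] bytes);

    /** Default: drain the buffer to a byte[] and delegate to the existing method. */
    default long applyRawValueFromBuffer(ByteBuffer buffer) {
      byte[] bytes = new byte[buffer.remaining()];
      buffer.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
      return applyRawValue(bytes);
    }
  }

  /** Existing implementor: unaware of the buffer method, still works via the default. */
  public static final class LegacySum implements Aggregator {
    @Override
    public long applyRawValue(byte[] bytes) {
      return sum(ByteBuffer.wrap(bytes));
    }
  }

  /** Overriding implementor: consumes the buffer directly, no intermediate byte[]. */
  public static final class ViewSum implements Aggregator {
    @Override
    public long applyRawValue(byte[] bytes) {
      return sum(ByteBuffer.wrap(bytes));
    }

    @Override
    public long applyRawValueFromBuffer(ByteBuffer buffer) {
      return sum(buffer);
    }
  }

  /** Sums the longs in the buffer's remaining bytes without moving its position. */
  static long sum(ByteBuffer buffer) {
    ByteBuffer cursor = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
    long total = 0;
    while (cursor.remaining() >= Long.BYTES) {
      total += cursor.getLong();
    }
    return total;
  }

  /** Dispatches on the runtime type of the raw value, as a merge step might. */
  public static long apply(Aggregator aggregator, Object rawValue) {
    if (rawValue instanceof ByteBuffer) {
      return aggregator.applyRawValueFromBuffer((ByteBuffer) rawValue);
    }
    return aggregator.applyRawValue((byte[]) rawValue);
  }

  /** Encodes values as little-endian longs in a heap buffer ready for reading. */
  public static ByteBuffer encode(long... values) {
    ByteBuffer buffer = ByteBuffer.allocate(values.length * Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    for (long value : values) {
      buffer.putLong(value);
    }
    buffer.flip();
    return buffer;
  }
}
```

Because the default method reads through a duplicate, the same buffer can be handed to several aggregators in turn.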
Replaces "new byte[remaining()] + memcpy + Memory.wrap(bytes)" with Memory.wrap(buffer, LITTLE_ENDIAN) in the theta, tuple, and CPC sketch serdes (top-level and accumulator variants) in ObjectSerDeUtils. Eliminates one allocation + one memcpy per intermediate sketch on the broker reduce path. Theta is a wrap-retain sketch, so the wrapper transitively pins the DataTable's variable-size byte[] until the accumulator's threshold-triggered union releases it; this is heap-resident, GC-managed memory and the lifetime bound matches reduce duration. Tuple and CPC use heapify and detach immediately. Adds parity tests asserting byte[]-input and ByteBuffer-input (including non-zero-offset slices matching DataTableImplV4.getCustomObject's output) produce equivalent sketches. Requires --add-opens=java.base/sun.nio.ch=ALL-UNNAMED for Memory.wrap(ByteBuffer) reflection on JDK 9+; Pinot's standard launcher already sets this.
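Two details behind the parity tests can be reproduced with java.nio alone: a heap slice may sit at a non-zero array offset, and `slice()` / `duplicate()` hand back a big-endian buffer regardless of the parent's order. The sketch below is an assumption-laden model; `decodeViaCopy`, `decodeInPlace`, and `sliceAtOffset` are hypothetical helpers, and reading one little-endian long stands in for the datasketches `Memory.wrap(buffer, LITTLE_ENDIAN)` call.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class SliceParity {
  /** Old shape: copy the remaining bytes out, then decode the copy. */
  public static long decodeViaCopy(ByteBuffer buffer) {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.duplicate().get(bytes);
    return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getLong();
  }

  /** New shape: decode in place, forcing the byte order explicitly. */
  public static long decodeInPlace(ByteBuffer buffer) {
    // duplicate() returns a BIG_ENDIAN buffer, so the order must be set again.
    return buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong();
  }

  /** Writes one little-endian long at the given offset and returns a slice over it. */
  public static ByteBuffer sliceAtOffset(long value, int offset) {
    ByteBuffer whole = ByteBuffer.allocate(offset + Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
    whole.putLong(offset, value);
    whole.position(offset);
    return whole.slice(); // arrayOffset() == offset, order() == BIG_ENDIAN
  }

  public static void main(String[] args) {
    ByteBuffer slice = sliceAtOffset(0x0102030405060708L, 3);
    System.out.println(slice.arrayOffset());
    System.out.println(slice.order());
    System.out.println(decodeViaCopy(slice) == decodeInPlace(slice));
  }
}
```

Reading the slice without resetting the order would silently byte-swap the value, which is why an explicit order on every buffer-based read matters.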
davecromberge added a commit to permutive-engineering/pinot that referenced this pull request on May 29, 2026
…, b11a8c1, e164af0). Upstream PR: apache#18633
Conflict resolutions vs release-1.5.0:
- VarByteChunkForwardIndexReaderV4.processChunkAndReadFirstValueView: inlined the chunk decompression call (master added a decompressChunk helper post-1.5.0 cut; inlined to match the existing pattern at processChunkAndReadFirstValue).
- PinotSegmentColumnReader: added getStoredType() helper (master exposes getValueType() post-1.5.0 cut). BaseSingleTreeBuilder call site updated to use it.
- ProjectionBlockValSet: added missing ForwardIndexReader import.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codecov Report
❌ Patch coverage is
@@ Coverage Diff @@
## master #18633 +/- ##
============================================
+ Coverage 64.33% 64.51% +0.17%
- Complexity 1137 1282 +145
============================================
Files 3314 3353 +39
Lines 204100 207332 +3232
Branches 31771 32378 +607
============================================
+ Hits 131315 133762 +2447
- Misses 62241 62824 +583
- Partials 10544 10746 +202
Summary
Eliminates the per-row byte[] allocation + memcpy that today fronts every serialized-sketch read in Pinot. Theta sketches become true zero-copy via Sketch.wrap; CPC and Integer-Tuple sketches skip the upstream alloc but still heapify (no zero-copy wrap exists for those families).
The change is additive and codec-safe: a stability flag on the forward-index reader gates every view path, so compressed columns transparently fall back to the existing byte[] path. No segment format change, no wire protocol change, all SPI additions are interface defaults.
What's in each commit
1. ForwardIndexReader.getBytesView(int, T) - SPI default wraps getBytes(); V3 (VarByteChunkSVForwardIndexReader) and V4 (VarByteChunkForwardIndexReaderV4) override with true zero-copy slices into the underlying PinotDataBuffer (PASS_THROUGH) or the per-context decompression scratch (compressed). V5/V6 inherit V4. Huge compressed values fall back to byte[] (no slice path).
2. ForwardIndexReader.isBufferViewStableAcrossReads() capability flag (true only for PASS_THROUGH var-byte readers). BlockValSet.getBytesValueViewsSV() / isBytesViewStableAcrossReads() mirror it. ProjectionBlockValSet / DataBlockCache / DataFetcher plumb views through. Theta/CPC/Tuple aggregation functions branch on the flag and consume views via Memory.wrap(buffer, LITTLE_ENDIAN). Theta's advanced multi-argument FilterEvaluator form stays on the byte[] path (out of scope, see below).
3. ValueAggregator.applyRawValueFromBuffer / applyAggregatedValueFromBuffer SPI defaults (drain to byte[] + delegate); theta/CPC/Tuple aggregator overrides. PinotSegmentColumnReader.getValueAsBuffer. BaseSingleTreeBuilder dispatches on a per-metric _metricUsesBufferPath flag.
4. ObjectSerDeUtils switches six sketch serdes (theta, tuple, CPC + their three accumulator variants) from new byte[remaining()] + memcpy + Memory.wrap(bytes) to Memory.wrap(buffer, LITTLE_ENDIAN). Source is the heap byte[] in DataTableImplV4._variableSizeData - immutable after wire-decode.
Risk surface
- getBytesView: returned buffer is valid only until the next call on the same context (the compressed reader slices a scratch buffer that gets overwritten). Stability flag gates every batched caller; non-batched code paths consume in-place. Documented at the SPI and at every introduced call site.
- Sketch.wrap retains the wrapped Memory - the only zero-copy retain in the branch. Audited at all three call sites: query and build wrappers are method-local; broker wrapper sits in ThetaSketchAccumulator._accumulator until the next threshold union (default 2). Heap byte[], GC-managed, bounded by reduce duration.
- --add-opens=java.base/sun.nio.ch=ALL-UNNAMED requirement - Memory.wrap(ByteBuffer) reflects into closed internals. Pinot's standard launcher (appAssemblerScriptTemplate) already sets this; surefire is configured for tests. Without the flag, PASS_THROUGH reads throw IllegalAccessError - flagged here in case any deployment uses a custom launcher.
- Each Memory.wrap(ByteBuffer, …) call passes ByteOrder.LITTLE_ENDIAN explicitly to match the on-disk sketch format and the implicit native order of Memory.wrap(byte[]).
Benchmarks
Inner-loop pinpoint bench (50K rows × ~8 KB compact theta sketches, JDK 21, PASS_THROUGH):

BenchmarkRawForwardIndexReader (commit 1): ~55% improvement PASS_THROUGH, ~22% LZ4 at the SPI hot path.
Star-tree build: ~10% improvement PASS_THROUGH (build is allocation-bound).
Theta dominates because Sketch.wrap is true zero-copy; CPC/Tuple savings are smaller because heapify allocates internal state regardless of input shape.
Test coverage
- ForwardIndexBytesViewTest - V3/V4 view ≡ byte[] across all codecs and edge cases including huge values.
- SketchViewPathParityTest - theta/CPC/Tuple distinct count identical between PASS_THROUGH and LZ4 segments end-to-end through real query plumbing.
- DistinctCount{Theta,CPC,Tuple}SketchValueAggregatorTest - buffer-API path ≡ byte[]-API path per aggregator.
- SketchBuildPathParityTest - star-tree pre-aggregated sketches bit-identical between PASS_THROUGH and LZ4 source segments.
- ObjectSerDeUtilsBufferParityTest - deserialize from byte[] ≡ deserialize from ByteBuffer (including non-zero array offset slices matching DataTableImplV4.getCustomObject output).
All pass. BenchmarkValueAggregatorBufferApi verifies no regression on the default SPI fallback for non-overriding aggregators.
Out of scope (deferred follow-ups)
- … distinctCountThetaSketchWithFilter, etc.) - FilterEvaluator re-reads value arrays per row and casts to byte[][]; switching it requires interface changes. Gated off in this PR.
- … KllDoublesSketch.wrap (also a retain-style wrap); mechanically straightforward but out of this PR's scope.
- … IntBuffer; HLL+ delegates to a third-party HyperLogLogPlus.Builder.build(byte[]) that would need a fork.
- … MergingDigest.fromBytes).
- … pursued here because the codec-gated approach is safe everywhere and free on PASS_THROUGH where it matters most.