Skip to content

Zero copy sketch views#18633

Open
davecromberge wants to merge 4 commits into
apache:masterfrom
permutive-engineering:feature-contrib/zero-copy-sketch-views
Open

Zero copy sketch views#18633
davecromberge wants to merge 4 commits into
apache:masterfrom
permutive-engineering:feature-contrib/zero-copy-sketch-views

Conversation

@davecromberge
Copy link
Copy Markdown
Member

Summary

Eliminates the per-row byte[] allocation + memcpy that today fronts every serialized-sketch read in Pinot. Theta sketches become true zero-copy via Sketch.wrap; CPC and Integer-Tuple sketches skip the upstream alloc but still heapify (no zero-copy wrap exists for those families).

The change is additive and codec-safe — a stability flag on the forward-index reader gates every view path, so compressed columns transparently fall back to the existing byte[] path. No segment format change, no wire protocol change, all SPI additions are interface defaults.

What's in each commit

  1. ForwardIndexReader.getBytesView(int, T) - SPI default wraps getBytes(); V3 (VarByteChunkSVForwardIndexReader) and V4 (VarByteChunkForwardIndexReaderV4) override with true zero-copy slices into the underlying PinotDataBuffer (PASS_THROUGH) or the per-context decompression scratch (compressed). V5/V6 inherit V4. Huge compressed values fall back to byte[] (no slice path).
  2. Query read path - ForwardIndexReader.isBufferViewStableAcrossReads() capability flag (true only for PASS_THROUGH var-byte readers). BlockValSet.getBytesValueViewsSV() / isBytesViewStableAcrossReads() mirror it. ProjectionBlockValSet/DataBlockCache/DataFetcher plumb views through. Theta/CPC/Tuple aggregation functions branch on the flag and consume views via Memory.wrap(buffer, LITTLE_ENDIAN). Theta's advanced multi-argument FilterEvaluator form stays on the byte[] path (out of scope, see below).
  3. Star-tree build path - ValueAggregator.applyRawValueFromBuffer / applyAggregatedValueFromBuffer SPI defaults (drain to byte[] + delegate); theta/CPC/Tuple aggregator overrides. PinotSegmentColumnReader.getValueAsBuffer. BaseSingleTreeBuilder dispatches on a per-metric _metricUsesBufferPath flag.
  4. Broker reduce path - ObjectSerDeUtils switches six sketch serdes (theta, tuple, CPC + their three accumulator variants) from new byte[remaining()] + memcpy + Memory.wrap(bytes) to Memory.wrap(buffer, LITTLE_ENDIAN). Source is the heap byte[] in DataTableImplV4._variableSizeData - immutable after wire-decode.

Risk surface

  • Lifetime contract on getBytesView: returned buffer is valid only until the next call on the same context (the compressed reader slices a scratch buffer that gets overwritten). Stability flag gates every batched caller; non-batched code paths consume in-place. Documented at the SPI and at every introduced call site.
  • Theta Sketch.wrap retains the wrapped Memory - the only zero-copy retain in the branch. Audited at all three call sites: query and build wrappers are method-local; broker wrapper sits in ThetaSketchAccumulator._accumulator until the next threshold union (default 2). Heap byte[], GC-managed, bounded by reduce duration.
  • --add-opens=java.base/sun.nio.ch=ALL-UNNAMED requirement - Memory.wrap(ByteBuffer) reflects into closed internals. Pinot's standard launcher (appAssemblerScriptTemplate) already sets this; surefire is configured for tests. Without the flag, PASS_THROUGH reads throw IllegalAccessError - flagged here in case any deployment uses a custom launcher.
  • Byte order: every Memory.wrap(ByteBuffer, …) call passes ByteOrder.LITTLE_ENDIAN explicitly to match the on-disk sketch format and the implicit native order of Memory.wrap(byte[]).

Benchmarks

Inner-loop pinpoint bench (50K rows × ~8 KB compact theta sketches, JDK 21, PASS_THROUGH):

sketch byte[] (ms/op) view (ms/op) delta
theta ~25 ~5 −80%
cpc ~485 ~500 ~0% (heapify dominates)
tuple ~606 ~528 −13%

BenchmarkRawForwardIndexReader (commit 1): ~55% improvement PASS_THROUGH, ~22% LZ4 at the SPI hot path.

Star-tree build: ~10% improvement PASS_THROUGH (build is allocation-bound).

Theta dominates because Sketch.wrap is true zero-copy; CPC/Tuple savings are smaller because heapify allocates internal state regardless of input shape.

Test coverage

  • ForwardIndexBytesViewTest - V3/V4 view ≡ byte[] across all codecs and edge cases including huge values.
  • SketchViewPathParityTest - theta/CPC/Tuple distinct count identical between PASS_THROUGH and LZ4 segments end-to-end through real query plumbing.
  • DistinctCount{Theta,CPC,Tuple}SketchValueAggregatorTest — buffer-API path ≡ byte[]-API path per aggregator.
  • SketchBuildPathParityTest - star-tree pre-aggregated sketches bit-identical between PASS_THROUGH and LZ4 source segments.
  • ObjectSerDeUtilsBufferParityTest - deserialize from byte[] ≡ deserialize from ByteBuffer (including non-zero array offset slices matching DataTableImplV4.getCustomObject output).

All pass. BenchmarkValueAggregatorBufferApi verifies no regression on the default SPI fallback for non-overriding aggregators.

Out of scope (deferred follow-ups)

  • Theta multi-argument form (distinctCountThetaSketchWithFilter, etc.) - FilterEvaluator re-reads value arrays per row and casts to byte[][]; switching it requires interface
    changes. Gated off in this PR.
  • KLL doubles sketch - uses KllDoublesSketch.wrap (also a retain-style wrap); mechanically straightforward but out of this PR's scope.
  • HLL / HLL+ - HLL already reads via IntBuffer; HLL+ delegates to a third-party HyperLogLogPlus.Builder.build(byte[]) that would need a fork.
  • TDigest - already optimal (passes the buffer directly to MergingDigest.fromBytes).
  • Compressed columns getting the inner-loop win - would require restructuring aggregation functions to consume views per-row instead of materialising a view array. Larger refactor; not
    pursued here because the codec-gated approach is safe everywhere and free on PASS_THROUGH where it matters most.

Introduces ForwardIndexReader.getBytesView(int, T) returning a ByteBuffer
view of the BYTES single-value at the given doc id. The interface default
wraps the existing getBytes() byte[] for source compatibility.

V3 (VarByteChunkSVForwardIndexReader) and V4 (VarByteChunkForwardIndexReaderV4)
override with true zero-copy paths: uncompressed slices into the underlying
PinotDataBuffer via toDirectByteBuffer, compressed slices into the per-context
decompression scratch buffer. V5/V6 inherit V4. Huge compressed values fall
back to wrapping the freshly allocated byte[].

The Javadoc documents the single-row consumption contract: the returned
buffer is valid only until the next reader call on the same context.

JMH benchmark BenchmarkRawForwardIndexReader gains readV3View / readV4View
alongside the byte[] baselines and adds PASS_THROUGH to the compression axis.
Quick run shows ~55% improvement on PASS_THROUGH and ~22% on LZ4.
Wire the theta/CPC/Integer-Tuple query-time aggregation read path to consume
zero-copy ByteBuffer views from the forward index instead of allocating a
byte[] per row, gated by a codec-safe capability flag.

- ForwardIndexReader.isBufferViewStableAcrossReads() (default false; true for
  PASS_THROUGH var-byte readers, where getBytesView returns stable mmap slices).
  Compressed readers return false so batched callers fall back to getBytes.
- BlockValSet.getBytesValueViewsSV() + isBytesViewStableAcrossReads() defaults;
  ProjectionBlockValSet answers from the underlying reader. DataBlockCache /
  DataFetcher gain a non-cached view accessor that calls getBytesView per row.
- DistinctCount{Theta,CPC}Sketch and IntegerTupleSketch aggregation functions
  branch on the flag and deserialize from ByteBuffer views. Theta is true
  zero-copy (Sketch.wrap); CPC/Tuple skip the byte[] alloc but still heapify.
  Theta's advanced multi-argument (FilterEvaluator) form stays on the byte[]
  path. All view deserializers force LITTLE_ENDIAN to match the byte[] path.

This also covers star-tree queries, which reuse the same aggregation machinery
over PASS_THROUGH pre-aggregated metric columns. Additive and query-side only:
no segment format change; all SPI additions are defaults.

SketchViewPathParityTest proves the view path (PASS_THROUGH) and byte[] path
(LZ4) produce identical theta/CPC/tuple distinct-count results.
Wire the star-tree builder to read serialized sketch metric values as zero-copy
ByteBuffer views from the source forward index, gated on the new codec-safe
stability flag.

- ValueAggregator gains applyRawValueFromBuffer / applyAggregatedValueFromBuffer
  default methods; defaults drain the buffer to byte[] and delegate, so existing
  implementors are unaffected.
- Theta, CPC, and Integer-Tuple ValueAggregators override to consume the buffer
  via datasketches Memory.wrap. Theta is true zero-copy (Sketch.wrap); CPC and
  Tuple still heapify but skip the upstream byte[] allocation.
- PinotSegmentColumnReader.getValueAsBuffer(int) delegates to the underlying
  reader's getBytesView for SV BYTES columns.
- BaseSingleTreeBuilder computes a per-metric _metricUsesBufferPath[] flag once
  at construction (BYTES source + reader reports stable views). getSegmentRecord
  dispatches on the flag; mergeSegmentRecord dispatches on instanceof ByteBuffer
  so the buffer path safely composes with the read-all-then-sort batching that
  PASS_THROUGH mmap views survive.

The path is only taken when the source column is PASS_THROUGH BYTES; compressed
sources fall back to the existing byte[] path unchanged.
Replaces "new byte[remaining()] + memcpy + Memory.wrap(bytes)" with
Memory.wrap(buffer, LITTLE_ENDIAN) in the theta, tuple, and CPC sketch
serdes (top-level and accumulator variants) in ObjectSerDeUtils. Eliminates
one allocation + one memcpy per intermediate sketch on the broker reduce path.

Theta is a wrap-retain sketch, so the wrapper transitively pins the
DataTable's variable-size byte[] until the accumulator's threshold-triggered
union releases it; this is heap-resident, GC-managed memory and the lifetime
bound matches reduce duration. Tuple and CPC use heapify and detach
immediately. Adds parity tests asserting byte[]-input and ByteBuffer-input
(including non-zero-offset slices matching DataTableImplV4.getCustomObject's
output) produce equivalent sketches.

Requires --add-opens=java.base/sun.nio.ch=ALL-UNNAMED for
Memory.wrap(ByteBuffer) reflection on JDK 9+; Pinot's standard launcher
already sets this.
davecromberge added a commit to permutive-engineering/pinot that referenced this pull request May 29, 2026
…, b11a8c1, e164af0).

Upstream PR: apache#18633

Conflict resolutions vs release-1.5.0:
- VarByteChunkForwardIndexReaderV4.processChunkAndReadFirstValueView: inlined
  the chunk decompression call (master added a decompressChunk helper
  post-1.5.0 cut; inlined to match the existing pattern at
  processChunkAndReadFirstValue).
- PinotSegmentColumnReader: added getStoredType() helper (master exposes
  getValueType() post-1.5.0 cut). BaseSingleTreeBuilder call site updated
  to use it.
- ProjectionBlockValSet: added missing ForwardIndexReader import.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 29, 2026

Codecov Report

❌ Patch coverage is 80.20833% with 38 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.51%. Comparing base (2de4384) to head (e164af0).
⚠️ Report is 42 commits behind head on master.

Files with missing lines Patch % Lines
...java/org/apache/pinot/core/common/DataFetcher.java 46.66% 7 Missing and 1 partial ⚠️
...java/org/apache/pinot/core/common/BlockValSet.java 0.00% 6 Missing ⚠️
...inot/segment/local/aggregator/ValueAggregator.java 0.00% 6 Missing ⚠️
...ion/DistinctCountCPCSketchAggregationFunction.java 64.28% 4 Missing and 1 partial ⚠️
...unction/IntegerTupleSketchAggregationFunction.java 71.42% 4 Missing ⚠️
...ders/forward/VarByteChunkForwardIndexReaderV4.java 93.33% 3 Missing ⚠️
...regator/DistinctCountCPCSketchValueAggregator.java 75.00% 0 Missing and 2 partials ⚠️
...ore/operator/docvalsets/ProjectionBlockValSet.java 80.00% 0 Missing and 1 partial ⚠️
...n/DistinctCountThetaSketchAggregationFunction.java 92.30% 0 Missing and 1 partial ⚠️
...ders/forward/VarByteChunkSVForwardIndexReader.java 94.73% 1 Missing ⚠️
... and 1 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18633      +/-   ##
============================================
+ Coverage     64.33%   64.51%   +0.17%     
- Complexity     1137     1282     +145     
============================================
  Files          3314     3353      +39     
  Lines        204100   207332    +3232     
  Branches      31771    32378     +607     
============================================
+ Hits         131315   133762    +2447     
- Misses        62241    62824     +583     
- Partials      10544    10746     +202     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.51% <80.20%> (+0.17%) ⬆️
temurin 64.51% <80.20%> (+0.17%) ⬆️
unittests 64.51% <80.20%> (+0.17%) ⬆️
unittests1 56.86% <44.27%> (+0.12%) ⬆️
unittests2 37.20% <52.08%> (+1.60%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants