[Arrow] Column-major segment build for Arrow IPC sources #18638

Open
real-mj-song wants to merge 2 commits into apache:master from real-mj-song:arrow-columnar-builder

real-mj-song (Contributor) commented May 30, 2026

TL;DR: Adds ArrowColumnReaderFactory (caller-managed reader/allocator) and ArrowFileColumnReaderFactory (file convenience) to the pinot-arrow plugin, exposing Arrow IPC sources to the column-major build path introduced in #16727. Stacked on top of #18632.

Tracks #18629.

Depends on

#18632 (precursor PR) — extracts ArrowToPinotTypeConverter from ArrowRecordExtractor into a public shared utility. Must merge before this PR, since the new ArrowColumnReader.getValue(int) delegates to that utility.

The diff currently shows both commits (precursor + this work); once #18632 lands on master, GitHub will reduce the diff to just this PR's commit.

What changed

  • ArrowColumnReaderFactory — implements ColumnReaderFactory over a caller-managed ArrowReader and BufferAllocator. The factory does not close them. Accepts any ArrowReader subclass — ArrowFileReader, ArrowStreamReader, or a custom subclass that yields in-process record batches. Suitable for batch pipelines that produce Arrow batches in-process (e.g. a Spark task using mapInArrow) and want to drive buildColumnar() without disk I/O.

  • ArrowFileColumnReaderFactory — file-specialised convenience that opens a private RootAllocator (sized by the arrowAllocatorLimit config), opens the IPC file via ArrowFileReader, and closes all three on close(). The natural choice for callers that just want to read an Arrow file on disk.

  • ArrowColumnReader — wraps a single Arrow FieldVector and implements the three documented ColumnReader access patterns: generic sequential next() with null checks, typed sequential (nextInt() / nextLong() / …) with isNextNull() / skipNext(), and random access by docId. Single-value and multi-value (List of primitive) variants supported.

  • ArrowAccumulators — package-private helper shared by both factories. Walks every record batch and bulk-appends each wanted column into a per-column FieldVector accumulator via Arrow's VectorAppender (Visitor-based; grows buffers once per batch and bulk-copies, instead of per-row copyValueSafe). Rejects dictionary-encoded columns loudly via Preconditions.checkArgument since the column-major path doesn't yet implement DictionaryEncoder.decode. Also exposes a closeAll(map) utility shared by both factory close() paths.
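
A minimal sketch of the ideas in the bullets above, using plain JDK arrays rather than Arrow vectors: per-batch columns are concatenated into one accumulator that grows once per batch, and the result is read through the three access patterns. `IntColumnSketch` and all of its methods are hypothetical stand-ins for illustration, not the Arrow or Pinot API, and the inner copy loop only approximates Arrow's bulk buffer copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a nullable int column; not the Arrow or Pinot API. */
final class IntColumnSketch {
  private final int[] values;
  private final boolean[] isNull;
  private int cursor = 0; // position of the next sequential read

  private IntColumnSketch(int[] values, boolean[] isNull) {
    this.values = values;
    this.isNull = isNull;
  }

  /** Concatenates per-batch columns into one accumulator, resizing once per batch. */
  static IntColumnSketch accumulate(List<Integer[]> batches) {
    int[] values = new int[0];
    boolean[] isNull = new boolean[0];
    for (Integer[] batch : batches) {
      int offset = values.length;
      values = Arrays.copyOf(values, offset + batch.length); // one resize per batch
      isNull = Arrays.copyOf(isNull, offset + batch.length);
      for (int i = 0; i < batch.length; i++) { // stand-in for a bulk buffer copy
        if (batch[i] == null) {
          isNull[offset + i] = true;
        } else {
          values[offset + i] = batch[i];
        }
      }
    }
    return new IntColumnSketch(values, isNull);
  }

  int getTotalDocs() { return values.length; }
  boolean hasNext() { return cursor < values.length; }

  // Pattern 1: generic sequential read; null entries come back as null.
  Integer next() {
    int docId = cursor++;
    return isNull[docId] ? null : Integer.valueOf(values[docId]);
  }

  // Pattern 2: typed sequential read; the caller checks isNextNull() first.
  boolean isNextNull() { return isNull[cursor]; }
  void skipNext() { cursor++; }
  int nextInt() { return values[cursor++]; }

  // Pattern 3: random access by docId; does not move the cursor.
  Integer getValue(int docId) {
    return isNull[docId] ? null : Integer.valueOf(values[docId]);
  }

  public static void main(String[] args) {
    List<Integer[]> batches = new ArrayList<>();
    batches.add(new Integer[] {1, null, 3});
    batches.add(new Integer[] {4, 5});
    IntColumnSketch column = accumulate(batches);

    long sum = 0;
    while (column.hasNext()) {
      if (column.isNextNull()) {
        column.skipNext();
      } else {
        sum += column.nextInt();
      }
    }
    System.out.println("docs=" + column.getTotalDocs() + " sum=" + sum); // docs=5 sum=13
    System.out.println("docId 3 = " + column.getValue(3)); // first value of the second batch: 4
  }
}
```

Once the batches are concatenated, docIds run contiguously across batch boundaries, so the reader needs no batch bookkeeping.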

ArrowColumnReader.getValue(int docId) delegates Arrow → Pinot type conversion to the shared ArrowToPinotTypeConverter (from #18632), returning canonical JDK types — String from Utf8 / LargeUtf8 (unwrapped from Arrow's Text), Object[] for List variants (with recursive element conversion), LocalDate / LocalTime / Timestamp for temporal types, BigDecimal for Decimal. Matches the row-major path's contract so per-column stats collectors, dictionary creators, and index creators downstream of buildColumnar() see the same JDK types they see today.
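
The normalisation described above can be sketched in plain Java. This is an illustration only, not the code of ArrowToPinotTypeConverter: `JdkTypeNormalizerSketch` is a hypothetical name, and `StringBuilder` stands in for a text wrapper such as Arrow's Text. Temporal and decimal handling is omitted.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical normaliser; illustrates unwrapping to plain JDK types. */
final class JdkTypeNormalizerSketch {

  /** Maps text wrappers to String and lists to Object[], recursing into elements. */
  static Object normalize(Object value) {
    if (value == null) {
      return null;
    }
    if (value instanceof CharSequence) {
      return value.toString(); // unwrap any text wrapper to a plain String
    }
    if (value instanceof List) {
      List<?> list = (List<?>) value;
      Object[] converted = new Object[list.size()];
      for (int i = 0; i < converted.length; i++) {
        converted[i] = normalize(list.get(i)); // recursive element conversion
      }
      return converted;
    }
    return value; // already a plain JDK value
  }

  public static void main(String[] args) {
    Object text = normalize(new StringBuilder("pinot"));
    System.out.println(text.getClass().getSimpleName() + " " + text); // String pinot

    Object list = normalize(Arrays.asList(new StringBuilder("a"), 2, null));
    System.out.println(Arrays.toString((Object[]) list)); // [a, 2, null]
  }
}
```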

Compatibility

Purely additive on the SPI side:

  • The existing row-major ArrowRecordReader path is unchanged.
  • Users opt in to the column-major path by constructing the driver via the ColumnReaderFactory-accepting overload of SegmentIndexCreationDriverImpl.init.
  • No SPI changes: consumes the already-merged ColumnReader / ColumnReaderFactory interfaces from #16727.

Testing

| Suite | Tests |
| --- | --- |
| ArrowColumnReaderFactoryTest (caller-managed path via ArrowStreamReader) | 3 / 3 |
| ArrowFileColumnReaderFactoryTest (file path) | 7 / 7 |
| ArrowColumnarBuildIntegrationTest (segment metadata equivalence: row-major ↔ file column-major; row-major ↔ in-memory column-major) | 2 / 2 |
| Row-major regression (ArrowRecordExtractorTest 54 + ArrowMessageDecoderTest 7 + ArrowRecordReaderTest 2) | 63 / 63 |

The integration test builds a Pinot segment from the same Arrow fixture three ways:

  1. Row-major via ArrowRecordReader → driver.build()
  2. Column-major file via ArrowFileColumnReaderFactory → driver.build() → buildColumnar()
  3. Column-major in-memory via ArrowColumnReaderFactory(ArrowStreamReader, BufferAllocator) → driver.build() → buildColumnar() (no file touched on this path)

…and asserts that per-column cardinality, min, max, totalDocs, and data type match, and that the segment-level doc count matches, across each compared pair. Path 3 shows that the disk-free use case works end-to-end.
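
The shape of that equivalence check, reduced to plain Java: the same data is scanned once row by row and once column by column, and the resulting per-column metadata must agree. `ColumnStatsSketch` is a hypothetical holder for illustration, not Pinot's column metadata class, and it assumes non-empty int columns.

```java
import java.util.TreeSet;

/** Hypothetical per-column metadata holder; assumes a non-empty int column. */
final class ColumnStatsSketch {
  final int totalDocs;
  final int cardinality;
  final int min;
  final int max;

  private ColumnStatsSketch(int totalDocs, TreeSet<Integer> distinct) {
    this.totalDocs = totalDocs;
    this.cardinality = distinct.size();
    this.min = distinct.first();
    this.max = distinct.last();
  }

  /** Row-major: visit every row and pick out one field. */
  static ColumnStatsSketch fromRows(int[][] rows, int columnIndex) {
    TreeSet<Integer> distinct = new TreeSet<>();
    for (int[] row : rows) {
      distinct.add(row[columnIndex]);
    }
    return new ColumnStatsSketch(rows.length, distinct);
  }

  /** Column-major: scan one contiguous column. */
  static ColumnStatsSketch fromColumn(int[] column) {
    TreeSet<Integer> distinct = new TreeSet<>();
    for (int value : column) {
      distinct.add(value);
    }
    return new ColumnStatsSketch(column.length, distinct);
  }

  boolean matches(ColumnStatsSketch other) {
    return totalDocs == other.totalDocs && cardinality == other.cardinality
        && min == other.min && max == other.max;
  }

  public static void main(String[] args) {
    int[][] rows = {{3, 10}, {1, 20}, {3, 30}};
    int[][] columns = {{3, 1, 3}, {10, 20, 30}}; // same data, pivoted
    for (int c = 0; c < columns.length; c++) {
      ColumnStatsSketch rowMajor = fromRows(rows, c);
      ColumnStatsSketch columnMajor = fromColumn(columns[c]);
      if (!rowMajor.matches(columnMajor)) {
        throw new AssertionError("metadata mismatch in column " + c);
      }
      System.out.println("column " + c + ": docs=" + rowMajor.totalDocs
          + " cardinality=" + rowMajor.cardinality
          + " min=" + rowMajor.min + " max=" + rowMajor.max);
    }
  }
}
```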

Arrow caveat: type assumptions downstream of buildColumnar()

Several downstream cast sites in pinot-segment-local (for example StringColumnPreIndexStatsCollector.collect and NoDictColumnStatisticsCollector.collect) cast directly to String / Integer without type checks and throw a NullPointerException on null entries. These assumptions hold for the existing segment-to-segment consumer of buildColumnar() but are fragile under any non-segment source. The shared converter satisfies the type contract today (e.g. it unwraps Arrow's Text to String); tightening the underlying assumptions is out of scope for this PR and warrants a follow-up once a non-segment consumer exists in tree.
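
To make the failure mode concrete, here is a hypothetical collector in plain Java. It is not the code of the Pinot collectors named above; `StringLengthCollectorSketch` and its methods are invented for illustration, and `StringBuilder` stands in for any non-String text wrapper. The checked variant is one possible shape for a follow-up, not a proposal from this PR.

```java
/** Hypothetical collector illustrating the caveat; not Pinot's implementation. */
final class StringLengthCollectorSketch {
  private int maxLength = 0;

  /** Blind cast: fails with ClassCastException or NullPointerException. */
  void collectUnchecked(Object entry) {
    String value = (String) entry;
    maxLength = Math.max(maxLength, value.length());
  }

  /** Checked variant: reports the offending type up front. */
  void collectChecked(Object entry) {
    if (!(entry instanceof String)) {
      String actual = entry == null ? "null" : entry.getClass().getName();
      throw new IllegalArgumentException("Expected String but got " + actual);
    }
    maxLength = Math.max(maxLength, ((String) entry).length());
  }

  int getMaxLength() {
    return maxLength;
  }

  /** Returns the simple name of the exception thrown by the action, or "none". */
  static String failureOf(Runnable action) {
    try {
      action.run();
      return "none";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    StringLengthCollectorSketch collector = new StringLengthCollectorSketch();
    Object wrapped = new StringBuilder("abc"); // stand-in for a non-String text wrapper

    System.out.println(failureOf(() -> collector.collectUnchecked(wrapped))); // ClassCastException
    System.out.println(failureOf(() -> collector.collectUnchecked(null)));    // NullPointerException
    System.out.println(failureOf(() -> collector.collectChecked(wrapped)));   // IllegalArgumentException

    collector.collectChecked("pinot");
    System.out.println(collector.getMaxLength()); // 5
  }
}
```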

References

…TypeConverter

Pure refactor — no behavior change. Move the schema-driven Arrow → Pinot value
conversion (originally introduced in apache#18434) from ArrowRecordExtractor into a
new public static utility ArrowToPinotTypeConverter so it can be reused by
column-major ColumnReader implementations that wrap Arrow vectors.

ArrowToPinotTypeConverter exposes a single entry point:

    public static Object toPinotValue(Field field, Object value, boolean extractRawTimeValues)

with all per-type helpers (convertTimestamp / Date / Time / List / Map /
Struct / byRuntimeType) reduced to private static methods. The
extractRawTimeValues flag — previously an ArrowRecordExtractor instance
field — is threaded through as a method parameter so the converter remains
stateless.

ArrowRecordExtractor.convert(Field, Object) is removed; the per-row dispatch
in extract() now calls ArrowToPinotTypeConverter.toPinotValue() directly.
All existing tests in pinot-arrow continue to pass unchanged:

- ArrowRecordReaderTest (2 / 2)
- ArrowRecordExtractorTest (54 / 54)
- ArrowMessageDecoderTest (7 / 7)

This precursor lets a follow-up ColumnReader implementation reuse the
conversion without duplicating ~200 lines of schema-driven dispatch.

codecov-commenter commented May 30, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 64.39%. Comparing base (a762254) to head (26316d8).
⚠️ Report is 8 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18638      +/-   ##
============================================
- Coverage     64.40%   64.39%   -0.01%     
- Complexity     1137     1282     +145     
============================================
  Files          3337     3359      +22     
  Lines        206069   207825    +1756     
  Branches      32128    32447     +319     
============================================
+ Hits         132710   133826    +1116     
- Misses        62726    63237     +511     
- Partials      10633    10762     +129     
| Flag | Coverage Δ |
| --- | --- |
| custom-integration1 | 100.00% <ø> (ø) |
| integration | 100.00% <ø> (ø) |
| integration1 | 100.00% <ø> (ø) |
| integration2 | 0.00% <ø> (ø) |
| java-21 | 64.39% <ø> (-0.01%) ⬇️ |
| temurin | 64.39% <ø> (-0.01%) ⬇️ |
| unittests | 64.39% <ø> (-0.01%) ⬇️ |
| unittests1 | 56.78% <ø> (-0.04%) ⬇️ |
| unittests2 | 37.14% <ø> (+0.21%) ⬆️ |


…eader SPI

Introduce two ColumnReaderFactory implementations in the pinot-arrow plugin:

  ArrowColumnReaderFactory       (caller-managed ArrowReader + BufferAllocator)
  ArrowFileColumnReaderFactory   (factory-managed file mode)

Both expose the contents of an Arrow source to the column-major build path
added in apache#16727, pivoting through one ColumnReader per FieldVector instead
of through per-row GenericRow materialisation.

ArrowColumnReaderFactory is the general factory: the caller supplies an
already-open ArrowReader (any subclass — ArrowFileReader, ArrowStreamReader,
or a custom subclass that yields in-process record batches) plus the
BufferAllocator the reader was opened against. The factory does not close
the reader or allocator; the caller owns their lifecycle. Per-column
accumulator vectors created during init are owned by the factory and
released on close.

ArrowFileColumnReaderFactory is a file-specialised convenience: at init
time it opens a private RootAllocator sized by the arrowAllocatorLimit
config, opens the IPC file via ArrowFileReader, and closes all three on
close().

Both factories share their column-build logic via the package-private
ArrowAccumulators helper: walk every record batch, bulk-append each wanted
column into a per-column FieldVector accumulator via Arrow's VectorAppender
(Visitor-based; grows offset / data buffers once per batch and bulk-copies,
rather than per-row copyValueSafe), produce one ArrowColumnReader per
accumulator. The helper rejects dictionary-encoded columns loudly via
Preconditions.checkArgument since the column-major path does not yet
implement DictionaryEncoder.decode. The helper also exposes a closeAll
utility used by both factory close() paths so the per-vector close loop
lives once.

ArrowColumnReader wraps a single Arrow FieldVector and supports the three
documented ColumnReader access patterns: generic sequential next() with
null checks, typed sequential nextInt() / nextLong() / ... with
isNextNull() and skipNext(), and random access by docId. Single-value and
multi-value (List of primitive) variants are covered.

getValue(int docId) delegates Arrow → Pinot type conversion to the
ArrowToPinotTypeConverter utility (precursor commit). The converter returns
canonical JDK types — String for Utf8 / LargeUtf8 (unwrapped from Arrow's
Text), Object[] for List variants (with recursive element conversion),
LocalDate / LocalTime / Timestamp for temporal types, BigDecimal for
Decimal, etc. — matching the contract that Pinot's stats collectors,
dictionary creators, and index creators downstream of buildColumnar()
expect.

Unit tests cover sequential read across primitive types with nulls, random
access by docId, generic next() null counting, multi-value int column with
per-element nulls, column subset filtering, getAvailableColumns()
reflecting the source schema, multi-batch concatenation with verification
of values straddling batch boundaries, and the general factory's
caller-managed path with an ArrowStreamReader over a ByteArrayInputStream
(asserting caller-owned allocator remains usable after factory close).

An integration test builds a Pinot segment from the same Arrow IPC file via
both the row-major path (ArrowRecordReader → driver.build()) and the
file column-major path (ArrowFileColumnReaderFactory → driver.build() →
buildColumnar()), then asserts per-column metadata equivalence
(cardinality, min, max, totalDocs, data type) and segment-level doc count.

real-mj-song force-pushed the arrow-columnar-builder branch from 609ca47 to 26316d8 (May 31, 2026 19:29)
real-mj-song marked this pull request as ready for review (May 31, 2026 23:59)