[Arrow] Column-major segment build for Arrow IPC sources#18638
Open
real-mj-song wants to merge 2 commits into
…TypeConverter

Pure refactor, no behavior change. Move the schema-driven Arrow → Pinot value conversion (originally introduced in apache#18434) from ArrowRecordExtractor into a new public static utility ArrowToPinotTypeConverter so it can be reused by column-major ColumnReader implementations that wrap Arrow vectors.

ArrowToPinotTypeConverter exposes a single entry point:

    public static Object toPinotValue(Field field, Object value, boolean extractRawTimeValues)

with all per-type helpers (convertTimestamp / Date / Time / List / Map / Struct / byRuntimeType) reduced to private static methods. The extractRawTimeValues flag (previously an ArrowRecordExtractor instance field) is threaded through as a method parameter so the converter remains stateless.

ArrowRecordExtractor.convert(Field, Object) is removed; the per-row dispatch in extract() now calls ArrowToPinotTypeConverter.toPinotValue() directly.

All existing tests in pinot-arrow continue to pass unchanged:

- ArrowRecordReaderTest (2 / 2)
- ArrowRecordExtractorTest (54 / 54)
- ArrowMessageDecoderTest (7 / 7)

This precursor lets a follow-up ColumnReader implementation reuse the conversion without duplicating ~200 lines of schema-driven dispatch.
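The "flag as a method parameter" design described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the real `ArrowToPinotTypeConverter`: `SketchKind` is a hypothetical stand-in for Arrow's `Field`, only three types are handled, and it assumes `extractRawTimeValues` means "return the underlying numeric value instead of a JDK temporal type".

```java
import java.sql.Timestamp;
import java.time.LocalDate;

// Hypothetical stand-in for the Arrow field type; the real entry point takes an Arrow Field.
enum SketchKind { UTF8, TIMESTAMP_MILLIS, DATE_DAY }

final class StatelessConverterSketch {
  private StatelessConverterSketch() { }

  // Single entry point. The flag arrives as a parameter, so the class holds no state.
  static Object toValue(SketchKind kind, Object value, boolean extractRawTimeValues) {
    if (value == null) {
      return null;
    }
    switch (kind) {
      case UTF8:
        // Unwrap any text wrapper into a plain String.
        return value.toString();
      case TIMESTAMP_MILLIS:
        return convertTimestamp((Long) value, extractRawTimeValues);
      case DATE_DAY:
        return convertDate((Integer) value, extractRawTimeValues);
      default:
        throw new IllegalArgumentException("Unsupported kind: " + kind);
    }
  }

  // Per-type helpers stay private and static, mirroring the layout the commit describes.
  private static Object convertTimestamp(long epochMillis, boolean raw) {
    if (raw) {
      return Long.valueOf(epochMillis);
    }
    return new Timestamp(epochMillis);
  }

  private static Object convertDate(int epochDays, boolean raw) {
    if (raw) {
      return Integer.valueOf(epochDays);
    }
    return LocalDate.ofEpochDay(epochDays);
  }
}
```

Because nothing is stored on an instance, a row-major extractor and a column-major reader could both call the same method with their own flag value.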
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files

@@ Coverage Diff @@
## master #18638 +/- ##
============================================
- Coverage 64.40% 64.39% -0.01%
- Complexity 1137 1282 +145
============================================
Files 3337 3359 +22
Lines 206069 207825 +1756
Branches 32128 32447 +319
============================================
+ Hits 132710 133826 +1116
- Misses 62726 63237 +511
- Partials 10633 10762 +129
…eader SPI

Introduce two ColumnReaderFactory implementations in the pinot-arrow plugin:

- ArrowColumnReaderFactory (caller-managed ArrowReader + BufferAllocator)
- ArrowFileColumnReaderFactory (factory-managed file mode)

Both expose the contents of an Arrow source to the column-major build path added in apache#16727, pivoting through one ColumnReader per FieldVector instead of through per-row GenericRow materialisation.

ArrowColumnReaderFactory is the general factory: the caller supplies an already-open ArrowReader (any subclass: ArrowFileReader, ArrowStreamReader, or a custom subclass that yields in-process record batches) plus the BufferAllocator the reader was opened against. The factory does not close the reader or allocator; the caller owns their lifecycle. Per-column accumulator vectors created during init are owned by the factory and released on close.

ArrowFileColumnReaderFactory is a file-specialised convenience: at init time it opens a private RootAllocator sized by the arrowAllocatorLimit config, opens the IPC file via ArrowFileReader, and closes all three on close().

Both factories share their column-build logic via the package-private ArrowAccumulators helper: walk every record batch, bulk-append each wanted column into a per-column FieldVector accumulator via Arrow's VectorAppender (Visitor-based; grows offset / data buffers once per batch and bulk-copies, rather than per-row copyValueSafe), produce one ArrowColumnReader per accumulator. The helper rejects dictionary-encoded columns loudly via Preconditions.checkArgument since the column-major path does not yet implement DictionaryEncoder.decode. The helper also exposes a closeAll utility used by both factory close() paths so the per-vector close loop lives once.

ArrowColumnReader wraps a single Arrow FieldVector and supports the three documented ColumnReader access patterns: generic sequential next() with null checks, typed sequential nextInt() / nextLong() / ... with isNextNull() and skipNext(), and random access by docId. Single-value and multi-value (List of primitive) variants are covered.

getValue(int docId) delegates Arrow → Pinot type conversion to the ArrowToPinotTypeConverter utility (precursor commit). The converter returns canonical JDK types (String for Utf8 / LargeUtf8, unwrapped from Arrow's Text; Object[] for List variants, with recursive element conversion; LocalDate / LocalTime / Timestamp for temporal types; BigDecimal for Decimal; etc.), matching the contract that Pinot's stats collectors, dictionary creators, and index creators downstream of buildColumnar() expect.

Unit tests cover sequential read across primitive types with nulls, random access by docId, generic next() null counting, multi-value int column with per-element nulls, column subset filtering, getAvailableColumns() reflecting the source schema, multi-batch concatenation with verification of values straddling batch boundaries, and the general factory's caller-managed path with an ArrowStreamReader over a ByteArrayInputStream (asserting caller-owned allocator remains usable after factory close).

An integration test builds a Pinot segment from the same Arrow IPC file via both the row-major path (ArrowRecordReader → driver.build()) and the file column-major path (ArrowFileColumnReaderFactory → driver.build() → buildColumnar()), then asserts per-column metadata equivalence (cardinality, min, max, totalDocs, data type) and segment-level doc count.
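The accumulate-then-read flow described above (append every batch into one per-column accumulator, then serve sequential and random access from it) can be sketched without Arrow as follows. Everything here is hypothetical and simplified: `IntColumnSketch` is not Pinot's `ColumnReader` interface, a `List<Integer>` stands in for an Arrow `FieldVector`, and `addAll` stands in for `VectorAppender`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a reader over one accumulated int column.
final class IntColumnSketch {
  // A null entry represents a null cell.
  private final List<Integer> _values = new ArrayList<>();
  private int _next = 0;

  // Bulk-append one record batch's values for this column (one call per batch).
  void appendBatch(List<Integer> batch) {
    _values.addAll(batch);
  }

  int getTotalDocs() {
    return _values.size();
  }

  // Sequential access: check hasNext(), then isNextNull(), then nextInt() or skipNext().
  boolean hasNext() {
    return _next < _values.size();
  }

  boolean isNextNull() {
    return _values.get(_next) == null;
  }

  void skipNext() {
    _next++;
  }

  // Typed read; the caller is expected to have checked isNextNull() first.
  int nextInt() {
    return _values.get(_next++);
  }

  // Generic read; returns null for a null cell.
  Integer next() {
    return _values.get(_next++);
  }

  // Random access by docId, independent of the sequential cursor.
  boolean isNull(int docId) {
    return _values.get(docId) == null;
  }

  int getInt(int docId) {
    return _values.get(docId);
  }
}
```

After appending two batches of two rows each, docId 2 resolves to the first value of the second batch, which is the "values straddling batch boundaries" case the unit tests are said to cover.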
609ca47 to
26316d8
Compare
TL;DR: Adds `ArrowColumnReaderFactory` (caller-managed reader/allocator) and `ArrowFileColumnReaderFactory` (file convenience) to the `pinot-arrow` plugin, exposing Arrow IPC sources to the column-major build path introduced in #16727. Stacked on top of #18632.

Tracks #18629.
## Depends on

#18632 (precursor PR): extracts `ArrowToPinotTypeConverter` from `ArrowRecordExtractor` into a public shared utility. Must merge before this PR, since the new `ArrowColumnReader.getValue(int)` delegates to that utility.

The diff currently shows both commits (precursor + this work); once #18632 lands on `master`, GitHub will reduce the diff to just this PR's commit.

## What changed
- `ArrowColumnReaderFactory`: implements `ColumnReaderFactory` over a caller-managed `ArrowReader` and `BufferAllocator`. The factory does not close them. Accepts any `ArrowReader` subclass: `ArrowFileReader`, `ArrowStreamReader`, or a custom subclass that yields in-process record batches. Suitable for batch pipelines that produce Arrow batches in-process (e.g. a Spark task using `mapInArrow`) and want to drive `buildColumnar()` without disk I/O.
- `ArrowFileColumnReaderFactory`: file-specialised convenience that opens a private `RootAllocator` (sized by the `arrowAllocatorLimit` config), opens the IPC file via `ArrowFileReader`, and closes all three on `close()`. The natural choice for callers that just want to read an Arrow file on disk.
- `ArrowColumnReader`: wraps a single Arrow `FieldVector` and implements the three documented `ColumnReader` access patterns: generic sequential `next()` with null checks, typed sequential (`nextInt()` / `nextLong()` / …) with `isNextNull()` / `skipNext()`, and random access by `docId`. Single-value and multi-value (`List` of primitive) variants supported.
- `ArrowAccumulators`: package-private helper shared by both factories. Walks every record batch and bulk-appends each wanted column into a per-column `FieldVector` accumulator via Arrow's `VectorAppender` (Visitor-based; grows buffers once per batch and bulk-copies, instead of per-row `copyValueSafe`). Rejects dictionary-encoded columns loudly via `Preconditions.checkArgument` since the column-major path doesn't yet implement `DictionaryEncoder.decode`. Also exposes a `closeAll(map)` utility shared by both factory `close()` paths.
- `ArrowColumnReader.getValue(int docId)` delegates Arrow → Pinot type conversion to the shared `ArrowToPinotTypeConverter` (from #18632), returning canonical JDK types: `String` from `Utf8` / `LargeUtf8` (unwrapped from Arrow's `Text`), `Object[]` for `List` variants (with recursive element conversion), `LocalDate` / `LocalTime` / `Timestamp` for temporal types, `BigDecimal` for `Decimal`. Matches the row-major path's contract so per-column stats collectors, dictionary creators, and index creators downstream of `buildColumnar()` see the same JDK types they see today.

## Compatibility
Purely additive on the SPI side:

- `ArrowRecordReader` path is unchanged.
- `ColumnReaderFactory`-accepting overload of `SegmentIndexCreationDriverImpl.init`.
- `ColumnReader` / `ColumnReaderFactory` interfaces from column major segment build for columnar datasource #16727.

## Testing
- `ArrowColumnReaderFactoryTest` (caller-managed path via `ArrowStreamReader`)
- `ArrowFileColumnReaderFactoryTest` (file path)
- `ArrowColumnarBuildIntegrationTest`: segment metadata equivalence: row-major ↔ file column-major; row-major ↔ in-memory column-major
- Existing tests (`ArrowRecordExtractorTest` 54 + `ArrowMessageDecoderTest` 7 + `ArrowRecordReaderTest` 2)

The integration test builds a Pinot segment from the same Arrow fixture three ways:

1. `ArrowRecordReader → driver.build()`
2. `ArrowFileColumnReaderFactory → driver.build() → buildColumnar()`
3. `ArrowColumnReaderFactory(ArrowStreamReader, BufferAllocator) → driver.build() → buildColumnar()` (no file touched on this path)

…and asserts per-column cardinality, min, max, totalDocs, and data type match, plus segment-level doc count match across each compared pair. Path 3 proves the disk-free use case works end-to-end.
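The shape of the equivalence assertion can be illustrated with a small stand-alone sketch. It is not the integration test itself: `MetadataEquivalenceSketch` and its methods are hypothetical, columns are non-null and non-empty `Integer` lists, and a plain map of four numbers stands in for Pinot's per-column segment metadata.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class MetadataEquivalenceSketch {
  private MetadataEquivalenceSketch() { }

  // Per-column metadata; assumes a non-empty column with no null entries.
  static Map<String, Integer> stats(List<Integer> column) {
    TreeSet<Integer> distinct = new TreeSet<>(column);
    Map<String, Integer> stats = new LinkedHashMap<>();
    stats.put("cardinality", distinct.size());
    stats.put("min", distinct.first());
    stats.put("max", distinct.last());
    stats.put("totalDocs", column.size());
    return stats;
  }

  // Column-major input: one value list per column.
  static Map<String, Map<String, Integer>> fromColumns(Map<String, List<Integer>> columns) {
    Map<String, Map<String, Integer>> result = new LinkedHashMap<>();
    for (Map.Entry<String, List<Integer>> column : columns.entrySet()) {
      result.put(column.getKey(), stats(column.getValue()));
    }
    return result;
  }

  // Row-major input: one map per row, pivoted into columns before collecting stats.
  static Map<String, Map<String, Integer>> fromRows(List<Map<String, Integer>> rows) {
    Map<String, List<Integer>> pivoted = new LinkedHashMap<>();
    for (Map<String, Integer> row : rows) {
      for (Map.Entry<String, Integer> cell : row.entrySet()) {
        pivoted.computeIfAbsent(cell.getKey(), k -> new ArrayList<>()).add(cell.getValue());
      }
    }
    return fromColumns(pivoted);
  }
}
```

A test in this style feeds the same logical data through `fromRows` and `fromColumns` and asserts that the two result maps are equal, which is the row-major ↔ column-major comparison in miniature.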
## Arrow caveat: type assumptions downstream of `buildColumnar()`

Several downstream cast sites in `pinot-segment-local` (for example `StringColumnPreIndexStatsCollector.collect` and `NoDictColumnStatisticsCollector.collect`) cast directly to `String` / `Integer` without type checks, and NPE on `null` entries. This holds for the existing segment-to-segment consumer of `buildColumnar()` but is fragile under any non-segment source. The shared converter satisfies the type contract today (e.g. unwraps Arrow's `Text` to `String`); the underlying assumptions are out of scope for this PR and warrant a follow-up to tighten them once a non-segment consumer exists in tree.

## References
- `ColumnReader` SPI: column major segment build for columnar datasource #16727
- `ArrowRecordExtractor` refactor: Refactor Arrow extraction to follow the RecordExtractor contract #18434
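The caveat about unchecked casts downstream of `buildColumnar()` can be made concrete with a tiny sketch. None of this is Pinot code: `collectLength` only mimics the shape of a collector that assumes `String` entries, and `StringBuilder` stands in for a wrapper type such as Arrow's `Text`.

```java
final class UncheckedCastSketch {
  private UncheckedCastSketch() { }

  // Mimics a collector that casts without a type check (hypothetical, not Pinot code).
  // Throws ClassCastException for a non-String entry and NullPointerException for null.
  static int collectLength(Object entry) {
    return ((String) entry).length();
  }

  // Normalise a text wrapper to a plain String before handing it downstream.
  static Object toCanonical(Object raw) {
    if (raw instanceof CharSequence) {
      return raw.toString();
    }
    return raw;
  }
}
```

Passing `toCanonical(value)` instead of `value` is the role the shared converter plays in this PR; tightening the cast sites themselves is the follow-up the caveat describes.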