feat(lance): Implement canWrite() in HoodieSparkLanceWriter with configurable max file size for Lance #18341
wombatu-kun wants to merge 2 commits into apache:master
Ack, will review tomorrow!

Thanks @wombatu-kun for the help! @voonhous, can you review this if you get a chance, since I'm going to be OOTO? I'll review once I'm back.
be23647 to 57c518e
…igurable max file size for Lance
    HoodieStorage storage,
    boolean populateMetaFields,
    Option<BloomFilter> bloomFilterOpt) {
  this(file, sparkSchema, instantTime, taskContextSupplier, storage, populateMetaFields, bloomFilterOpt, Long.MAX_VALUE);
Shouldn't we have a reasonable default for maxFileSize here rather than Long.MAX_VALUE?
ok, use LANCE_MAX_FILE_SIZE.defaultValue() instead of Long.MAX_VALUE
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@              Coverage Diff               @@
##             master    #18341       +/-   ##
=============================================
- Coverage     68.41%    54.05%    -14.37%
+ Complexity    27408     12139     -15269
=============================================
  Files          2423      1421      -1002
  Lines        132458     69952     -62506
  Branches      15972      7795      -8177
=============================================
- Hits          90623     37812     -52811
+ Misses        34784     28781      -6003
+ Partials       7051      3359      -3692
Describe the issue this Pull Request addresses
Closes #17684
Summary and Changelog
Implement canWrite() in HoodieSparkLanceWriter analogously to HoodieBaseParquetWriter.canWrite() by tracking cumulative Arrow buffer sizes in the base class and adding periodic size-limit checks in the Spark writer.
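The periodic size check described in this summary can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code in this PR: the class `LanceSizeCheckSketch` and the method `recordFlushedBatch` are hypothetical stand-ins, the initial value of `recordCountForNextSizeCheck` is an assumption, and the halfway-point scheduling is modeled on the Parquet-style check the description refers to.

```java
// Hypothetical, simplified stand-in for the writer; NOT the actual Hudi class.
class LanceSizeCheckSketch {
    // Constant values as stated in the PR description.
    static final long MIN_RECORDS_FOR_SIZE_CHECK = 100;
    static final long MAX_RECORDS_FOR_SIZE_CHECK = 10000;

    private final long maxFileSize;
    private long writtenCount = 0;
    private long totalFlushedDataSize = 0;
    // Assumption: the first check happens after the minimum record count.
    private long recordCountForNextSizeCheck = MIN_RECORDS_FOR_SIZE_CHECK;

    LanceSizeCheckSketch(long maxFileSize) {
        this.maxFileSize = maxFileSize;
    }

    // Stand-in for a flushed batch. In the PR, the byte count comes from summing
    // vector.getBufferSize() over root.getFieldVectors(); here it is passed in.
    void recordFlushedBatch(long rows, long bufferBytes) {
        writtenCount += rows;
        totalFlushedDataSize += bufferBytes;
    }

    long getDataSize() {
        return totalFlushedDataSize;
    }

    boolean canWrite() {
        // Only inspect sizes once enough records were written since the last check.
        if (writtenCount >= recordCountForNextSizeCheck) {
            long dataSize = getDataSize();
            // Guard against an average of 0 when records are tiny.
            long avgRecordSize = Math.max(dataSize / writtenCount, 1);
            // Stop when within two average records of the limit.
            if (dataSize > maxFileSize - 2 * avgRecordSize) {
                return false;
            }
            // Schedule the next check about halfway to the estimated limit,
            // clamped to the [MIN, MAX] record interval.
            long remainingRecords = maxFileSize / avgRecordSize - writtenCount;
            long step = Math.min(
                Math.max(MIN_RECORDS_FOR_SIZE_CHECK, remainingRecords / 2),
                MAX_RECORDS_FOR_SIZE_CHECK);
            recordCountForNextSizeCheck = writtenCount + step;
        }
        return true;
    }

    public static void main(String[] args) {
        LanceSizeCheckSketch writer = new LanceSizeCheckSketch(1_000_000L);
        long records = 0;
        while (writer.canWrite()) {
            writer.recordFlushedBatch(1, 100); // one 100-byte record per "batch"
            records++;
        }
        System.out.println(records + " records, " + writer.getDataSize() + " bytes");
    }
}
```

Because the check only runs periodically, this sketch can overshoot the limit by up to roughly one minimum check interval of records before `canWrite()` returns false.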
- HoodieStorageConfig: Added LANCE_MAX_FILE_SIZE config property (key hoodie.lance.max.file.size, default 120 MB) and a lanceMaxFileSize(long) builder method, consistent with the existing Parquet/ORC/HFile config entries.
- HoodieBaseLanceWriter: Added totalFlushedDataSize field and getDataSize() accessor. In flushBatch(), after arrowWriter.finishBatch() sets the row count, the method now iterates over root.getFieldVectors() and accumulates vector.getBufferSize() into totalFlushedDataSize before writing to Lance. This provides an uncompressed Arrow buffer size estimate analogous to ParquetWriter.getDataSize().
- HoodieSparkLanceWriter:
  - MIN_RECORDS_FOR_SIZE_CHECK = 100 and MAX_RECORDS_FOR_SIZE_CHECK = 10000 constants (mirrors the Parquet constants).
  - maxFileSize and recordCountForNextSizeCheck fields.
  - maxFileSize; the no-arg secondary constructor now delegates with Long.MAX_VALUE (no limit); a new secondary constructor accepting explicit maxFileSize is added for use by HoodieInternalRowFileWriterFactory.
  - canWrite() implementation: checks periodically based on recordCountForNextSizeCheck, computes average record size from getDataSize() / writtenCount, returns false when within two average records of maxFileSize, and adaptively schedules the next check.
- HoodieSparkFileWriterFactory: Reads LANCE_MAX_FILE_SIZE from config and passes it to the HoodieSparkLanceWriter constructor.
- HoodieInternalRowFileWriterFactory: method getInternalRowFileWriter reads LANCE_MAX_FILE_SIZE and passes it (through newLanceInternalRowFileWriter) to the new HoodieSparkLanceWriter constructor.
Impact
Adds a proper implementation that checks whether the file has reached a size threshold and, if so, rolls the write over to a new file.
Risk Level
none
Documentation Update
Need to add the LANCE_MAX_FILE_SIZE config property (hoodie.lance.max.file.size, default 120 MB).
Contributor's checklist