diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
index 633979613d85..8f70f4034f3f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala
@@ -755,4 +755,16 @@ object VeloxConfig extends ConfigRegistry {
       .doc("Maps table field names to file field names using names, not indices for Parquet files.")
       .booleanConf
       .createWithDefault(true)
+
+  val PARQUET_PAGE_SIZE_BYTES =
+    buildConf("spark.gluten.sql.columnar.backend.velox.parquet.page-size-bytes")
+      .doc("Page size in bytes for Parquet write operations.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("1MB")
+
+  val VELOX_TARGET_FILE_SIZE =
+    buildConf("spark.gluten.sql.columnar.backend.velox.target-file-size-bytes")
+      .doc("Target file size in bytes for write operations.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("0")
 }
diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h
index 54db73303184..bda977b55d09 100644
--- a/cpp/velox/config/VeloxConfig.h
+++ b/cpp/velox/config/VeloxConfig.h
@@ -166,10 +166,14 @@ const std::string kMemoryPoolCapacityTransferAcrossTasks =
 const std::string kOrcUseColumnNames = "spark.gluten.sql.columnar.backend.velox.orcUseColumnNames";
 const std::string kParquetUseColumnNames = "spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames";
-// write fies
+// write files
 const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";
 const std::string kMaxTargetFileSize = "spark.gluten.sql.columnar.backend.velox.maxTargetFileSize";
+// Iceberg write configs
+const std::string kWriteTargetFileSizeBytes = "spark.gluten.sql.columnar.backend.velox.target-file-size-bytes";
+const std::string kWriteParquetPageSizeBytes = "spark.gluten.sql.columnar.backend.velox.parquet.page-size-bytes";
+
 const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel";
 const uint32_t kGlogVerboseLevelDefault = 0;
 const uint32_t kGlogVerboseLevelMaximum = 99;
diff --git a/cpp/velox/utils/ConfigExtractor.cc b/cpp/velox/utils/ConfigExtractor.cc
index 13781a21c811..609f2fea60ac 100644
--- a/cpp/velox/utils/ConfigExtractor.cc
+++ b/cpp/velox/utils/ConfigExtractor.cc
@@ -242,6 +242,11 @@ std::shared_ptr createHiveConnectorSessionC
   configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
       conf->get(kOrcUseColumnNames, true) ? "true" : "false";
+  configs[facebook::velox::connector::hive::HiveConfig::kMaxTargetFileSize] =
+      conf->get(kWriteTargetFileSizeBytes, "0");
+  configs[facebook::velox::parquet::WriterOptions::kParquetSessionWritePageSize] =
+      conf->get(kWriteParquetPageSizeBytes, "1MB");
+
   overwriteVeloxConf(conf.get(), configs, kDynamicBackendConfPrefix);
   return std::make_shared(std::move(configs));
 }
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index 859c6356c17d..787ccc7afb50 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -47,13 +47,14 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.maxSpillFileSize | 1GB | The maximum size of a single spill file created |
 | spark.gluten.sql.columnar.backend.velox.maxSpillLevel | 4 | The max allowed spilling level with zero being the initial spilling level |
 | spark.gluten.sql.columnar.backend.velox.maxSpillRunRows | 3M | The maximum row size of a single spill run |
-| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. |
+| spark.gluten.sql.columnar.backend.velox.maxTargetFileSize | 0b | The target file size for each output file when writing data. 0 means no limit on target file size, and the actual file size will be determined by other factors such as max partition number and shuffle batch size. |
 | spark.gluten.sql.columnar.backend.velox.memCacheSize | 1GB | The memory cache size |
 | spark.gluten.sql.columnar.backend.velox.memInitCapacity | 8MB | The initial memory capacity to reserve for a newly created Velox query memory pool. |
 | spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks | true | Whether to allow memory capacity transfer between memory pools from different tasks. |
 | spark.gluten.sql.columnar.backend.velox.memoryUseHugePages | false | Use explicit huge pages for Velox memory allocation. |
 | spark.gluten.sql.columnar.backend.velox.orc.scan.enabled | true | Enable velox orc scan. If disabled, vanilla spark orc scan will be used. |
 | spark.gluten.sql.columnar.backend.velox.orcUseColumnNames | true | Maps table field names to file field names using names, not indices for ORC files. |
+| spark.gluten.sql.columnar.backend.velox.parquet.page-size-bytes | 1MB | Page size in bytes for Parquet write operations. |
 | spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames | true | Maps table field names to file field names using names, not indices for Parquet files. |
 | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan |
 | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. |
@@ -74,6 +75,7 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.ssdChecksumReadVerificationEnabled | false | If true, checksum read verification from SSD is enabled. |
 | spark.gluten.sql.columnar.backend.velox.ssdDisableFileCow | false | True if copy on write should be disabled. |
 | spark.gluten.sql.columnar.backend.velox.ssdODirect | false | The O_DIRECT flag for cache writing |
+| spark.gluten.sql.columnar.backend.velox.target-file-size-bytes | 0 | Target file size in bytes for write operations. |
 | spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled | false | Whether to apply dynamic filters pushed down from hash probe in the ValueStream (shuffle reader) operator to filter rows before they reach the hash join. |
 | spark.gluten.sql.enable.enhancedFeatures | true | Enable some features including iceberg native write and other features. |
 | spark.gluten.sql.rewrite.castArrayToString | true | When true, rewrite `cast(array as String)` to `concat('[', array_join(array, ', ', null), ']')` to allow offloading to Velox. |
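For context, here is a minimal sketch (not part of the patch) of how the two new write options could be set from the Spark side. The key spellings assume the Scala entries are aligned with the C++ constants `kWriteTargetFileSizeBytes` and `kWriteParquetPageSizeBytes` added above, and the `512MB`/`1MB` values are illustrative choices rather than defaults from this change.

```scala
// Illustrative only: tuning the new Velox native-write options at session level.
// Key names follow the constants added in cpp/velox/config/VeloxConfig.h; the
// values below are example choices, not defaults from this patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("gluten-velox-write-tuning")
  // Aim for roughly 512 MB output files; "0" keeps the patch default (no explicit target).
  .config("spark.gluten.sql.columnar.backend.velox.target-file-size-bytes", "512MB")
  // Keep the Parquet write page size at the 1 MB default, or adjust as needed.
  .config("spark.gluten.sql.columnar.backend.velox.parquet.page-size-bytes", "1MB")
  .getOrCreate()
```

On the native side these values are read into the Hive connector session config in ConfigExtractor.cc, so they should only affect writes that are actually offloaded to the Velox backend.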