diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index dcb64d7d18d0..244e0b885a3e 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -78,6 +78,17 @@ DEFINE_string( "'stream' mode: Input file scan happens inside of the pipeline." "'buffered' mode: First read all data into memory and feed the pipeline with it."); DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`"); +DEFINE_bool(query_trace_enabled, false, "Whether to enable query trace."); +DEFINE_string(query_trace_dir, "", "Base dir of a query to store tracing data."); +DEFINE_string( + query_trace_node_ids, + "", + "A comma-separated list of plan node ids whose input data will be traced. Empty string if only want to trace the query metadata."); +DEFINE_int64(query_trace_max_bytes, 0, "The max trace bytes limit. Tracing is disabled if zero."); +DEFINE_string( + query_trace_task_reg_exp, + "", + "The regexp of traced task id. 
We only enable trace on a task if its id matches."); struct WriterMetrics { int64_t splitTime{0}; @@ -338,6 +349,27 @@ void updateBenchmarkMetrics( writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024); } } + +/// Copies the --query_trace_* command-line flags into `configs` under the Gluten query trace keys. +/// Does nothing unless --query_trace_enabled is set; flags left empty (or zero for max bytes) are not written. +void setQueryTraceConfig(std::unordered_map<std::string, std::string>& configs) { + if (!FLAGS_query_trace_enabled) { + return; + } + configs[kQueryTraceEnabled] = "true"; + if (!FLAGS_query_trace_dir.empty()) { + configs[kQueryTraceDir] = FLAGS_query_trace_dir; + } + if (FLAGS_query_trace_max_bytes != 0) { + configs[kQueryTraceMaxBytes] = std::to_string(FLAGS_query_trace_max_bytes); + } + if (!FLAGS_query_trace_node_ids.empty()) { + configs[kQueryTraceNodeIds] = FLAGS_query_trace_node_ids; + } + if (!FLAGS_query_trace_task_reg_exp.empty()) { + configs[kQueryTraceTaskRegExp] = FLAGS_query_trace_task_reg_exp; + } +} } // namespace using RuntimeFactory = std::function; @@ -455,6 +487,7 @@ auto BM_Generic = [](::benchmark::State& state, auto* rawIter = static_cast(resultIter->getInputIter()); const auto* task = rawIter->task(); + LOG(WARNING) << task->toString(); const auto* planNode = rawIter->veloxPlan(); auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); LOG(WARNING) << statsStr; @@ -584,6 +617,8 @@ int main(int argc, char** argv) { if (sessionConf.empty()) { sessionConf = backendConf; } + setQueryTraceConfig(sessionConf); + setQueryTraceConfig(backendConf); initVeloxBackend(backendConf); memory::MemoryManager::testingSetInstance({}); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index c1ae6f218458..20f54856649b 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -178,7 +178,7 @@ WholeStageResultIterator::WholeStageResultIterator( std::shared_ptr WholeStageResultIterator::createNewVeloxQueryCtx() { std::unordered_map> connectorConfigs; connectorConfigs[kHiveConnectorId] = 
createConnectorConfig(); - + static std::atomic vqId{0}; // Velox query ID, same with taskId. std::shared_ptr ctx = velox::core::QueryCtx::create( nullptr, facebook::velox::core::QueryConfig{getQueryContextConf()}, @@ -186,7 +186,11 @@ std::shared_ptr WholeStageResultIterator::createNewVeloxQ gluten::VeloxBackend::get()->getAsyncDataCache(), memoryManager_->getAggregateMemoryPool(), spillExecutor_.get(), - ""); + fmt::format( + "Gluten_Stage_{}_TID_{}_VTID_{}", + std::to_string(taskInfo_.stageId), + std::to_string(taskInfo_.taskId), + std::to_string(vqId++))); return ctx; } @@ -558,6 +562,18 @@ std::unordered_map WholeStageResultIterator::getQueryC configs[velox::core::QueryConfig::kSparkLegacyDateFormatter] = "false"; } + const auto setIfExists = [&](const std::string& glutenKey, const std::string& veloxKey) { + const auto valueOptional = veloxCfg_->get(glutenKey); + if (valueOptional.hasValue()) { + configs[veloxKey] = valueOptional.value(); + } + }; + setIfExists(kQueryTraceEnabled, velox::core::QueryConfig::kQueryTraceEnabled); + setIfExists(kQueryTraceDir, velox::core::QueryConfig::kQueryTraceDir); + setIfExists(kQueryTraceNodeIds, velox::core::QueryConfig::kQueryTraceNodeIds); + setIfExists(kQueryTraceMaxBytes, velox::core::QueryConfig::kQueryTraceMaxBytes); + setIfExists(kQueryTraceTaskRegExp, velox::core::QueryConfig::kQueryTraceTaskRegExp); + setIfExists(kOpTraceDirectoryCreateConfig, velox::core::QueryConfig::kOpTraceDirectoryCreateConfig); } catch (const std::invalid_argument& err) { std::string errDetails = err.what(); throw std::runtime_error("Invalid conf arg: " + errDetails); diff --git a/cpp/velox/config/VeloxConfig.h b/cpp/velox/config/VeloxConfig.h index f882e72065b2..84493cc46919 100644 --- a/cpp/velox/config/VeloxConfig.h +++ b/cpp/velox/config/VeloxConfig.h @@ -135,4 +135,23 @@ const uint32_t kGlogVerboseLevelDefault = 0; const uint32_t kGlogVerboseLevelMaximum = 99; const std::string kGlogSeverityLevel = 
"spark.gluten.sql.columnar.backend.velox.glogSeverityLevel"; const uint32_t kGlogSeverityLevelDefault = 1; + +// Query trace +/// Enable query tracing flag. +const std::string kQueryTraceEnabled = "spark.gluten.sql.columnar.backend.velox.queryTraceEnabled"; +/// Base dir of a query to store tracing data. +const std::string kQueryTraceDir = "spark.gluten.sql.columnar.backend.velox.queryTraceDir"; +/// A comma-separated list of plan node ids whose input data will be traced. +/// Empty string if only want to trace the query metadata. +const std::string kQueryTraceNodeIds = "spark.gluten.sql.columnar.backend.velox.queryTraceNodeIds"; +/// The max trace bytes limit. Tracing is disabled if zero. +const std::string kQueryTraceMaxBytes = "spark.gluten.sql.columnar.backend.velox.queryTraceMaxBytes"; +/// The regexp of traced task id. We only enable trace on a task if its id +/// matches. +const std::string kQueryTraceTaskRegExp = "spark.gluten.sql.columnar.backend.velox.queryTraceTaskRegExp"; +/// Config used to create operator trace directory. This config is provided to +/// underlying file system and the config is free form. The form should be +/// defined by the underlying file system. 
+const std::string kOpTraceDirectoryCreateConfig = + "spark.gluten.sql.columnar.backend.velox.opTraceDirectoryCreateConfig"; } // namespace gluten diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e01d2d89856b..96fb16d62eba 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -27,6 +27,7 @@ #include "utils/ConfigExtractor.h" #include "config/GlutenConfig.h" +#include "config/VeloxConfig.h" #include "operators/plannodes/RowVectorStream.h" namespace gluten { @@ -1206,6 +1207,27 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( return node; } +/// Builds a ValuesNode holding every batch drained from input iterator `streamIdx`. +/// Used instead of ValueStreamNode when query trace is enabled; all input batches are loaded into the node up front. +/// The ReadRel argument is accepted for signature parity with constructValueStreamNode but is not read here. +core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode( + const ::substrait::ReadRel& /*readRel*/, + int32_t streamIdx) { + std::vector<RowVectorPtr> values; + VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx); + const auto& iterator = inputIters_[streamIdx]; + while (iterator->hasNext()) { + auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), iterator->next()); + values.emplace_back(cb->getRowVector()); + } + auto node = std::make_shared<core::ValuesNode>(nextPlanNodeId(), std::move(values)); + + auto splitInfo = std::make_shared<SplitInfo>(); + splitInfo->isStream = true; + splitInfoMap_[node->id()] = splitInfo; + return node; +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) { // emit is not allowed in TableScanNode and ValuesNode related // outputs @@ -1217,7 +1239,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source. auto streamIdx = getStreamIndex(readRel); if (streamIdx >= 0) { - return constructValueStreamNode(readRel, streamIdx); + // Only used by the benchmark when query trace is enabled: replace ValueStreamNode with ValuesNode to support serialization. 
+ if (LIKELY(confMap_[kQueryTraceEnabled] != "true")) { + return constructValueStreamNode(readRel, streamIdx); + } else { + return constructValuesNode(readRel, streamIdx); + } } // Otherwise, will create TableScan node for ReadRel. diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 6121923df787..9636f6615f96 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -123,6 +123,9 @@ class SubstraitToVeloxPlanConverter { core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + // This is only used in benchmark and enable query trace, which will load all the data to ValuesNode. + core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + /// Used to convert Substrait Rel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel); diff --git a/docs/developers/QueryTrace.md b/docs/developers/QueryTrace.md new file mode 100644 index 000000000000..71c66215110e --- /dev/null +++ b/docs/developers/QueryTrace.md @@ -0,0 +1,174 @@ +--- +layout: page +title: QueryTrace +nav_order: 14 +parent: Developer Overview +--- + +# Background +Currently, we have [MicroBenchmarks](https://github.com/apache/incubator-gluten/blob/main/docs/developers/MicroBenchmarks.md) to profile Velox plan execution at the stage level. With the query trace replayer, we can now specify a plan node id and profile at the operator level. + +We cannot profile at the operator level directly because ValueStreamNode cannot be serialized and deserialized. Therefore, we first generate the benchmark inputs, and then enable query trace in the benchmark, which replaces the ValueStreamNode with a ValuesNode. 
+ +Relevant links: +https://facebookincubator.github.io/velox/develop/debugging/tracing.html + +https://facebookincubator.github.io/velox/configs.html#tracing + +https://velox-lib.io/blog/velox-query-tracing + +# Usage +Firstly, use MicroBenchmark to generate the input Parquet data file and the Substrait JSON plan. + +```shell +/tmp/saveDir +├── conf_35_0.ini +├── data_35_0_0.parquet +├── plan_35_0.json +└── split_35_0_0.json +``` +Secondly, execute the benchmark to get the task id and the plan node id to trace. For example, the task id is `Gluten_Stage_0_TID_0_VTID_0` and we will profile the partial aggregation with node id `7`. + +```shell +W0113 15:30:18.903225 2524110 GenericBenchmark.cc:251] Setting CPU for thread 0 to 0 +I0113 15:30:19.093111 2524110 Task.cpp:2051] Terminating task Gluten_Stage_0_TID_0_VTID_0 with state Finished after running for 142ms +W0113 15:30:19.093384 2524110 GenericBenchmark.cc:485] {Task Gluten_Stage_0_TID_0_VTID_0 (Gluten_Stage_0_TID_0_VTID_0) Finished +Plan: +-- Project[8][expressions: (n8_3:INTEGER, hash_with_seed(42,"n6_5","n6_6")), (n8_4:DOUBLE, "n6_5"), (n8_5:DOUBLE, "n6_6"), (n8_6:BIGINT, "n7_2")] -> n8_3:INTEGER, n8_4:DOUBLE, n8_5:DOUBLE, n8_6:BIGINT + -- Aggregation[7][PARTIAL [n6_5, n6_6] n7_2 := sum_partial("n6_7")] -> n6_5:DOUBLE, n6_6:DOUBLE, n7_2:BIGINT + -- Project[6][expressions: (n6_5:DOUBLE, "n5_7"), (n6_6:DOUBLE, "n5_8"), (n6_7:BIGINT, unscaled_value("n5_9"))] -> n6_5:DOUBLE, n6_6:DOUBLE, n6_7:BIGINT + -- Project[5][expressions: (n5_7:DOUBLE, "n1_4"), (n5_8:DOUBLE, "n1_5"), (n5_9:DECIMAL(7, 2), "n1_6"), (n5_10:DOUBLE, "n1_7"), (n5_11:DOUBLE, "n3_1")] -> n5_7:DOUBLE, n5_8:DOUBLE, n5_9:DECIMAL(7, 2), n5_10:DOUBLE, n5_11:DOUBLE + -- HashJoin[4][INNER n1_8=n3_2] -> n1_4:DOUBLE, n1_5:DOUBLE, n1_6:DECIMAL(7, 2), n1_7:DOUBLE, n1_8:DOUBLE, n3_1:DOUBLE, n3_2:DOUBLE + -- Project[1][expressions: (n1_4:DOUBLE, "n0_0"), (n1_5:DOUBLE, "n0_1"), (n1_6:DECIMAL(7, 2), "n0_2"), (n1_7:DOUBLE, "n0_3"), (n1_8:DOUBLE, "n0_3")] -> n1_4:DOUBLE, 
n1_5:DOUBLE, n1_6:DECIMAL(7, 2), n1_7:DOUBLE, n1_8:DOUBLE + -- TableScan[0][table: hive_table, remaining filter: (isnotnull("sr_store_sk"))] -> n0_0:DOUBLE, n0_1:DOUBLE, n0_2:DECIMAL(7, 2), n0_3:DOUBLE + -- Project[3][expressions: (n3_1:DOUBLE, "n2_0"), (n3_2:DOUBLE, "n2_0")] -> n3_1:DOUBLE, n3_2:DOUBLE + -- ValueStream[2][] -> n2_0:DOUBLE +``` + +Thirdly, execute the benchmark and enable the query trace. +Note: no query id needs to be specified. The Velox query id is generated automatically and is the same as the task id (e.g. `Gluten_Stage_0_TID_0_VTID_0`), which determines the directory layer the trace data is written to under `query_trace_dir`. + +```shell + ./generic_benchmark --conf /tmp/saveDir/conf_35_0.ini --plan /tmp/saveDir/plan_35_0.json --split /tmp/saveDir/split_35_0_0.json --data /tmp/saveDir/data_35_0_0.parquet --query_trace_enabled=true --query_trace_dir=/tmp/query_trace --query_trace_node_ids=7 --query_trace_max_bytes=100000000000 --query_trace_task_reg_exp=Gluten_Stage_0_TID_0_VTID_0 --threads 1 --noprint-result +``` + +We can see the following message, which indicates the query trace runs successfully. +```shell +I0114 11:06:52.461263 2587302 Task.cpp:3072] Trace input for plan nodes 7 from task Gluten_Stage_0_TID_0_VTID_0 +I0114 11:06:52.462595 2587302 Operator.cpp:135] Trace input for operator type: PartialAggregation, operator id: 5, pipeline: 0, driver: 0, task: Gluten_Stage_0_TID_0_VTID_0 +``` + +Now we can see the data in query trace directory `/tmp/query_trace`. + +```shell +/tmp/query_trace/ +└── Gluten_Stage_0_TID_0_VTID_0 + └── Gluten_Stage_0_TID_0_VTID_0 + ├── 7 + │   └── 0 + │   └── 0 + │   ├── op_input_trace.data + │   └── op_trace_summary.json + └── task_trace_meta.json +``` +Fourthly, replay the query. Show the query trace summary with the following command. 
+ +```shell + +/mnt/DP_disk1/code/velox/build/velox/tool/trace# ./velox_query_replayer --root_dir /tmp/query_trace --task_id Gluten_Stage_0_TID_0_VTID_0 --query_id=Gluten_Stage_0_TID_0_VTID_0 --node_id=7 --summary +WARNING: Logging before InitGoogleLogging() is written to STDERR +I0115 20:27:25.821105 2684048 HiveConnector.cpp:56] Hive connector test-hive created with maximum of 20000 cached file handles. +I0115 20:27:25.823112 2684048 TraceReplayRunner.cpp:223] +++++++Query trace summary++++++ +Number of tasks: 1 + +++++++Query configs++++++ + max_spill_bytes: 107374182400 + max_output_batch_rows: 4096 + max_partial_aggregation_memory: 80530636 + max_extended_partial_aggregation_memory: 120795955 + max_spill_level: 4 + spill_enabled: true + spark.bloom_filter.max_num_bits: 4194304 + spillable_reservation_growth_pct: 25 + query_trace_dir: /tmp/query_trace + spiller_num_partition_bits: 3 + preferred_output_batch_rows: 4096 + spill_write_buffer_size: 1048576 + spiller_start_partition_bit: 29 + spill_prefixsort_enabled: false + abandon_partial_aggregation_min_rows: 100000 + adjust_timestamp_to_session_timezone: true + query_trace_max_bytes: 100000000000 + query_trace_enabled: true + spark.legacy_date_formatter: false + join_spill_enabled: 1 + query_trace_task_reg_exp: Gluten_Stage_0_TID_0_VTID_0 + query_trace_node_ids: 7 + order_by_spill_enabled: 1 + aggregation_spill_enabled: 1 + abandon_partial_aggregation_min_pct: 90 + session_timezone: Etc/UTC + driver_cpu_time_slice_limit_ms: 0 + max_spill_file_size: 1073741824 + max_spill_run_rows: 3145728 + spark.bloom_filter.expected_num_items: 1000000 + spill_read_buffer_size: 1048576 + spill_compression_codec: lz4 + spark.partition_id: 0 + max_split_preload_per_driver: 2 + spark.bloom_filter.num_bits: 8388608 + +++++++Connector configs++++++ +test-hive + file_column_names_read_as_lower_case: true + partition_path_as_lower_case: false + hive.reader.timestamp_unit: 6 + hive.parquet.writer.timestamp_unit: 6 + 
max_partitions_per_writers: 10000 + ignore_missing_files: 0 + +++++++Task query plan++++++ +-- Project[8][expressions: (n8_3:INTEGER, hash_with_seed(42,"n6_5","n6_6")), (n8_4:DOUBLE, "n6_5"), (n8_5:DOUBLE, "n6_6"), (n8_6:BIGINT, "n7_2")] -> n8_3:INTEGER, n8_4:DOUBLE, n8_5:DOUBLE, n8_6:BIGINT + -- Aggregation[7][PARTIAL [n6_5, n6_6] n7_2 := sum_partial("n6_7")] -> n6_5:DOUBLE, n6_6:DOUBLE, n7_2:BIGINT + -- Project[6][expressions: (n6_5:DOUBLE, "n5_7"), (n6_6:DOUBLE, "n5_8"), (n6_7:BIGINT, unscaled_value("n5_9"))] -> n6_5:DOUBLE, n6_6:DOUBLE, n6_7:BIGINT + -- Project[5][expressions: (n5_7:DOUBLE, "n1_4"), (n5_8:DOUBLE, "n1_5"), (n5_9:DECIMAL(7, 2), "n1_6"), (n5_10:DOUBLE, "n1_7"), (n5_11:DOUBLE, "n3_1")] -> n5_7:DOUBLE, n5_8:DOUBLE, n5_9:DECIMAL(7, 2), n5_10:DOUBLE, n5_11:DOUBLE + -- HashJoin[4][INNER n1_8=n3_2] -> n1_4:DOUBLE, n1_5:DOUBLE, n1_6:DECIMAL(7, 2), n1_7:DOUBLE, n1_8:DOUBLE, n3_1:DOUBLE, n3_2:DOUBLE + -- Project[1][expressions: (n1_4:DOUBLE, "n0_0"), (n1_5:DOUBLE, "n0_1"), (n1_6:DECIMAL(7, 2), "n0_2"), (n1_7:DOUBLE, "n0_3"), (n1_8:DOUBLE, "n0_3")] -> n1_4:DOUBLE, n1_5:DOUBLE, n1_6:DECIMAL(7, 2), n1_7:DOUBLE, n1_8:DOUBLE + -- TableScan[0][table: hive_table, remaining filter: (isnotnull("sr_store_sk"))] -> n0_0:DOUBLE, n0_1:DOUBLE, n0_2:DECIMAL(7, 2), n0_3:DOUBLE + -- Project[3][expressions: (n3_1:DOUBLE, "d_date_sk"), (n3_2:DOUBLE, "d_date_sk")] -> n3_1:DOUBLE, n3_2:DOUBLE + -- Values[2][366 rows in 1 vectors] -> d_date_sk:DOUBLE + +++++++Task Summaries++++++ + +++++++Task Gluten_Stage_0_TID_0_VTID_0++++++ + +++++++Pipeline 0++++++ +driver 0: opType PartialAggregation, inputRows 293762, inputBytes 5.69MB, rawInputRows 0, rawInputBytes 0B, peakMemory 5.39MB +``` +Then you can use following command to re-execute the query plan. 
+ +```shell +/mnt/DP_disk1/code/velox/build/velox/tool/trace# ./velox_query_replayer --root_dir /tmp/query_trace --task_id Gluten_Stage_0_TID_0_VTID_0 --query_id=Gluten_Stage_0_TID_0_VTID_0 --node_id=7 +WARNING: Logging before InitGoogleLogging() is written to STDERR +I0115 20:30:17.665169 2685397 HiveConnector.cpp:56] Hive connector test-hive created with maximum of 20000 cached file handles. +I0115 20:30:17.676046 2685397 Cursor.cpp:192] Task spill directory[/tmp/velox_test_H163pi/test_cursor 1] created +I0115 20:30:17.941792 2685398 Task.cpp:2051] Terminating task test_cursor 1 with state Finished after running for 265ms +I0115 20:30:17.943285 2685397 OperatorReplayerBase.cpp:146] Stats of replaying operator PartialAggregation : Output: 292979 rows (9.27MB, 81 batches), Cpu time: 107.16ms, Wall time: 107.36ms, Blocked wall time: 0ns, Peak memory: 5.39MB, Memory allocations: 19, Threads: 1, CPU breakdown: B/I/O/F (252.36us/7.32ms/99.53ms/52.96us) +I0115 20:30:17.943373 2685397 OperatorReplayerBase.cpp:149] Memory usage: Aggregation_replayer usage 0B reserved 0B peak 6.00MB + task.test_cursor 1 usage 0B reserved 0B peak 6.00MB + node.N/A usage 0B reserved 0B peak 0B + op.N/A.0.0.CallbackSink usage 0B reserved 0B peak 0B + node.1 usage 0B reserved 0B peak 6.00MB + op.1.0.0.PartialAggregation usage 0B reserved 0B peak 5.39MB + node.0 usage 0B reserved 0B peak 0B + op.0.0.0.OperatorTraceScan usage 0B reserved 0B peak 0B +I0115 20:30:17.943809 2685397 TempDirectoryPath.cpp:29] TempDirectoryPath:: removing all files from /tmp/velox_test_H163pi +``` + +Here is the full list of query trace flags in MicroBenchmark. +- query_trace_enabled: Whether to enable query trace. +- query_trace_dir: Base dir of a query to store tracing data. +- query_trace_node_ids: A comma-separated list of plan node ids whose input data will be traced. Empty string if only want to trace the query metadata. +- query_trace_max_bytes: The max trace bytes limit. Tracing is disabled if zero. 
+- query_trace_task_reg_exp: The regexp of traced task id. We only enable trace on a task if its id matches. diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 94a95ae515a3..7de5f924200c 100644 --- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -623,7 +623,13 @@ object GlutenConfig { SPARK_GCS_STORAGE_ROOT_URL, SPARK_GCS_AUTH_TYPE, SPARK_GCS_AUTH_SERVICE_ACCOUNT_JSON_KEYFILE, - SPARK_REDACTION_REGEX + SPARK_REDACTION_REGEX, + QUERY_TRACE_ENABLED, + QUERY_TRACE_DIR, + QUERY_TRACE_NODE_IDS, + QUERY_TRACE_MAX_BYTES, + QUERY_TRACE_TASK_REG_EXP, + OP_TRACE_DIRECTORY_CREATE_CONFIG ) nativeConfMap.putAll(conf.filter(e => keys.contains(e._1)).asJava) @@ -2224,4 +2230,48 @@ object GlutenConfig { .doc("If enabled, gluten will not offload scan when encrypted parquet files are detected") .booleanConf .createWithDefault(false) + + val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") + .doc("Enable query tracing flag.") + .internal() + .booleanConf + .createWithDefault(false) + + val QUERY_TRACE_DIR = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceDir") + .doc("Base dir of a query to store tracing data.") + .internal() + .stringConf + .createWithDefault("") + + val QUERY_TRACE_NODE_IDS = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceNodeIds") + .doc("A comma-separated list of plan node ids whose input data will be traced. " + + "Empty string if only want to trace the query metadata.") + .internal() + .stringConf + .createWithDefault("") + + val QUERY_TRACE_MAX_BYTES = + buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceMaxBytes") + .doc("The max trace bytes limit. 
Tracing is disabled if zero.") + .internal() + .longConf + .createWithDefault(0) + + val QUERY_TRACE_TASK_REG_EXP = + buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceTaskRegExp") + .doc("The regexp of traced task id. We only enable trace on a task if its id matches.") + .internal() + .stringConf + .createWithDefault("") + + val OP_TRACE_DIRECTORY_CREATE_CONFIG = + buildConf("spark.gluten.sql.columnar.backend.velox.opTraceDirectoryCreateConfig") + .doc( + "Config used to create operator trace directory. This config is provided to" + + " underlying file system and the config is free form. The form should be " + + "defined by the underlying file system.") + .internal() + .stringConf + .createWithDefault("") + }