diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index dcb64d7d18d0a..10f1b5e93c70d 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -78,6 +78,17 @@ DEFINE_string( "'stream' mode: Input file scan happens inside of the pipeline." "'buffered' mode: First read all data into memory and feed the pipeline with it."); DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`"); +DEFINE_bool(query_trace_enabled, false, "Whether to enable query trace"); +DEFINE_string(query_trace_dir, "", "Base dir of a query to store tracing data."); +DEFINE_string( + query_trace_node_ids, + "", + "A comma-separated list of plan node ids whose input data will be traced. Empty string if only want to trace the query metadata."); +DEFINE_int64(query_trace_max_bytes, 0, "The max trace bytes limit. Tracing is disabled if zero."); +DEFINE_string( + query_trace_task_reg_exp, + "", + "The regexp of traced task id. We only enable trace on a task if its id matches."); struct WriterMetrics { int64_t splitTime{0}; @@ -338,6 +349,25 @@ void updateBenchmarkMetrics( writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024); } } + +void setQueryTraceConfig(std::unordered_map& configs) { + if (!FLAGS_query_trace_enabled) { + return; + } + configs[kQueryTraceEnabled] = "true"; + if (FLAGS_query_trace_dir != "") { + configs[kQueryTraceDir] = FLAGS_query_trace_dir; + } + if (FLAGS_query_trace_max_bytes) { + configs[kQueryTraceMaxBytes] = std::to_string(FLAGS_query_trace_max_bytes); + } + if (FLAGS_query_trace_node_ids != "") { + configs[kQueryTraceNodeIds] = FLAGS_query_trace_node_ids; + } + if (FLAGS_query_trace_task_reg_exp != "") { + configs[kQueryTraceTaskRegExp] = FLAGS_query_trace_task_reg_exp; + } +} } // namespace using RuntimeFactory = std::function; @@ -455,6 +485,7 @@ auto BM_Generic = [](::benchmark::State& state, auto* rawIter = static_cast(resultIter->getInputIter()); const auto* task = rawIter->task(); + LOG(WARNING) << task->toString(); const auto* planNode = rawIter->veloxPlan(); auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true); LOG(WARNING) << statsStr; @@ -584,6 +615,8 @@ int main(int argc, char** argv) { if (sessionConf.empty()) { sessionConf = backendConf; } + setQueryTraceConfig(sessionConf); + setQueryTraceConfig(backendConf); initVeloxBackend(backendConf); memory::MemoryManager::testingSetInstance({}); diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index e01d2d89856b1..96fb16d62ebae 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -27,6 +27,7 @@ #include "utils/ConfigExtractor.h" #include "config/GlutenConfig.h" +#include "config/VeloxConfig.h" #include "operators/plannodes/RowVectorStream.h" namespace gluten { @@ -1206,6 +1207,24 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( return node; } +core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode( + const ::substrait::ReadRel& readRel, + int32_t streamIdx) { + std::vector values; + VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx); + const auto iterator = inputIters_[streamIdx]; + while (iterator->hasNext()) { + auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), iterator->next()); + values.emplace_back(cb->getRowVector()); + } + auto node = std::make_shared(nextPlanNodeId(), std::move(values)); + + auto splitInfo = std::make_shared(); + splitInfo->isStream = true; + splitInfoMap_[node->id()] = splitInfo; + return node; +} + core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) { // emit is not allowed in TableScanNode and ValuesNode related // outputs @@ -1217,7 +1236,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: // Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source. auto streamIdx = getStreamIndex(readRel); if (streamIdx >= 0) { - return constructValueStreamNode(readRel, streamIdx); + // Only used in benchmark enable query trace, replace ValueStreamNode to ValuesNode to support serialization. + if (LIKELY(confMap_[kQueryTraceEnabled] != "true")) { + return constructValueStreamNode(readRel, streamIdx); + } else { + return constructValuesNode(readRel, streamIdx); + } } // Otherwise, will create TableScan node for ReadRel. diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 6121923df7871..dbdb63fe0ba9b 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -67,7 +67,10 @@ class SubstraitToVeloxPlanConverter { const std::unordered_map& confMap = {}, const std::optional writeFilesTempPath = std::nullopt, bool validationMode = false) - : pool_(pool), confMap_(confMap), writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {} + : pool_(pool), + confMap_(confMap), + writeFilesTempPath_(writeFilesTempPath), + validationMode_(validationMode) {} /// Used to convert Substrait WriteRel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel); @@ -123,6 +126,9 @@ class SubstraitToVeloxPlanConverter { core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + // This is only used in benchmark and enable query trace, which will load all the data to ValuesNode. + core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx); + /// Used to convert Substrait Rel into Velox PlanNode. core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel); diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index f18ff9244a3d4..f2f52fd7be0cd 100644 --- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -2297,11 +2297,13 @@ object GlutenConfig { val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") + .internal() .booleanConf .createWithDefault(false) val QUERY_TRACE_DIR = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceDir") .doc("Base dir of a query to store tracing data.") + .internal() .stringConf .createWithDefault("") @@ -2309,18 +2311,21 @@ object GlutenConfig { .doc( "A comma-separated list of plan node ids whose input data will be traced. " + "Empty string if only want to trace the query metadata.") + .internal() .stringConf .createWithDefault("") val QUERY_TRACE_MAX_BYTES = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceMaxBytes") .doc("The max trace bytes limit. Tracing is disabled if zero.") + .internal() .longConf .createWithDefault(0) val QUERY_TRACE_TASK_REG_EXP = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceTaskRegExp") .doc("The regexp of traced task id. We only enable trace on a task if its id matches.") + .internal() .stringConf .createWithDefault("") @@ -2330,6 +2335,7 @@ object GlutenConfig { "Config used to create operator trace directory. This config is provided to" + " underlying file system and the config is free form. The form should be " + "defined by the underlying file system.") + .internal() .stringConf .createWithDefault("")