Skip to content

Commit

Permalink
use benchmark to enable query trace
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Jan 14, 2025
1 parent 187a034 commit f8b94a4
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 2 deletions.
33 changes: 33 additions & 0 deletions cpp/velox/benchmarks/GenericBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,17 @@ DEFINE_string(
"'stream' mode: Input file scan happens inside of the pipeline."
"'buffered' mode: First read all data into memory and feed the pipeline with it.");
DEFINE_bool(debug_mode, false, "Whether to enable debug mode. Same as setting `spark.gluten.sql.debug`");
DEFINE_bool(query_trace_enabled, false, "Whether to enable query trace");
DEFINE_string(query_trace_dir, "", "Base dir of a query to store tracing data.");
DEFINE_string(
query_trace_node_ids,
"",
"A comma-separated list of plan node ids whose input data will be traced. Empty string if only want to trace the query metadata.");
DEFINE_int64(query_trace_max_bytes, 0, "The max trace bytes limit. Tracing is disabled if zero.");
DEFINE_string(
query_trace_task_reg_exp,
"",
"The regexp of traced task id. We only enable trace on a task if its id matches.");

struct WriterMetrics {
int64_t splitTime{0};
Expand Down Expand Up @@ -338,6 +349,25 @@ void updateBenchmarkMetrics(
writerMetrics.bytesWritten, benchmark::Counter::kAvgIterations, benchmark::Counter::OneK::kIs1024);
}
}

void setQueryTraceConfig(std::unordered_map<std::string, std::string>& configs) {
if (!FLAGS_query_trace_enabled) {
return;
}
configs[kQueryTraceEnabled] = "true";
if (FLAGS_query_trace_dir != "") {
configs[kQueryTraceDir] = FLAGS_query_trace_dir;
}
if (FLAGS_query_trace_max_bytes) {
configs[kQueryTraceMaxBytes] = std::to_string(FLAGS_query_trace_max_bytes);
}
if (FLAGS_query_trace_node_ids != "") {
configs[kQueryTraceNodeIds] = FLAGS_query_trace_node_ids;
}
if (FLAGS_query_trace_task_reg_exp != "") {
configs[kQueryTraceTaskRegExp] = FLAGS_query_trace_task_reg_exp;
}
}
} // namespace

using RuntimeFactory = std::function<VeloxRuntime*(MemoryManager* memoryManager)>;
Expand Down Expand Up @@ -455,6 +485,7 @@ auto BM_Generic = [](::benchmark::State& state,

auto* rawIter = static_cast<gluten::WholeStageResultIterator*>(resultIter->getInputIter());
const auto* task = rawIter->task();
LOG(WARNING) << task->toString();
const auto* planNode = rawIter->veloxPlan();
auto statsStr = facebook::velox::exec::printPlanWithStats(*planNode, task->taskStats(), true);
LOG(WARNING) << statsStr;
Expand Down Expand Up @@ -584,6 +615,8 @@ int main(int argc, char** argv) {
if (sessionConf.empty()) {
sessionConf = backendConf;
}
setQueryTraceConfig(sessionConf);
setQueryTraceConfig(backendConf);

initVeloxBackend(backendConf);
memory::MemoryManager::testingSetInstance({});
Expand Down
26 changes: 25 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "utils/ConfigExtractor.h"

#include "config/GlutenConfig.h"
#include "config/VeloxConfig.h"
#include "operators/plannodes/RowVectorStream.h"

namespace gluten {
Expand Down Expand Up @@ -1206,6 +1207,24 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
return node;
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValuesNode(
const ::substrait::ReadRel& readRel,
int32_t streamIdx) {
std::vector<RowVectorPtr> values;
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
const auto iterator = inputIters_[streamIdx];
while (iterator->hasNext()) {
auto cb = VeloxColumnarBatch::from(defaultLeafVeloxMemoryPool().get(), iterator->next());
values.emplace_back(cb->getRowVector());
}
auto node = std::make_shared<facebook::velox::core::ValuesNode>(nextPlanNodeId(), std::move(values));

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
splitInfoMap_[node->id()] = splitInfo;
return node;
}

core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::ReadRel& readRel) {
// emit is not allowed in TableScanNode and ValuesNode related
// outputs
Expand All @@ -1217,7 +1236,12 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
// Check if the ReadRel specifies an input of stream. If yes, build ValueStreamNode as the data source.
auto streamIdx = getStreamIndex(readRel);
if (streamIdx >= 0) {
return constructValueStreamNode(readRel, streamIdx);
// Only used in benchmark enable query trace, replace ValueStreamNode to ValuesNode to support serialization.
if (LIKELY(confMap_[kQueryTraceEnabled] != "true")) {
return constructValueStreamNode(readRel, streamIdx);
} else {
return constructValuesNode(readRel, streamIdx);
}
}

// Otherwise, will create TableScan node for ReadRel.
Expand Down
8 changes: 7 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ class SubstraitToVeloxPlanConverter {
const std::unordered_map<std::string, std::string>& confMap = {},
const std::optional<std::string> writeFilesTempPath = std::nullopt,
bool validationMode = false)
: pool_(pool), confMap_(confMap), writeFilesTempPath_(writeFilesTempPath), validationMode_(validationMode) {}
: pool_(pool),
confMap_(confMap),
writeFilesTempPath_(writeFilesTempPath),
validationMode_(validationMode) {}

/// Used to convert Substrait WriteRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::WriteRel& writeRel);
Expand Down Expand Up @@ -123,6 +126,9 @@ class SubstraitToVeloxPlanConverter {

core::PlanNodePtr constructValueStreamNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);

// This is only used in benchmark and enable query trace, which will load all the data to ValuesNode.
core::PlanNodePtr constructValuesNode(const ::substrait::ReadRel& sRead, int32_t streamIdx);

/// Used to convert Substrait Rel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::Rel& sRel);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2297,30 +2297,35 @@ object GlutenConfig {

val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled")
.doc("Enable query tracing flag.")
.internal()
.booleanConf
.createWithDefault(false)

val QUERY_TRACE_DIR = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceDir")
.doc("Base dir of a query to store tracing data.")
.internal()
.stringConf
.createWithDefault("")

val QUERY_TRACE_NODE_IDS = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceNodeIds")
.doc(
"A comma-separated list of plan node ids whose input data will be traced. " +
"Empty string if only want to trace the query metadata.")
.internal()
.stringConf
.createWithDefault("")

val QUERY_TRACE_MAX_BYTES =
buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceMaxBytes")
.doc("The max trace bytes limit. Tracing is disabled if zero.")
.internal()
.longConf
.createWithDefault(0)

val QUERY_TRACE_TASK_REG_EXP =
buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceTaskRegExp")
.doc("The regexp of traced task id. We only enable trace on a task if its id matches.")
.internal()
.stringConf
.createWithDefault("")

Expand All @@ -2330,6 +2335,7 @@ object GlutenConfig {
"Config used to create operator trace directory. This config is provided to" +
" underlying file system and the config is free form. The form should be " +
"defined by the underlying file system.")
.internal()
.stringConf
.createWithDefault("")

Expand Down

0 comments on commit f8b94a4

Please sign in to comment.