Skip to content

Commit

Permalink
[VL] Pass partition id to velox functions (#4344)
Browse files: browse the repository at this point in the history
[VL] Pass the partition ID to Velox functions.
  • Loading branch information
zhli1142015 authored Mar 11, 2024
1 parent 3ad58ce commit e78ee43
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
pipelineTime: SQLMetric,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch] = {

Expand Down Expand Up @@ -246,6 +247,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {
// scalastyle:on argcount
GlutenConfig.getConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = {
assert(
inputPartition.isInstanceOf[GlutenPartition],
Expand All @@ -165,7 +166,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
transKernel.createKernelWithBatchIterator(
inputPartition.plan,
splitInfoByteArray,
columnarNativeIterators)
columnarNativeIterators,
partitionIndex)
pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)

Iterators
Expand Down Expand Up @@ -193,6 +195,7 @@ class IteratorApiImpl extends IteratorApi with Logging {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean): Iterator[ColumnarBatch] = {

ExecutorManager.tryTaskSet(numaBindingInfo)
Expand All @@ -207,7 +210,8 @@ class IteratorApiImpl extends IteratorApi with Logging {
rootNode.toProtobuf.toByteArray,
// Final iterator does not contain scan split, so pass empty split info to native here.
new Array[Array[Byte]](0),
columnarNativeIterator
columnarNativeIterator,
partitionIndex
)

Iterators
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/benchmarks/PlanValidatorUtil.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "memory/VeloxMemoryManager.h"
#include "substrait/SubstraitToVeloxPlanValidator.h"

using namespace facebook::velox;
using namespace gluten;

/// Set spark.gluten.sql.debug=true to get validation plan and dump it into a json file,
Expand All @@ -43,7 +44,8 @@ int main(int argc, char** argv) {
std::unordered_map<std::string, std::string> conf;
conf.insert({kDebugModeEnabled, "true"});
initVeloxBackend(conf);
core::QueryCtx queryCtx;
std::unordered_map<std::string, std::string> configs{{core::QueryConfig::kSparkPartitionId, "0"}};
core::QueryCtx queryCtx(nullptr, core::QueryConfig(configs));
auto pool = defaultLeafVeloxMemoryPool().get();
core::ExecCtx execCtx(pool, &queryCtx);

Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
// Disable driver cpu time slicing.
configs[velox::core::QueryConfig::kDriverCpuTimeSliceLimitMs] = "0";

configs[velox::core::QueryConfig::kSparkPartitionId] = std::to_string(taskInfo_.partitionId);

} catch (const std::invalid_argument& err) {
std::string errDetails = err.what();
throw std::runtime_error("Invalid conf arg: " + errDetails);
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ Java_io_glutenproject_vectorized_PlanEvaluatorJniWrapper_nativeValidateWithFailu
gluten::parseProtobuf(planData, planSize, &subPlan);

// A query context used for function validation.
velox::core::QueryCtx queryCtx;
std::unordered_map<std::string, std::string> configs{{velox::core::QueryConfig::kSparkPartitionId, "0"}};
velox::core::QueryCtx queryCtx(nullptr, velox::core::QueryConfig(configs));
auto pool = gluten::defaultLeafVeloxMemoryPool().get();
// An execution context used for function validation.
velox::core::ExecCtx execCtx(pool, &queryCtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ trait IteratorApi {
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
): Iterator[ColumnarBatch]

Expand All @@ -76,6 +77,7 @@ trait IteratorApi {
rootNode: PlanNode,
pipelineTime: SQLMetric,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
materializeInput: Boolean = false): Iterator[ColumnarBatch]
// scalastyle:on argcount

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class GlutenWholeStageColumnarRDD(
pipelineTime,
updateInputMetrics,
updateNativeMetrics,
split.index,
inputIterators
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class WholeStageZippedPartitionsRDD(
resCtx.root,
pipelineTime,
updateNativeMetrics,
split.index,
materializeInput
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void injectWriteFilesTempPath(String path) {
// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public GeneralOutIterator createKernelWithBatchIterator(
byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList)
byte[] wsPlan, byte[][] splitInfo, List<GeneralInIterator> iterList, int partitionIndex)
throws RuntimeException, IOException {
final AtomicReference<ColumnarBatchOutIterator> outIterator = new AtomicReference<>();
final NativeMemoryManager nmm =
Expand Down Expand Up @@ -101,7 +101,7 @@ public Set<Phase> applicablePhases() {
splitInfo,
iterList.toArray(new GeneralInIterator[0]),
TaskContext.get().stageId(),
TaskContext.getPartitionId(),
partitionIndex, // TaskContext.getPartitionId(),
TaskContext.get().taskAttemptId(),
DebugUtil.saveInputToFile(),
BackendsApiManager.getSparkPlanExecApiInstance().rewriteSpillPath(spillDirPath));
Expand Down

0 comments on commit e78ee43

Please sign in to comment.