Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
/** True when dynamic filter pushdown to the hash-probe side is enabled. */
def hashProbeDynamicFilterPushdownEnabled: Boolean = getConf(HASH_PROBE_DYNAMIC_FILTER_PUSHDOWN_ENABLED)

/** True when parallel execution of Velox task drivers is enabled (default: false, serial). */
def parallelExecutionEnabled: Boolean = getConf(PARALLEL_EXECUTION_ENABLED)

/**
 * Configured thread-pool size for parallel execution, or `None` when unset
 * (callers then derive it as 2 * spark.gluten.numTaskSlotsPerExecutor per the config doc).
 */
def parallelExecutionThreadPoolSize: Option[Int] = getConf(PARALLEL_EXECUTION_THREAD_POOL_SIZE)

/** Maximum number of parallel Velox task drivers per whole-stage execution (default: 4). */
def parallelExecutionMaxDrivers: Int = getConf(PARALLEL_EXECUTION_MAX_DRIVERS)

/** True when dynamic filtering for value streams is enabled. */
def valueStreamDynamicFilterEnabled: Boolean = getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED)
}
Expand Down Expand Up @@ -470,6 +479,32 @@ object VeloxConfig extends ConfigRegistry {
.booleanConf
.createWithDefault(true)

/** Static (JVM-lifetime) switch for parallel execution of Velox task drivers. */
val PARALLEL_EXECUTION_ENABLED =
  buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.enabled")
    .doc("Whether to enable parallel execution of Velox task drivers for whole-stage execution. " +
      "Default is false (serial execution).")
    .booleanConf
    .createWithDefault(false)

/** Optional static thread-pool size for parallel driver execution; unset means a derived default. */
val PARALLEL_EXECUTION_THREAD_POOL_SIZE =
  buildStaticConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.threadPoolSize")
    .doc("Size of the thread pool used for parallel execution of Velox task drivers. " +
      "If not set, defaults to 2 * spark.gluten.numTaskSlotsPerExecutor.")
    .intConf
    .checkValue(_ > 0, "must be a positive number")
    .createOptional

// Upper bound on concurrent Velox task drivers for one whole-stage pipeline.
// NOTE(review): this uses buildConf (session-scoped) while the other
// parallelExecution.* entries use buildStaticConf — confirm the asymmetry is
// intentional (i.e. maxDrivers is meant to be tunable per session).
val PARALLEL_EXECUTION_MAX_DRIVERS =
buildConf("spark.gluten.sql.columnar.backend.velox.parallelExecution.maxDrivers")
.doc(
"Maximum number of parallel Velox task drivers to use for whole-stage execution. " +
"Default is 4.")
.intConf
.checkValue(_ > 0, "must be a positive number")
.createWithDefault(4)

val VALUE_STREAM_DYNAMIC_FILTER_ENABLED =
buildConf("spark.gluten.sql.columnar.backend.velox.valueStream.dynamicFilter.enabled")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ object MetricsUtil extends Logging {
metrics.getSingleMetrics,
joinParamsMap.get(operatorIdx))
case u: UnionMetricsUpdater =>
// Union outputs two suites of metrics, so fetch one more suite of metrics here.
operatorMetrics.add(metrics.getOperatorMetrics(curMetricsIdx))
curMetricsIdx -= 1
Expand Down Expand Up @@ -364,7 +364,7 @@ object MetricsUtil extends Logging {
}
} catch {
case e: Exception =>
logWarning(s"Updating native metrics failed due to ${e.getCause}.")
logWarning(s"Updating native metrics failed: ${e.getMessage}", e)
()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class VeloxTPCHV2BhjSuite extends VeloxTPCHSuite {
}
}

class VeloxPartitionedTableTPCHSuite extends VeloxTPCHSuite {
class VeloxTPCHPartitionedTableSuite extends VeloxTPCHSuite {
override def subType(): String = "partitioned"

override protected def sparkConf: SparkConf = {
Expand Down Expand Up @@ -427,3 +427,26 @@ class VeloxTPCHV1VanillaBhjGlutenBeSuite extends VeloxTPCHSuite {
.set(GlutenConfig.COLUMNAR_BROADCAST_EXCHANGE_ENABLED.key, "true")
}
}

// TPC-H suite running the DataSource V1 parquet read path with Velox parallel
// execution enabled. Auto broadcast join is disabled (threshold -1) so plans
// exercise non-broadcast joins under parallel drivers.
class VeloxTPCHV1ParallelExecutionSuite extends VeloxTPCHSuite {
override def subType(): String = "v1-parallel"

override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.sql.sources.useV1SourceList", "parquet")
.set("spark.sql.autoBroadcastJoinThreshold", "-1")
.set(VeloxConfig.PARALLEL_EXECUTION_ENABLED.key, "true")
}
}

/**
 * TPC-H suite running the DataSource V1 parquet read path with Velox parallel
 * execution enabled and broadcast hash join allowed (threshold raised to 30M).
 *
 * Fix: the original set `spark.sql.autoBroadcastJoinThreshold` twice ("-1"
 * then "30M"); the last write wins, so the "-1" entry was dead code that
 * contradicted the suite's BHJ intent — it is removed here.
 */
class VeloxTPCHV1ParallelExecutionBhjSuite extends VeloxTPCHSuite {
  override def subType(): String = "v1-parallel-bhj"

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      // Force the V1 file-source read path for parquet.
      .set("spark.sql.sources.useV1SourceList", "parquet")
      .set(VeloxConfig.PARALLEL_EXECUTION_ENABLED.key, "true")
      // Raise the threshold so broadcast hash joins are planned.
      .set("spark.sql.autoBroadcastJoinThreshold", "30M")
  }
}
2 changes: 2 additions & 0 deletions cpp/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ set(SPARK_COLUMNAR_PLUGIN_SRCS
memory/MemoryManager.cc
memory/ArrowMemoryPool.cc
memory/ColumnarBatch.cc
threads/ThreadInitializer.cc
threads/ThreadManager.cc
shuffle/Dictionary.cc
shuffle/FallbackRangePartitioner.cc
shuffle/HashPartitioner.cc
Expand Down
3 changes: 2 additions & 1 deletion cpp/core/compute/Runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ void Runtime::registerFactory(const std::string& kind, Runtime::Factory factory,
// Looks up the factory registered for `kind` and constructs a Runtime with
// the given memory/thread managers and per-session configuration.
//
// Fixes: (1) the pasted diff left both the pre- and post-change `return`
// lines in place (two returns, the second unreachable) — resolved to the
// post-change call that forwards `threadManager`; (2) dropped the no-op
// `std::move` on raw pointers, which falsely suggested an ownership
// transfer. The pointers are passed through as-is (non-owning — TODO confirm
// against the factories' contracts).
Runtime* Runtime::create(
    const std::string& kind,
    MemoryManager* memoryManager,
    ThreadManager* threadManager,
    const std::unordered_map<std::string, std::string>& sessionConf) {
  auto& factory = runtimeFactories().get(kind);
  return factory(kind, memoryManager, threadManager, sessionConf);
}

void Runtime::release(Runtime* runtime) {
Expand Down
11 changes: 10 additions & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#include "substrait/plan.pb.h"
#include "threads/ThreadManager.h"
#include "utils/ObjectStore.h"
#include "utils/WholeStageDumper.h"

Expand Down Expand Up @@ -61,12 +62,14 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
using Factory = std::function<Runtime*(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf)>;
using Releaser = std::function<void(Runtime*)>;
static void registerFactory(const std::string& kind, Factory factory, Releaser releaser);
static Runtime* create(
const std::string& kind,
MemoryManager* memoryManager,
ThreadManager* threadManager,
const std::unordered_map<std::string, std::string>& sessionConf = {});
static void release(Runtime*);
static std::optional<std::string>* localWriteFilesTempPath();
Expand All @@ -75,8 +78,9 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
// Constructs a Runtime of the given kind. The MemoryManager and
// ThreadManager pointers are stored as raw members (appear non-owning — no
// delete is visible in this class; confirm); the session conf map is copied
// into confMap_.
//
// Fix: the pasted diff retained both the old and new member-initializer
// lists (duplicate init-list lines) — resolved to the post-change version
// that also initializes threadManager_.
Runtime(
    const std::string& kind,
    MemoryManager* memoryManager,
    ThreadManager* threadManager,
    const std::unordered_map<std::string, std::string>& confMap)
    : kind_(kind), memoryManager_(memoryManager), threadManager_(threadManager), confMap_(confMap) {}

virtual ~Runtime() = default;

Expand Down Expand Up @@ -126,6 +130,10 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
return memoryManager_;
};

// Returns the ThreadManager supplied at construction (raw pointer; the
// caller must not take ownership — lifetime is managed outside Runtime).
virtual ThreadManager* threadManager() {
return threadManager_;
};

/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) {
Expand Down Expand Up @@ -184,6 +192,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
protected:
std::string kind_;
MemoryManager* memoryManager_;
ThreadManager* threadManager_;
std::unique_ptr<ObjectStore> objStore_ = ObjectStore::create();
std::unordered_map<std::string, std::string> confMap_; // Session conf map

Expand Down
3 changes: 1 addition & 2 deletions cpp/core/jni/JniCommon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/

#include "JniCommon.h"
#include <folly/system/ThreadName.h>

void gluten::JniCommonState::ensureInitialized(JNIEnv* env) {
std::lock_guard<std::mutex> lockGuard(mtx_);
Expand Down Expand Up @@ -95,7 +96,6 @@ gluten::JniColumnarBatchIterator::~JniColumnarBatchIterator() {
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
env->DeleteGlobalRef(jColumnarBatchItr_);
env->DeleteGlobalRef(serializedColumnarBatchIteratorClass_);
vm_->DetachCurrentThread();
}

std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next() {
Expand All @@ -116,7 +116,6 @@ std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::next()
std::shared_ptr<gluten::ColumnarBatch> gluten::JniColumnarBatchIterator::nextInternal() const {
JNIEnv* env = nullptr;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);

if (!env->CallBooleanMethod(jColumnarBatchItr_, serializedColumnarBatchIteratorHasNext_)) {
checkException(env);
return nullptr; // stream ended
Expand Down
73 changes: 72 additions & 1 deletion cpp/core/jni/JniCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "compute/Runtime.h"
#include "memory/AllocationListener.h"
#include "shuffle/rss/RssClient.h"
#include "threads/ThreadInitializer.h"
#include "utils/Compression.h"
#include "utils/Exception.h"
#include "utils/ResourceMap.h"
Expand Down Expand Up @@ -463,7 +464,7 @@ class SparkAllocationListener final : public gluten::AllocationListener {
};

class BacktraceAllocationListener final : public gluten::AllocationListener {
public:
public:
BacktraceAllocationListener(std::unique_ptr<gluten::AllocationListener> delegator)
: delegator_(std::move(delegator)) {}

Expand Down Expand Up @@ -549,3 +550,73 @@ class JavaRssClient : public RssClient {
jmethodID javaPushPartitionData_;
jbyteArray array_;
};

// Forwards gluten::ThreadInitializer callbacks to a Java-side
// org.apache.gluten.threads.NativeThreadInitializer over JNI, attaching the
// calling native thread to the JVM as a daemon before any Java call.
class SparkThreadInitializer final : public gluten::ThreadInitializer {
public:
// Pins the Java initializer with a global reference (the incoming local
// reference dies with the caller's JNI frame) and eagerly resolves the
// "initialize" method ID while a JNIEnv is at hand.
SparkThreadInitializer(JavaVM* vm, jobject jInitializerLocalRef) : vm_(vm) {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jInitializerGlobalRef_ = env->NewGlobalRef(jInitializerLocalRef);
GLUTEN_CHECK(jInitializerGlobalRef_ != nullptr, "Failed to create global reference for native thread initializer.");
// Warm up the cached method ID; the return value is intentionally discarded.
(void)initializeMethod(env);
}

// Non-copyable / non-movable: the instance owns a JNI global reference that
// must be released exactly once.
SparkThreadInitializer(const SparkThreadInitializer&) = delete;
SparkThreadInitializer(SparkThreadInitializer&&) = delete;
SparkThreadInitializer& operator=(const SparkThreadInitializer&) = delete;
SparkThreadInitializer& operator=(SparkThreadInitializer&&) = delete;

// Releases the global reference. Uses GetEnv rather than attaching: if the
// destroying thread is not already attached, only warn (the ref leaks)
// instead of attaching a thread during teardown.
~SparkThreadInitializer() override {
JNIEnv* env;
if (vm_->GetEnv(reinterpret_cast<void**>(&env), jniVersion) != JNI_OK) {
LOG(WARNING) << "SparkThreadInitializer#~SparkThreadInitializer(): "
<< "JNIEnv was not attached to current thread";
return;
}
env->DeleteGlobalRef(jInitializerGlobalRef_);
}

// Attaches the current native thread to the JVM (daemon) and invokes
// NativeThreadInitializer.initialize(threadName) on the Java side.
// checkException runs after the local ref is freed so no ref leaks on error.
void initialize(const std::string& threadName) override {
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jstring jThreadName = env->NewStringUTF(threadName.c_str());
env->CallVoidMethod(jInitializerGlobalRef_, initializeMethod(env), jThreadName);
env->DeleteLocalRef(jThreadName);
checkException(env);
}

// Invokes NativeThreadInitializer.destroy(threadName) on the Java side when
// a native thread is being torn down.
void destroy(const std::string& threadName) override {
// IMPORTANT: Do not call vm_.DetachCurrentThread here, otherwise Java side thread
// object might be dereferenced and garbage-collected, to break the reuse of thread
// resources.
JNIEnv* env;
attachCurrentThreadAsDaemonOrThrow(vm_, &env);
jstring jThreadName = env->NewStringUTF(threadName.c_str());
env->CallVoidMethod(jInitializerGlobalRef_, destroyMethod(env), jThreadName);
env->DeleteLocalRef(jThreadName);
checkException(env);
}

private:
// Method IDs are cached in function-local statics: resolved once on first
// use (thread-safe under C++11 "magic statics") and valid as long as the
// global class reference lives.
jmethodID initializeMethod(JNIEnv* env) {
static jmethodID initializeMethod =
getMethodIdOrError(env, nativeThreadInitializerClass(env), "initialize", "(Ljava/lang/String;)V");
return initializeMethod;
}

jmethodID destroyMethod(JNIEnv* env) {
static jmethodID destroyMethod =
getMethodIdOrError(env, nativeThreadInitializerClass(env), "destroy", "(Ljava/lang/String;)V");
return destroyMethod;
}

// NOTE(review): the class name is in JNI descriptor form ("L...;") — this
// assumes createGlobalClassReferenceOrError accepts/normalizes that form;
// FindClass itself expects "org/apache/..." without the L-prefix/semicolon.
// Confirm against the helper's implementation.
jclass nativeThreadInitializerClass(JNIEnv* env) {
static jclass javaInitializerClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/threads/NativeThreadInitializer;");
return javaInitializerClass;
}

private:
JavaVM* vm_;              // Borrowed JVM handle; outlives this object.
jobject jInitializerGlobalRef_;  // Owned JNI global ref, freed in dtor.
};
Loading
Loading