diff --git a/.github/workflows/velox_docker.yml b/.github/workflows/velox_docker.yml
index 571dc6f906f8..8a58d1037f7f 100644
--- a/.github/workflows/velox_docker.yml
+++ b/.github/workflows/velox_docker.yml
@@ -21,6 +21,7 @@ on:
       - '.github/workflows/velox_docker.yml'
       - 'pom.xml'
       - 'backends-velox/**'
+      - 'gluten-uniffle/**'
       - 'gluten-celeborn/common/**'
       - 'gluten-celeborn/package/**'
       - 'gluten-celeborn/velox/**'
@@ -326,6 +327,66 @@ jobs:
             --local --preset=velox --benchmark-type=ds --error-on-memleak -s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 \
             --skip-data-gen --random-kill-tasks
 
+  run-tpc-test-centos8-uniffle:
+    needs: build-native-lib
+    strategy:
+      fail-fast: false
+      matrix:
+        spark: ["spark-3.2"]
+    runs-on: ubuntu-20.04
+    container: centos:8
+    steps:
+      - uses: actions/checkout@v2
+      - name: Download All Artifacts
+        uses: actions/download-artifact@v2
+        with:
+          name: velox-native-lib-${{github.sha}}
+          path: ./cpp/build/releases
+      - name: Update mirror list
+        run: |
+          sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true
+          sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true
+      - name: Setup java and maven
+        run: |
+          yum update -y && yum install -y java-1.8.0-openjdk-devel wget git
+          wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz
+          tar -xvf apache-maven-3.8.8-bin.tar.gz
+          mv apache-maven-3.8.8 /usr/lib/maven
+      - name: Build for Spark ${{ matrix.spark }}
+        run: |
+          cd $GITHUB_WORKSPACE/ && \
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests
+      - name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0
+        run: |
+          export MAVEN_HOME=/usr/lib/maven && \
+          export PATH=${PATH}:${MAVEN_HOME}/bin && \
+          export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \
+          cd /opt && \
+          git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \
+          cd incubator-uniffle && \
+          sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \
+          mvn clean install -Phadoop2.8 -DskipTests
+          cd /opt && \
+          wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \
+          tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \
+          wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \
+          tar xzf hadoop-2.8.5.tar.gz -C /opt/
+          rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar
+          cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/
+          rm -rf /opt/incubator-uniffle
+          cd /opt/uniffle && mkdir shuffle_data && \
+          bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \
+          bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \
+          bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \
+          bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh
+          cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \
+          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
+            --local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1
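For reference, the two echo commands above expand to the following Uniffle config files. The contents are verbatim from the workflow step, with line breaks restored from the \n escapes:

    # conf/coordinator.conf
    rss.coordinator.shuffle.nodes.max 1
    rss.rpc.server.port 19999

    # conf/server.conf
    rss.server.app.expired.withoutHeartbeat 7200000
    rss.server.heartbeat.delay 3000
    rss.rpc.server.port 19997
    rss.jetty.http.port 19996
    rss.server.netty.port 19995
    rss.storage.basePath /opt/uniffle/shuffle_data
    rss.storage.type MEMORY_LOCALFILE
    rss.coordinator.quorum localhost:19999
    rss.server.flush.thread.alive 10
    rss.server.single.buffer.flush.threshold 64m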
+
   run-tpc-test-ubuntu-2204-celeborn:
     needs: build-native-lib
     strategy:
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 57fd81ba03b4..791a635b03fb 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -473,6 +473,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       val constructor =
         clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric])
       constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer]
+    } else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
+      throw new UnsupportedOperationException("Uniffle does not support the ClickHouse backend yet")
     } else {
       new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
     }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index fc493555affa..d969be06a474 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -349,7 +349,8 @@ object BackendSettings extends BackendSettingsApi {
 
   override def supportColumnarShuffleExec(): Boolean = {
     GlutenConfig.getConf.isUseColumnarShuffleManager ||
-    GlutenConfig.getConf.isUseCelebornShuffleManager
+    GlutenConfig.getConf.isUseCelebornShuffleManager ||
+    GlutenConfig.getConf.isUseUniffleShuffleManager
   }
 
   override def enableJoinKeysRewrite(): Boolean = false
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 51e1da6529ac..6f5b2e332e37 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
         std::move(partitionWriterOptions),
         memoryManager->getArrowMemoryPool(),
         std::move(celebornClient));
+  } else if (partitionWriterType == "uniffle") {
+    jclass unifflePartitionPusherClass =
+        createGlobalClassReferenceOrError(env, "Lorg/apache/spark/shuffle/writer/PartitionPusher;");
+    jmethodID unifflePushPartitionDataMethod =
+        getMethodIdOrError(env, unifflePartitionPusherClass, "pushPartitionData", "(I[BI)I");
+    JavaVM* vm;
+    if (env->GetJavaVM(&vm) != JNI_OK) {
+      throw gluten::GlutenException("Unable to get JavaVM instance");
+    }
+    // TODO: rename CelebornClient to a generic RssClient
+    std::shared_ptr<CelebornClient> uniffleClient =
+        std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod);
+    partitionWriter = std::make_unique<CelebornPartitionWriter>(
+        numPartitions,
+        std::move(partitionWriterOptions),
+        memoryManager->getArrowMemoryPool(),
+        std::move(uniffleClient));
   } else {
     throw gluten::GlutenException("Unrecognizable partition writer type: " + partitionWriterType);
   }
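The JNI descriptor resolved above, "(I[BI)I", is the whole contract between the native partition writer and the JVM: partition id (I), byte array ([B), length (I), returning the number of bytes accepted (I). A minimal Scala mirror of that contract, for orientation only; the trait name here is hypothetical, and the concrete implementation is the PartitionPusher class added later in this diff:

    // Shape of the callback the native writer invokes when it flushes or evicts
    // a partition buffer; the return value is the number of bytes accepted.
    trait PartitionDataPusher {
      def pushPartitionData(partitionId: Int, buffer: Array[Byte], length: Int): Int
    }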
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 6531f150ede1..6a793daa3488 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -33,7 +33,7 @@ static constexpr double kDefaultBufferReallocThreshold = 0.25;
 static constexpr double kDefaultMergeBufferThreshold = 0.25;
 static constexpr bool kEnableBufferedWrite = true;
 
-enum PartitionWriterType { kLocal, kCeleborn };
+enum PartitionWriterType { kLocal, kCeleborn, kUniffle };
 
 struct ShuffleReaderOptions {
   arrow::Compression::type compressionType = arrow::Compression::type::LZ4_FRAME;
diff --git a/gluten-uniffle/package/pom.xml b/gluten-uniffle/package/pom.xml
new file mode 100644
index 000000000000..64c0fadd3ad6
--- /dev/null
+++ b/gluten-uniffle/package/pom.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-uniffle</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle-package</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Uniffle Package</name>
+
+  <profiles>
+    <profile>
+      <id>backends-velox</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-uniffle-velox</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+</project>
diff --git a/gluten-uniffle/pom.xml b/gluten-uniffle/pom.xml
new file mode 100644
index 000000000000..ce38bdd2915b
--- /dev/null
+++ b/gluten-uniffle/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-parent</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle</artifactId>
+  <packaging>pom</packaging>
+  <name>Gluten Uniffle</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.uniffle</groupId>
+      <artifactId>rss-client-spark${spark.major.version}-shaded</artifactId>
+      <version>${uniffle.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <profiles>
+    <profile>
+      <id>backends-velox</id>
+      <modules>
+        <module>velox</module>
+        <module>package</module>
+      </modules>
+    </profile>
+  </profiles>
+</project>
diff --git a/gluten-uniffle/velox/pom.xml b/gluten-uniffle/velox/pom.xml
new file mode 100755
index 000000000000..19865fa6999d
--- /dev/null
+++ b/gluten-uniffle/velox/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>gluten-uniffle</artifactId>
+    <groupId>org.apache.gluten</groupId>
+    <version>1.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>gluten-uniffle-velox</artifactId>
+  <packaging>jar</packaging>
+  <name>Gluten Uniffle Velox</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>backends-velox</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-data</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>com.diffplug.spotless</groupId>
+        <artifactId>spotless-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+</project>
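With the three modules above in place, the build wiring is: the rss-uniffle profile in the root pom adds gluten-uniffle to the reactor, and its backends-velox profile in turn selects the velox and package submodules. The CI job earlier in this diff builds it exactly like this:

    mvn clean install -Pspark-3.2 -Pbackends-velox -Prss-uniffle -DskipTests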
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
new file mode 100644
index 000000000000..9ae62f8b8a23
--- /dev/null
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/gluten/uniffle/UniffleShuffleManager.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.gluten.uniffle;
+
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.shuffle.ShuffleHandle;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter;
+import org.apache.uniffle.common.exception.RssException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UniffleShuffleManager extends RssShuffleManager {
+  private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class);
+
+  private boolean isDriver() {
+    return "driver".equals(SparkEnv.get().executorId());
+  }
+
+  public UniffleShuffleManager(SparkConf conf, boolean isDriver) {
+    super(conf, isDriver);
+    conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED, "false");
+  }
+
+  @Override
+  public <K, V, C> ShuffleHandle registerShuffle(
+      int shuffleId, ShuffleDependency<K, V, C> dependency) {
+    return super.registerShuffle(shuffleId, dependency);
+  }
+
+  @Override
+  public <K, V> ShuffleWriter<K, V> getWriter(
+      ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) {
+    if (!(handle instanceof RssShuffleHandle)) {
+      throw new RssException("Unexpected ShuffleHandle: " + handle.getClass().getName());
+    }
+    RssShuffleHandle<K, V, V> rssHandle = (RssShuffleHandle<K, V, V>) handle;
+    if (rssHandle.getDependency() instanceof ColumnarShuffleDependency) {
+      setPusherAppId(rssHandle);
+      String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
+      ShuffleWriteMetrics writeMetrics;
+      if (metrics != null) {
+        writeMetrics = new WriteMetrics(metrics);
+      } else {
+        writeMetrics = context.taskMetrics().shuffleWriteMetrics();
+      }
+      return new VeloxUniffleColumnarShuffleWriter<>(
+          context.partitionId(),
+          rssHandle.getAppId(),
+          rssHandle.getShuffleId(),
+          taskId,
+          context.taskAttemptId(),
+          writeMetrics,
+          this,
+          sparkConf,
+          shuffleWriteClient,
+          rssHandle,
+          this::markFailedTask,
+          context);
+    } else {
+      return super.getWriter(handle, mapId, context, metrics);
+    }
+  }
+}
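To route a job's shuffle through this manager, the minimum Spark configuration looks like the following. The values are taken from VELOX_WITH_UNIFFLE_CONF later in this diff, and the quorum address is the single-node CI default:

    spark.plugins                 org.apache.gluten.GlutenPlugin
    spark.shuffle.manager         org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager
    spark.rss.coordinator.quorum  localhost:19999
    spark.rss.storage.type        MEMORY_LOCALFILE
    spark.rss.client.type         GRPC_NETTY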
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
new file mode 100644
index 000000000000..17cfce1c0c3e
--- /dev/null
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer;
+
+import org.apache.gluten.GlutenConfig;
+import org.apache.gluten.columnarbatch.ColumnarBatches;
+import org.apache.gluten.memory.memtarget.MemoryTarget;
+import org.apache.gluten.memory.memtarget.Spiller;
+import org.apache.gluten.memory.memtarget.Spillers;
+import org.apache.gluten.memory.nmm.NativeMemoryManagers;
+import org.apache.gluten.vectorized.ShuffleWriterJniWrapper;
+import org.apache.gluten.vectorized.SplitResult;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.memory.SparkMemoryUtil;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.shuffle.ColumnarShuffleDependency;
+import org.apache.spark.shuffle.GlutenShuffleUtils;
+import org.apache.spark.shuffle.RssShuffleHandle;
+import org.apache.spark.shuffle.RssShuffleManager;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.util.SparkResourceUtil;
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.Iterator;
+
+public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K, V, V> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VeloxUniffleColumnarShuffleWriter.class);
+
+  private long nativeShuffleWriter = -1L;
+
+  private boolean stopping = false;
+  private int compressThreshold = GlutenConfig.getConf().columnarShuffleCompressionThreshold();
+  private double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
+  private String compressionCodec;
+  private int compressionLevel;
+  private int partitionId;
+
+  private ShuffleWriterJniWrapper jniWrapper = ShuffleWriterJniWrapper.create();
+  private SplitResult splitResult;
+  private int nativeBufferSize = GlutenConfig.getConf().maxBatchSize();
+  private int bufferSize;
+  private PartitionPusher partitionPusher;
+
+  private final ColumnarShuffleDependency<K, V, V> columnarDep;
+  private final SparkConf sparkConf;
+
+  private long availableOffHeapPerTask() {
+    return SparkMemoryUtil.getCurrentAvailableOffHeapMemory()
+        / SparkResourceUtil.getTaskSlots(sparkConf);
+  }
+
+  public VeloxUniffleColumnarShuffleWriter(
+      int partitionId,
+      String appId,
+      int shuffleId,
+      String taskId,
+      long taskAttemptId,
+      ShuffleWriteMetrics shuffleWriteMetrics,
+      RssShuffleManager shuffleManager,
+      SparkConf sparkConf,
+      ShuffleWriteClient shuffleWriteClient,
+      RssShuffleHandle<K, V, V> rssHandle,
+      Function<String, Boolean> taskFailureCallback,
+      TaskContext context) {
+    super(
+        appId,
+        shuffleId,
+        taskId,
+        taskAttemptId,
+        shuffleWriteMetrics,
+        shuffleManager,
+        sparkConf,
+        shuffleWriteClient,
+        rssHandle,
+        taskFailureCallback,
+        context);
+    columnarDep = (ColumnarShuffleDependency<K, V, V>) rssHandle.getDependency();
+    this.partitionId = partitionId;
+    this.sparkConf = sparkConf;
+    bufferSize =
+        (int)
+            sparkConf.getSizeAsBytes(
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(),
+                RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
+    if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
+      compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
+    }
+    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
+  }
+
+  @Override
+  protected void writeImpl(Iterator<Product2<K, V>> records) throws IOException {
+    if (!records.hasNext() && !isMemoryShuffleEnabled) {
+      super.sendCommit();
+      return;
+    }
+    // the native writer is created lazily on the first batch; the pusher can be set up now
+    partitionPusher = new PartitionPusher(this);
+    while (records.hasNext()) {
+      ColumnarBatch cb = (ColumnarBatch) (records.next()._2());
+      if (cb.numRows() == 0 || cb.numCols() == 0) {
+        LOG.info("Skip ColumnarBatch of 0 rows or 0 cols");
+      } else {
+        long handle = ColumnarBatches.getNativeHandle(cb);
+        if (nativeShuffleWriter == -1) {
+          nativeShuffleWriter =
+              jniWrapper.makeForRSS(
+                  columnarDep.nativePartitioning(),
+                  nativeBufferSize,
+                  // codec and level are resolved once in the constructor
+                  compressionCodec,
+                  compressionLevel,
+                  compressThreshold,
+                  GlutenConfig.getConf().columnarShuffleCompressionMode(),
+                  bufferSize,
+                  partitionPusher,
+                  NativeMemoryManagers.create(
+                          "UniffleShuffleWriter",
+                          new Spiller() {
+                            @Override
+                            public long spill(MemoryTarget self, long size) {
+                              if (nativeShuffleWriter == -1) {
+                                throw new IllegalStateException(
+                                    "Fatal: spill() called before a shuffle writer "
+                                        + "evaluator is created. This behavior should be "
+                                        + "optimized by moving memory "
+                                        + "allocations from make() to split()");
+                              }
+                              LOG.info(
+                                  "Gluten shuffle writer: Trying to push {} bytes of data", size);
+                              long pushed =
+                                  jniWrapper.nativeEvict(nativeShuffleWriter, size, false);
+                              LOG.info(
+                                  "Gluten shuffle writer: Pushed {} / {} bytes of data",
+                                  pushed,
+                                  size);
+                              return pushed;
+                            }
+
+                            @Override
+                            public Set<Spiller.Phase> applicablePhases() {
+                              return Spillers.PHASE_SET_SPILL_ONLY;
+                            }
+                          })
+                      .getNativeInstanceHandle(),
+                  handle,
+                  taskAttemptId,
+                  GlutenShuffleUtils.getStartPartitionId(
+                      columnarDep.nativePartitioning(), partitionId),
+                  "uniffle",
+                  reallocThreshold);
+        }
+        long startTime = System.nanoTime();
+        long bytes =
+            jniWrapper.split(nativeShuffleWriter, cb.numRows(), handle, availableOffHeapPerTask());
+        LOG.debug("jniWrapper.split rows {}, split bytes {}", cb.numRows(), bytes);
+        columnarDep.metrics().get("dataSize").get().add(bytes);
+        // this metric stands in for part of the Uniffle shuffle write time
+        columnarDep.metrics().get("splitTime").get().add(System.nanoTime() - startTime);
+        columnarDep.metrics().get("numInputRows").get().add(cb.numRows());
+        columnarDep.metrics().get("inputBatches").get().add(1);
+        shuffleWriteMetrics.incRecordsWritten(cb.numRows());
+      }
+    }
+
+    long startTime = System.nanoTime();
+    LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
+    if (nativeShuffleWriter == -1L) {
+      throw new IllegalStateException("nativeShuffleWriter should not be -1L");
+    }
+    splitResult = jniWrapper.stop(nativeShuffleWriter);
+    columnarDep
+        .metrics()
+        .get("splitTime")
+        .get()
+        .add(
+            System.nanoTime()
+                - startTime
+                - splitResult.getTotalPushTime()
+                - splitResult.getTotalWriteTime()
+                - splitResult.getTotalCompressTime());
+
+    shuffleWriteMetrics.incBytesWritten(splitResult.getTotalBytesWritten());
+    shuffleWriteMetrics.incWriteTime(
+        splitResult.getTotalWriteTime() + splitResult.getTotalPushTime());
+    // partitionLengths is calculated on the Uniffle side
+
+    long pushMergedDataTime = System.nanoTime();
+    // flush every remaining block
+    sendRestBlockAndWait();
+    if (!isMemoryShuffleEnabled) {
+      super.sendCommit();
+    }
+    long writeDurationNs = System.nanoTime() - pushMergedDataTime;
+    shuffleWriteMetrics.incWriteTime(writeDurationNs);
+    LOG.info(
+        "Finished shuffle write; the remaining flush took {} ms",
+        TimeUnit.NANOSECONDS.toMillis(writeDurationNs));
+  }
+
+  @Override
+  public Option<MapStatus> stop(boolean success) {
+    if (!stopping) {
+      stopping = true;
+      closeShuffleWriter();
+      return super.stop(success);
+    }
+    return Option.empty();
+  }
+
+  private void closeShuffleWriter() {
+    if (nativeShuffleWriter != -1) {
+      jniWrapper.close(nativeShuffleWriter);
+      nativeShuffleWriter = -1;
+    }
+  }
+
+  private void sendRestBlockAndWait() {
+    List<ShuffleBlockInfo> shuffleBlockInfos = super.getBufferManager().clear();
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    // TODO: make checkBlockSendResult take no arguments
+    super.internalCheckBlockSendResult();
+  }
+
+  public int doAddByte(int partitionId, byte[] data, int length) {
+    List<ShuffleBlockInfo> shuffleBlockInfos =
+        super.getBufferManager()
+            .addPartitionData(partitionId, data, length, System.currentTimeMillis());
+    super.processShuffleBlockInfos(shuffleBlockInfos);
+    return length;
+  }
+}
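The eviction path in the writer above is easy to lose in the JNI plumbing, so here is a simplified standalone sketch of the round trip. All names below are hypothetical; the real pieces are Spiller.spill, ShuffleWriterJniWrapper.nativeEvict, PartitionPusher.pushPartitionData and doAddByte:

    // Under memory pressure the memory manager calls spill(size); the writer asks
    // the native side to evict buffered partitions, which pushes them back up
    // through pushPartitionData, and returns how many bytes were actually freed.
    class SpillPath(nativeEvict: Long => Long) {
      def spill(requested: Long): Long = nativeEvict(requested)
    }

    object SpillPathDemo extends App {
      // Pretend the native writer can free at most 32 MiB per eviction round.
      val path = new SpillPath(size => math.min(size, 32L * 1024 * 1024))
      println(path.spill(64L * 1024 * 1024)) // prints 33554432
    }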
diff --git a/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
new file mode 100644
index 000000000000..eb99fd23bb8e
--- /dev/null
+++ b/gluten-uniffle/velox/src/main/scala/org/apache/spark/shuffle/writer/PartitionPusher.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle.writer
+
+import java.io.IOException
+
+class PartitionPusher(val uniffleWriter: VeloxUniffleColumnarShuffleWriter[_, _]) {
+
+  @throws[IOException]
+  def pushPartitionData(partitionId: Int, buffer: Array[Byte], length: Int): Int = {
+    uniffleWriter.doAddByte(partitionId, buffer, length)
+  }
+}
diff --git a/package/pom.xml b/package/pom.xml
index 678f13b6783d..913621ee1bb0 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -80,6 +80,16 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>rss-uniffle</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-uniffle-package</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
diff --git a/pom.xml b/pom.xml
index b5833b3adab0..b65f17314864 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
     <sparkshim.artifactId>spark-sql-columnar-shims-spark32</sparkshim.artifactId>
    <celeborn.version>0.3.2-incubating</celeborn.version>
    <arrow.version>14.0.1</arrow.version>
+    <uniffle.version>0.8.0</uniffle.version>
     <arrow-memory.artifact>arrow-memory-unsafe</arrow-memory.artifact>
     <hadoop.version>2.7.4</hadoop.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -237,6 +238,15 @@
       <modules>
         <module>gluten-celeborn</module>
       </modules>
     </profile>
+    <profile>
+      <id>rss-uniffle</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>gluten-uniffle</module>
+      </modules>
+    </profile>
     <profile>
       <id>delta</id>
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index d2b0fd78fca0..52a2938a4407 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -120,6 +120,11 @@ class GlutenConfig(conf: SQLConf) extends Logging {
       .getConfString("spark.shuffle.manager", "sort")
       .contains("celeborn")
 
+  def isUseUniffleShuffleManager: Boolean =
+    conf
+      .getConfString("spark.shuffle.manager", "sort")
+      .contains("UniffleShuffleManager")
+
   def enableColumnarShuffle: Boolean = conf.getConf(COLUMNAR_SHUFFLE_ENABLED)
 
   def enablePreferColumnar: Boolean = conf.getConf(COLUMNAR_PREFER_ENABLED)
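Note that detection is plain substring matching on spark.shuffle.manager, which is why the matched token must occur in the configured class name; a quick sanity check (throwaway snippet, not part of the PR):

    val manager = "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager"
    assert(manager.contains("UniffleShuffleManager")) // the FQCN set by gluten-it's Constants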
description = "Baseline preset used: vanilla, velox, velox-with-celeborn...", defaultValue = "vanilla") + @CommandLine.Option(names = {"--baseline-preset"}, description = "Baseline preset used: vanilla, velox, velox-with-celeborn, velox-with-uniffle...", defaultValue = "vanilla") private String baselinePreset; @CommandLine.Option(names = {"--log-level"}, description = "Set log level: 0 for DEBUG, 1 for INFO, 2 for WARN", defaultValue = "2") @@ -92,6 +92,9 @@ private SparkConf pickSparkConf(String preset) { case "velox-with-celeborn": conf = Constants.VELOX_WITH_CELEBORN_CONF(); break; + case "velox-with-uniffle": + conf = Constants.VELOX_WITH_UNIFFLE_CONF(); + break; default: throw new IllegalArgumentException("Preset not found: " + preset); } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala index 44a02593280a..7564f6dce90a 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/Constants.scala @@ -58,6 +58,25 @@ object Constants { .set("spark.celeborn.push.data.timeout", "600s") .set("spark.celeborn.push.limit.inFlight.timeout", "1200s") + val VELOX_WITH_UNIFFLE_CONF: SparkConf = new SparkConf(false) + .set("spark.gluten.sql.columnar.forceShuffledHashJoin", "true") + .set("spark.sql.parquet.enableVectorizedReader", "true") + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.gluten.uniffle.UniffleShuffleManager") + .set("spark.rss.coordinator.quorum", "localhost:19999") + .set("spark.rss.storage.type", "MEMORY_LOCALFILE") + .set("spark.rss.client.type", "GRPC_NETTY") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.shuffle.service.enabled", "false") + .set("spark.sql.adaptive.localShuffleReader.enabled", "false") + .set("spark.dynamicAllocation.enabled", "false") + .set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true") + .set("spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold", "0") + .set( + "spark.gluten.sql.columnar.physicalJoinOptimizeEnable", + "false" + ) + @deprecated val TYPE_MODIFIER_DATE_AS_DOUBLE: TypeModifier = new TypeModifier(TypeUtils.typeAccepts(_, DateType), DoubleType) { diff --git a/tools/gluten-it/package/pom.xml b/tools/gluten-it/package/pom.xml index 6852edcfe8eb..3beef2df7c1d 100644 --- a/tools/gluten-it/package/pom.xml +++ b/tools/gluten-it/package/pom.xml @@ -118,5 +118,19 @@ + + rss-uniffle + + false + + + + org.apache.uniffle + rss-client-spark${spark.major.version}-shaded + ${uniffle.version} + runtime + + + diff --git a/tools/gluten-it/pom.xml b/tools/gluten-it/pom.xml index 128bf4b26295..c0e2fc321643 100644 --- a/tools/gluten-it/pom.xml +++ b/tools/gluten-it/pom.xml @@ -22,6 +22,7 @@ 2.12 3 0.3.0-incubating + 0.8.0 1.2.0-SNAPSHOT 32.0.1-jre 1.1