-
Notifications
You must be signed in to change notification settings - Fork 434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[VL] Add uniffle integration #3767
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,6 +21,7 @@ on: | |
- '.github/workflows/velox_docker.yml' | ||
- 'pom.xml' | ||
- 'backends-velox/**' | ||
- 'gluten-uniffle/**' | ||
- 'gluten-celeborn/common/**' | ||
- 'gluten-celeborn/package/**' | ||
- 'gluten-celeborn/velox/**' | ||
|
@@ -326,6 +327,66 @@ jobs: | |
--local --preset=velox --benchmark-type=ds --error-on-memleak -s=30.0 --off-heap-size=8g --threads=12 --shuffle-partitions=72 --iterations=1 \ | ||
--skip-data-gen --random-kill-tasks | ||
|
||
run-tpc-test-centos8-uniffle: | ||
needs: build-native-lib | ||
strategy: | ||
fail-fast: false | ||
matrix: | ||
spark: ["spark-3.2"] | ||
runs-on: ubuntu-20.04 | ||
container: centos:8 | ||
steps: | ||
- uses: actions/checkout@v2 | ||
- name: Download All Artifacts | ||
uses: actions/download-artifact@v2 | ||
with: | ||
name: velox-native-lib-${{github.sha}} | ||
path: ./cpp/build/releases | ||
- name: Update mirror list | ||
run: | | ||
sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true | ||
sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true | ||
- name: Setup java and maven | ||
run: | | ||
yum update -y && yum install -y java-1.8.0-openjdk-devel wget git | ||
wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz | ||
tar -xvf apache-maven-3.8.8-bin.tar.gz | ||
mv apache-maven-3.8.8 /usr/lib/maven | ||
- name: Build for Spark ${{ matrix.spark }} | ||
run: | | ||
cd $GITHUB_WORKSPACE/ && \ | ||
export MAVEN_HOME=/usr/lib/maven && \ | ||
export PATH=${PATH}:${MAVEN_HOME}/bin && \ | ||
mvn clean install -P${{ matrix.spark }} -Pbackends-velox -Prss-uniffle -DskipTests | ||
- name: TPC-H SF1.0 && TPC-DS SF1.0 Parquet local spark3.2 with uniffle 0.8.0 | ||
run: | | ||
export MAVEN_HOME=/usr/lib/maven && \ | ||
export PATH=${PATH}:${MAVEN_HOME}/bin && \ | ||
export export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk && \ | ||
cd /opt && \ | ||
git clone -b branch-0.8 https://github.com/apache/incubator-uniffle.git && \ | ||
cd incubator-uniffle && \ | ||
sed -i '250d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ | ||
sed -i '228d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ | ||
sed -i '226d' ./server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java && \ | ||
mvn clean install -Phadoop2.8 -DskipTests | ||
cd /opt && \ | ||
wget -nv https://archive.apache.org/dist/incubator/uniffle/0.8.0/apache-uniffle-0.8.0-incubating-bin.tar.gz && \ | ||
tar xzf apache-uniffle-0.8.0-incubating-bin.tar.gz -C /opt/ && mv /opt/rss-0.8.0-hadoop2.8 /opt/uniffle && \ | ||
wget -nv https://archive.apache.org/dist/hadoop/common/hadoop-2.8.5/hadoop-2.8.5.tar.gz && \ | ||
tar xzf hadoop-2.8.5.tar.gz -C /opt/ | ||
rm -f /opt/uniffle/jars/server/shuffle-server-0.8.0-SNAPSHOT.jar | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you want to remove |
||
cp /opt/incubator-uniffle/server/target/shuffle-server-0.8.1-SNAPSHOT.jar /opt/uniffle/jars/server/ | ||
rm -rf /opt/incubator-uniffle | ||
cd /opt/uniffle && mkdir shuffle_data && \ | ||
bash -c "echo -e 'XMX_SIZE=16g\nHADOOP_HOME=/opt/hadoop-2.8.5' > ./bin/rss-env.sh" && \ | ||
bash -c "echo -e 'rss.coordinator.shuffle.nodes.max 1\nrss.rpc.server.port 19999' > ./conf/coordinator.conf" && \ | ||
bash -c "echo -e 'rss.server.app.expired.withoutHeartbeat 7200000\nrss.server.heartbeat.delay 3000\nrss.rpc.server.port 19997\nrss.jetty.http.port 19996\nrss.server.netty.port 19995\nrss.storage.basePath /opt/uniffle/shuffle_data\nrss.storage.type MEMORY_LOCALFILE\nrss.coordinator.quorum localhost:19999\nrss.server.flush.thread.alive 10\nrss.server.single.buffer.flush.threshold 64m' > ./conf/server.conf" && \ | ||
bash ./bin/start-coordinator.sh && bash ./bin/start-shuffle-server.sh | ||
cd $GITHUB_WORKSPACE/tools/gluten-it && mvn clean install -Pspark-3.2,rss-uniffle && \ | ||
GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \ | ||
--local --preset=velox-with-uniffle --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 | ||
|
||
run-tpc-test-ubuntu-2204-celeborn: | ||
needs: build-native-lib | ||
strategy: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -915,6 +915,23 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe | |
std::move(partitionWriterOptions), | ||
memoryManager->getArrowMemoryPool(), | ||
std::move(celebornClient)); | ||
} else if (partitionWriterType == "uniffle") { | ||
jclass unifflePartitionPusherClass = | ||
createGlobalClassReferenceOrError(env, "Lorg/apache/spark/shuffle/writer/PartitionPusher;"); | ||
jmethodID unifflePushPartitionDataMethod = | ||
getMethodIdOrError(env, unifflePartitionPusherClass, "pushPartitionData", "(I[BI)I"); | ||
JavaVM* vm; | ||
if (env->GetJavaVM(&vm) != JNI_OK) { | ||
throw gluten::GlutenException("Unable to get JavaVM instance"); | ||
} | ||
// rename CelebornClient RssClient | ||
std::shared_ptr<CelebornClient> uniffleClient = | ||
std::make_shared<CelebornClient>(vm, partitionPusher, unifflePushPartitionDataMethod); | ||
summaryzb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
partitionWriter = std::make_unique<CelebornPartitionWriter>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Like the above comment, better to use a common name if they indeed can be shared by both celeborn & uniffle. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a common name relate to all the logic of rss in jni layer, but not limit to this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @summaryzb, ok to me. Please create an issue to track this. Thanks! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Follow this |
||
numPartitions, | ||
std::move(partitionWriterOptions), | ||
memoryManager->getArrowMemoryPool(), | ||
std::move(uniffleClient)); | ||
} else { | ||
throw gluten::GlutenException("Unrecognizable partition writer type: " + partitionWriterType); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>gluten-uniffle</artifactId> | ||
<groupId>org.apache.gluten</groupId> | ||
<version>1.2.0-SNAPSHOT</version> | ||
<relativePath>../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>gluten-uniffle-package</artifactId> | ||
<packaging>jar</packaging> | ||
<name>Gluten Uniffle Package</name> | ||
|
||
<profiles> | ||
<profile> | ||
<id>backends-velox</id> | ||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.gluten</groupId> | ||
<artifactId>gluten-uniffle-velox</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
</dependencies> | ||
</profile> | ||
</profiles> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>gluten-parent</artifactId> | ||
<groupId>org.apache.gluten</groupId> | ||
<version>1.2.0-SNAPSHOT</version> | ||
<relativePath>../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>gluten-uniffle</artifactId> | ||
<packaging>pom</packaging> | ||
<name>Gluten Uniffle</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.hadoop</groupId> | ||
<artifactId>hadoop-common</artifactId> | ||
<version>${hadoop.version}</version> | ||
<scope>provided</scope> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>net.minidev</groupId> | ||
<artifactId>json-smart</artifactId> | ||
</exclusion> | ||
<exclusion> | ||
<groupId>com.sun.jersey</groupId> | ||
<artifactId>jersey-json</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.gluten</groupId> | ||
<artifactId>gluten-core</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.uniffle</groupId> | ||
<artifactId>rss-client-spark${spark.major.version}-shaded</artifactId> | ||
<version>${uniffle.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-core_${scala.binary.version}</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.spark</groupId> | ||
<artifactId>spark-sql_${scala.binary.version}</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>net.alchim31.maven</groupId> | ||
<artifactId>scala-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.scalastyle</groupId> | ||
<artifactId>scalastyle-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-checkstyle-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>com.diffplug.spotless</groupId> | ||
<artifactId>spotless-maven-plugin</artifactId> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
<profiles> | ||
<profile> | ||
<id>backends-velox</id> | ||
<properties> | ||
</properties> | ||
<modules> | ||
<module>velox</module> | ||
<module>package</module> | ||
</modules> | ||
</profile> | ||
</profiles> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns="http://maven.apache.org/POM/4.0.0" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>gluten-uniffle</artifactId> | ||
<groupId>org.apache.gluten</groupId> | ||
<version>1.2.0-SNAPSHOT</version> | ||
<relativePath>../pom.xml</relativePath> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>gluten-uniffle-velox</artifactId> | ||
<packaging>jar</packaging> | ||
<name>Gluten Uniffle Velox</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.apache.gluten</groupId> | ||
<artifactId>backends-velox</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.gluten</groupId> | ||
<artifactId>gluten-data</artifactId> | ||
<version>${project.version}</version> | ||
<scope>provided</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> | ||
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> | ||
<plugins> | ||
<plugin> | ||
<groupId>net.alchim31.maven</groupId> | ||
<artifactId>scala-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.scalastyle</groupId> | ||
<artifactId>scalastyle-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-checkstyle-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>com.diffplug.spotless</groupId> | ||
<artifactId>spotless-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-jar-plugin</artifactId> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.shuffle.gluten.uniffle; | ||
|
||
import org.apache.spark.ShuffleDependency; | ||
import org.apache.spark.SparkConf; | ||
import org.apache.spark.SparkEnv; | ||
import org.apache.spark.TaskContext; | ||
import org.apache.spark.executor.ShuffleWriteMetrics; | ||
import org.apache.spark.shuffle.ColumnarShuffleDependency; | ||
import org.apache.spark.shuffle.RssShuffleHandle; | ||
import org.apache.spark.shuffle.RssShuffleManager; | ||
import org.apache.spark.shuffle.RssSparkConfig; | ||
import org.apache.spark.shuffle.ShuffleHandle; | ||
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter; | ||
import org.apache.spark.shuffle.ShuffleWriter; | ||
import org.apache.spark.shuffle.writer.VeloxUniffleColumnarShuffleWriter; | ||
import org.apache.uniffle.common.exception.RssException; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class UniffleShuffleManager extends RssShuffleManager { | ||
private static final Logger LOG = LoggerFactory.getLogger(UniffleShuffleManager.class); | ||
|
||
private boolean isDriver() { | ||
return "driver".equals(SparkEnv.get().executorId()); | ||
} | ||
|
||
public UniffleShuffleManager(SparkConf conf, boolean isDriver) { | ||
super(conf, isDriver); | ||
conf.set(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX + RssSparkConfig.RSS_ROW_BASED, "false"); | ||
} | ||
|
||
@Override | ||
public <K, V, C> ShuffleHandle registerShuffle( | ||
int shuffleId, ShuffleDependency<K, V, C> dependency) { | ||
return super.registerShuffle(shuffleId, dependency); | ||
} | ||
|
||
@Override | ||
public <K, V> ShuffleWriter<K, V> getWriter( | ||
ShuffleHandle handle, long mapId, TaskContext context, ShuffleWriteMetricsReporter metrics) { | ||
if (!(handle instanceof RssShuffleHandle)) { | ||
throw new RssException("Unexpected ShuffleHandle:" + handle.getClass().getName()); | ||
} | ||
RssShuffleHandle<K, V, V> rssHandle = (RssShuffleHandle<K, V, V>) handle; | ||
if (rssHandle.getDependency() instanceof ColumnarShuffleDependency) { | ||
setPusherAppId(rssHandle); | ||
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber(); | ||
ShuffleWriteMetrics writeMetrics; | ||
if (metrics != null) { | ||
writeMetrics = new WriteMetrics(metrics); | ||
} else { | ||
writeMetrics = context.taskMetrics().shuffleWriteMetrics(); | ||
} | ||
return new VeloxUniffleColumnarShuffleWriter<>( | ||
context.partitionId(), | ||
rssHandle.getAppId(), | ||
rssHandle.getShuffleId(), | ||
taskId, | ||
context.taskAttemptId(), | ||
writeMetrics, | ||
this, | ||
sparkConf, | ||
shuffleWriteClient, | ||
rssHandle, | ||
this::markFailedTask, | ||
context); | ||
} else { | ||
return super.getWriter(handle, mapId, context, metrics); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
export export
->export