From d12deaa6419c976409f23df0c69e7e10e823263a Mon Sep 17 00:00:00 2001 From: Yuan Date: Tue, 14 Jan 2025 07:35:06 +0800 Subject: [PATCH 1/7] [GLUTEN-8487][VL] adding JDK11 based Centos8 image (#8513) This patch added JDK11 based Centos 8 image --- .github/workflows/docker_image.yml | 12 ++++++++++-- dev/docker/Dockerfile.centos8-dynamic-build-jdk11 | 12 ++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) create mode 100644 dev/docker/Dockerfile.centos8-dynamic-build-jdk11 diff --git a/.github/workflows/docker_image.yml b/.github/workflows/docker_image.yml index f6f0cd51b1f5c..58b7549adb70b 100644 --- a/.github/workflows/docker_image.yml +++ b/.github/workflows/docker_image.yml @@ -69,10 +69,18 @@ jobs: username: ${{ secrets.DOCKERHUB_USER }} password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: Build and push Docker image + - name: Build and push Docker image Centos8 uses: docker/build-push-action@v2 with: context: . file: dev/docker/Dockerfile.centos8-dynamic-build push: true - tags: apache/gluten:centos-8 + tags: apache/gluten:centos-8 # JDK8 based + + - name: Build and push Docker image Centos8 + JDK11 + uses: docker/build-push-action@v2 + with: + context: . + file: dev/docker/Dockerfile.centos8-dynamic-build-jdk11 + push: true + tags: apache/gluten:centos-8-jdk11 diff --git a/dev/docker/Dockerfile.centos8-dynamic-build-jdk11 b/dev/docker/Dockerfile.centos8-dynamic-build-jdk11 new file mode 100644 index 0000000000000..0c2d8cc0c6869 --- /dev/null +++ b/dev/docker/Dockerfile.centos8-dynamic-build-jdk11 @@ -0,0 +1,12 @@ +FROM apache/gluten:centos-8 + + +RUN yum install -y java-11-openjdk-devel patch wget git perl +ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk +ENV PATH=$JAVA_HOME/bin:$PATH + +ENV PATH=${PATH}:/usr/lib/maven/bin + +RUN git clone --depth=1 https://github.com/apache/incubator-gluten /opt/gluten + +RUN cd /opt/gluten && source /opt/rh/gcc-toolset-11/enable && ./dev/builddeps-veloxbe.sh --run_setup_script=ON build_arrow && rm -rf /opt/gluten From 575536840af66229383a2f4f01f1cc1974ad8229 Mon Sep 17 00:00:00 2001 From: Gluten Performance Bot <137994563+GlutenPerfBot@users.noreply.github.com> Date: Tue, 14 Jan 2025 10:22:36 +0800 Subject: [PATCH 2/7] [GLUTEN-6887][VL] Daily Update Velox Version (2025_01_14) (#8522) Upstream Velox's New Commits: 5d547b7c9 by Krishna Pai, Reduce complexElementsMaxSize for Window Fuzzers (12074) d1ec9c014 by Ke Wang, fix: Fix config name for stats-based-filter-reorder-disabaled (12075) --- ep/build-velox/src/get_velox.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 06ce5b96068cb..b3fa3ed5d3aab 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2025_01_12 +VELOX_BRANCH=2025_01_14 VELOX_HOME="" OS=`uname -s` From ea58aab7348c97a28b545e2df4c59649fda40325 Mon Sep 17 00:00:00 2001 From: JiaKe Date: Tue, 14 Jan 2025 17:29:58 +0800 Subject: [PATCH 3/7] [GLUTEN-8020][VL] Remove the libhdfs3 installation script required for static linking (#8013) - We will only retain the dynamic libraries libhdfs.so or libhdfs3.so at runtime based on this benchmark here. So there is no need to keep the libhdfs3 installation script required for static linking. - Some customers still use libhdfs3. 
We provide a script to compile libhdfs3.so in the dev folder --- dev/build-thirdparty.sh | 14 +++---- dev/build_helper_functions.sh | 6 --- dev/build_libhdfs3.sh | 37 +++++++++++++++++++ dev/builddeps-veloxbe.sh | 9 ----- dev/vcpkg/init.sh | 6 --- dev/vcpkg/ports/libhdfs3/libhdfs3Config.cmake | 26 ------------- dev/vcpkg/ports/libhdfs3/portfile.cmake | 27 -------------- dev/vcpkg/ports/libhdfs3/usage | 4 -- dev/vcpkg/ports/libhdfs3/vcpkg.json | 34 ----------------- dev/vcpkg/vcpkg.json | 6 --- docs/get-started/Velox.md | 4 ++ ep/build-velox/src/setup-centos7.sh | 11 ------ 12 files changed, 48 insertions(+), 136 deletions(-) create mode 100755 dev/build_libhdfs3.sh delete mode 100644 dev/vcpkg/ports/libhdfs3/libhdfs3Config.cmake delete mode 100644 dev/vcpkg/ports/libhdfs3/portfile.cmake delete mode 100644 dev/vcpkg/ports/libhdfs3/usage delete mode 100644 dev/vcpkg/ports/libhdfs3/vcpkg.json diff --git a/dev/build-thirdparty.sh b/dev/build-thirdparty.sh index ee827ef197f7d..109392f4f4a77 100755 --- a/dev/build-thirdparty.sh +++ b/dev/build-thirdparty.sh @@ -12,40 +12,40 @@ ARCH=`uname -m` mkdir -p $THIRDPARTY_LIB function process_setup_ubuntu_2004 { cp /usr/lib/${ARCH}-linux-gnu/{libroken.so.18,libasn1.so.8,libcrypto.so.1.1,libnghttp2.so.14,libnettle.so.7,libhogweed.so.5,librtmp.so.1,libssh.so.4,libssl.so.1.1,liblber-2.4.so.2,libsasl2.so.2,libwind.so.0,libheimbase.so.1,libhcrypto.so.4,libhx509.so.5,libkrb5.so.26,libheimntlm.so.0,libgssapi.so.3,libldap_r-2.4.so.2,libcurl.so.4,libdouble-conversion.so.3,libevent-2.1.so.7,libgflags.so.2.2,libunwind.so.8,libglog.so.0,libidn.so.11,libntlm.so.0,libgsasl.so.7,libicudata.so.66,libicuuc.so.66,libxml2.so.2,libre2.so.5,libsnappy.so.1,libpsl.so.5,libbrotlidec.so.1,libbrotlicommon.so.1,libthrift-0.13.0.so} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libprotobuf.so.32,libhdfs3.so.1,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ } function process_setup_ubuntu_2204 { cp /usr/lib/${ARCH}-linux-gnu/{libre2.so.9,libdouble-conversion.so.3,libidn.so.12,libglog.so.0,libgflags.so.2.2,libevent-2.1.so.7,libsnappy.so.1,libunwind.so.8,libcurl.so.4,libxml2.so.2,libgsasl.so.7,libicui18n.so.70,libicuuc.so.70,libnghttp2.so.14,libldap-2.5.so.0,liblber-2.5.so.0,libntlm.so.0,librtmp.so.1,libsasl2.so.2,libssh.so.4,libicudata.so.70,libthrift-0.16.0.so} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libhdfs3.so.1,libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ } function process_setup_centos_9 { cp /lib64/{libre2.so.9,libdouble-conversion.so.3,libevent-2.1.so.7,libdwarf.so.0,libgsasl.so.7,libicudata.so.67,libicui18n.so.67,libicuuc.so.67,libidn.so.12,libntlm.so.0,libsodium.so.23} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libhdfs3.so.1,libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_regex.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_regex.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ cp /usr/local/lib64/{libgflags.so.2.2,libglog.so.1} $THIRDPARTY_LIB/ } function process_setup_centos_8 { cp 
/usr/lib64/{libre2.so.0,libdouble-conversion.so.3,libevent-2.1.so.6,libdwarf.so.1,libgsasl.so.7,libicudata.so.60,libicui18n.so.60,libicuuc.so.60,libidn.so.11,libntlm.so.0,libsodium.so.23} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libhdfs3.so.1,libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_regex.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_regex.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ cp /usr/local/lib64/{libgflags.so.2.2,libglog.so.1} $THIRDPARTY_LIB/ } function process_setup_centos_7 { cp /usr/local/lib64/{libgflags.so.2.2,libglog.so.0} $THIRDPARTY_LIB/ cp /usr/lib64/{libdouble-conversion.so.1,libevent-2.0.so.5,libzstd.so.1,libntlm.so.0,libgsasl.so.7,liblz4.so.1} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libre2.so.10,libhdfs3.so.1,libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_regex.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libre2.so.10,libboost_context.so.1.84.0,libboost_filesystem.so.1.84.0,libboost_program_options.so.1.84.0,libboost_system.so.1.84.0,libboost_thread.so.1.84.0,libboost_regex.so.1.84.0,libboost_atomic.so.1.84.0,libprotobuf.so.32} $THIRDPARTY_LIB/ } function process_setup_debian_11 { cp /usr/lib/x86_64-linux-gnu/{libre2.so.9,libthrift-0.13.0.so,libdouble-conversion.so.3,libevent-2.1.so.7,libgflags.so.2.2,libglog.so.0,libsnappy.so.1,libunwind.so.8,libcurl.so.4,libicui18n.so.67,libicuuc.so.67,libnghttp2.so.14,librtmp.so.1,libssh2.so.1,libpsl.so.5,libldap_r-2.4.so.2,liblber-2.4.so.2,libbrotlidec.so.1,libicudata.so.67,libsasl2.so.2,libbrotlicommon.so.1} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libhdfs3.so.1,libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ } function process_setup_debian_12 { cp /usr/lib/x86_64-linux-gnu/{libthrift-0.17.0.so,libdouble-conversion.so.3,libevent-2.1.so.7,libgflags.so.2.2,libglog.so.1,libsnappy.so.1,libunwind.so.8,libcurl.so.4,libicui18n.so.72,libicuuc.so.72,libnghttp2.so.14,librtmp.so.1,libssh2.so.1,libpsl.so.5,libldap-2.5.so.0,liblber-2.5.so.0,libbrotlidec.so.1,libicudata.so.72,libsasl2.so.2,libbrotlicommon.so.1,libcrypto.so.3,libssl.so.3,libgssapi_krb5.so.2,libkrb5.so.3,libk5crypto.so.3,libkrb5support.so.0,libkeyutils.so.1} $THIRDPARTY_LIB/ - cp /usr/local/lib/{libprotobuf.so.32,libhdfs3.so.1,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ + cp /usr/local/lib/{libprotobuf.so.32,libboost_context.so.1.84.0,libboost_regex.so.1.84.0} $THIRDPARTY_LIB/ } if [[ "$LINUX_OS" == "ubuntu" || "$LINUX_OS" == "pop" ]]; then diff --git a/dev/build_helper_functions.sh b/dev/build_helper_functions.sh index 97e3a09935206..836ed6ca4e551 100644 --- a/dev/build_helper_functions.sh +++ b/dev/build_helper_functions.sh @@ -208,9 +208,3 @@ function setup_linux { exit 1 fi } - -function install_libhdfs3 { - github_checkout oap-project/libhdfs3 master - cmake_install -} - diff --git a/dev/build_libhdfs3.sh b/dev/build_libhdfs3.sh new file mode 100755 index 0000000000000..b001a121c69b7 --- /dev/null +++ b/dev/build_libhdfs3.sh @@ 
-0,0 +1,37 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -exu + +CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) +export SUDO=sudo +source ${CURRENT_DIR}/build_helper_functions.sh +DEPENDENCY_DIR=${DEPENDENCY_DIR:-$CURRENT_DIR/../ep/_ep} + +function build_libhdfs3 { + cd "${DEPENDENCY_DIR}" + github_checkout apache/hawq master + cd depends/libhdfs3 + sed -i "/FIND_PACKAGE(GoogleTest REQUIRED)/d" ./CMakeLists.txt + sed -i "s/dumpversion/dumpfullversion/" ./CMake/Platform.cmake + sed -i "s/dfs.domain.socket.path\", \"\"/dfs.domain.socket.path\", \"\/var\/lib\/hadoop-hdfs\/dn_socket\"/g" src/common/SessionConfig.cpp + sed -i "s/pos < endOfCurBlock/pos \< endOfCurBlock \&\& pos \- cursor \<\= 128 \* 1024/g" src/client/InputStreamImpl.cpp + cmake_install +} + +echo "Start to build Libhdfs3" +build_libhdfs3 +echo "Finished building Libhdfs3. You can find the libhdfs3.so in /usr/local/lib/libhdfs3.so.1" diff --git a/dev/builddeps-veloxbe.sh b/dev/builddeps-veloxbe.sh index 8eb4cf4edf2ce..2180db9f8dca4 100755 --- a/dev/builddeps-veloxbe.sh +++ b/dev/builddeps-veloxbe.sh @@ -247,15 +247,6 @@ if [ -z "${GLUTEN_VCPKG_ENABLED:-}" ] && [ $RUN_SETUP_SCRIPT == "ON" ]; then fi ${VELOX_HOME}/scripts/setup-adapters.sh aws fi - if [ $ENABLE_HDFS == "ON" ]; then - if [ $OS == 'Darwin' ]; then - echo "HDFS is not supported on MacOS." - exit 1 - fi - pushd $VELOX_HOME - install_libhdfs3 - popd - fi if [ $ENABLE_GCS == "ON" ]; then ${VELOX_HOME}/scripts/setup-adapters.sh gcs fi diff --git a/dev/vcpkg/init.sh b/dev/vcpkg/init.sh index bae1a8ad32eb0..4e7b16af821c4 100755 --- a/dev/vcpkg/init.sh +++ b/dev/vcpkg/init.sh @@ -70,9 +70,6 @@ fi if [ "$ENABLE_GCS" = "ON" ]; then EXTRA_FEATURES+="--x-feature=velox-gcs " fi -if [ "$ENABLE_HDFS" = "ON" ]; then - EXTRA_FEATURES+="--x-feature=velox-hdfs " -fi if [ "$ENABLE_ABFS" = "ON" ]; then EXTRA_FEATURES+="--x-feature=velox-abfs" fi @@ -90,6 +87,3 @@ cp $VCPKG_TRIPLET_INSTALL_DIR/lib/libssl.a $VCPKG_TRIPLET_INSTALL_DIR/debug/lib cp $VCPKG_TRIPLET_INSTALL_DIR/lib/libcrypto.a $VCPKG_TRIPLET_INSTALL_DIR/debug/lib cp $VCPKG_TRIPLET_INSTALL_DIR/lib/liblzma.a $VCPKG_TRIPLET_INSTALL_DIR/debug/lib cp $VCPKG_TRIPLET_INSTALL_DIR/lib/libdwarf.a $VCPKG_TRIPLET_INSTALL_DIR/debug/lib -# Allow libhdfs3.a is not installed as build option may not enable hdfs. 
-cp $VCPKG_TRIPLET_INSTALL_DIR/lib/libhdfs3.a $VCPKG_TRIPLET_INSTALL_DIR/debug/lib || true - diff --git a/dev/vcpkg/ports/libhdfs3/libhdfs3Config.cmake b/dev/vcpkg/ports/libhdfs3/libhdfs3Config.cmake deleted file mode 100644 index 93ef72da56b23..0000000000000 --- a/dev/vcpkg/ports/libhdfs3/libhdfs3Config.cmake +++ /dev/null @@ -1,26 +0,0 @@ -include(CMakeFindDependencyMacro) -include(FindPkgConfig) - -find_dependency(Boost COMPONENTS thread chrono system atomic iostreams) -find_dependency(LibXml2) -find_dependency(Protobuf) -pkg_check_modules(Gsasl REQUIRED libgsasl mit-krb5-gssapi) -pkg_check_modules(UUID REQUIRED uuid) - -FUNCTION(SET_LIBRARY_TARGET NAMESPACE LIB_NAME DEBUG_LIB_FILE_NAME RELEASE_LIB_FILE_NAME INCLUDE_DIR) - ADD_LIBRARY(${NAMESPACE}::${LIB_NAME} STATIC IMPORTED) - SET_TARGET_PROPERTIES(${NAMESPACE}::${LIB_NAME} PROPERTIES - IMPORTED_CONFIGURATIONS "RELEASE;DEBUG" - IMPORTED_LOCATION_RELEASE "${RELEASE_LIB_FILE_NAME}" - IMPORTED_LOCATION_DEBUG "${DEBUG_LIB_FILE_NAME}" - INTERFACE_INCLUDE_DIRECTORIES "${INCLUDE_DIR}" - INTERFACE_LINK_LIBRARIES "protobuf::libprotobuf;LibXml2::LibXml2;${Gsasl_LINK_LIBRARIES};${UUID_LINK_LIBRARIES}" - ) - SET(${NAMESPACE}_${LIB_NAME}_FOUND 1) -ENDFUNCTION() - -GET_FILENAME_COMPONENT(ROOT "${CMAKE_CURRENT_LIST_FILE}" PATH) -GET_FILENAME_COMPONENT(ROOT "${ROOT}" PATH) -GET_FILENAME_COMPONENT(ROOT "${ROOT}" PATH) - -SET_LIBRARY_TARGET("HDFS" "hdfs3" "${ROOT}/debug/lib/libhdfs3.a" "${ROOT}/lib/libhdfs3.a" "${ROOT}/include/hdfs") \ No newline at end of file diff --git a/dev/vcpkg/ports/libhdfs3/portfile.cmake b/dev/vcpkg/ports/libhdfs3/portfile.cmake deleted file mode 100644 index 4c59c57ef9a8c..0000000000000 --- a/dev/vcpkg/ports/libhdfs3/portfile.cmake +++ /dev/null @@ -1,27 +0,0 @@ -vcpkg_from_github( - OUT_SOURCE_PATH SOURCE_PATH - REPO oap-project/libhdfs3 - HEAD_REF master - REF 9f234edb354ebcc99179cc6f72aefd66865f4154 - SHA512 a1a587fdca60a39f77d36b281ad15fefd7cb4b353c982274ef3d7702e84c834525cd5a3ec2bbc4154fce58f1c7054a17789f08485eaacfbb672544398a277951 -) - -vcpkg_configure_cmake( - SOURCE_PATH ${SOURCE_PATH} - PREFER_NINJA - OPTIONS - -DCMAKE_PROGRAM_PATH=${CURRENT_HOST_INSTALLED_DIR}/tools/yasm - -DWITH_KERBEROS=on -) - -vcpkg_install_cmake() - -vcpkg_copy_pdbs() - -file(GLOB HDFS3_SHARED_LIBS ${CURRENT_PACKAGES_DIR}/debug/lib/libhdfs3.so* ${CURRENT_PACKAGES_DIR}/lib/libhdfs3.so*) -file(REMOVE ${HDFS3_SHARED_LIBS}) - -file(REMOVE_RECURSE ${CURRENT_PACKAGES_DIR}/debug/include ${CURRENT_PACKAGES_DIR}/debug/share) -file(INSTALL ${SOURCE_PATH}/LICENSE.txt DESTINATION ${CURRENT_PACKAGES_DIR}/share/${PORT} RENAME copyright) -FILE(INSTALL ${CMAKE_CURRENT_LIST_DIR}/libhdfs3Config.cmake DESTINATION ${CURRENT_PACKAGES_DIR}/share/${PORT}) -FILE(INSTALL ${CMAKE_CURRENT_LIST_DIR}/usage DESTINATION ${CURRENT_PACKAGES_DIR}/share/${PORT}) diff --git a/dev/vcpkg/ports/libhdfs3/usage b/dev/vcpkg/ports/libhdfs3/usage deleted file mode 100644 index 780d82d25683f..0000000000000 --- a/dev/vcpkg/ports/libhdfs3/usage +++ /dev/null @@ -1,4 +0,0 @@ -The package libhdfs3 is compatible with built-in CMake targets: - - FIND_PACKAGE(libhdfs3 REQUIRED) - TARGET_LINK_LIBRARIES(main PRIVATE HDFS::hdfs3) diff --git a/dev/vcpkg/ports/libhdfs3/vcpkg.json b/dev/vcpkg/ports/libhdfs3/vcpkg.json deleted file mode 100644 index 495615cf90773..0000000000000 --- a/dev/vcpkg/ports/libhdfs3/vcpkg.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "name": "libhdfs3", - "version-date": "2019-11-05", - "port-version": 3, - "description": "Native Hadoop RPC protocol and HDFS data transfer protocol 
implementation", - "homepage": "https://github.com/erikmuttersbach/libhdfs3", - "supports": "!windows", - "dependencies": [ - "boost-thread", - "boost-chrono", - "boost-system", - "boost-atomic", - "boost-iostreams", - "boost-function", - "boost-bind", - "boost-crc", - "boost-functional", - { - "name": "libuuid", - "platform": "!windows & !osx" - }, - "libxml2", - "protobuf", - "krb5", - "gsasl", - { - "name": "yasm", - "host": true, - "features": [ - "tools" - ] - } - ] -} diff --git a/dev/vcpkg/vcpkg.json b/dev/vcpkg/vcpkg.json index c0123cfbe9980..a6a70ec913bff 100644 --- a/dev/vcpkg/vcpkg.json +++ b/dev/vcpkg/vcpkg.json @@ -80,12 +80,6 @@ } ] }, - "velox-hdfs": { - "description": "Velox HDFS Support", - "dependencies": [ - "libhdfs3" - ] - }, "velox-abfs": { "description": "Velox ABFS Support", "dependencies": [ diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md index 81bb88c75aece..8c8cb7bbd8186 100644 --- a/docs/get-started/Velox.md +++ b/docs/get-started/Velox.md @@ -127,6 +127,10 @@ libraries list from the third-party jar. Gluten supports dynamically loading both libhdfs.so and libhdfs3.so at runtime by using dlopen, allowing the JVM to load the appropriate shared library file as needed. This means you do not need to set the library path during the compilation phase. To enable this functionality, you must set the JAVA_HOME and HADOOP_HOME environment variables. Gluten will then locate and load the ${HADOOP_HOME}/lib/native/libhdfs.so file at runtime. If you prefer to use libhdfs3.so instead, simply replace the ${HADOOP_HOME}/lib/native/libhdfs.so file with libhdfs3.so. +### Build libhdfs3 + +If you want to run Gluten with libhdfs3.so, you need to manually compile libhdfs3 to obtain the libhdfs3.so file. We provide the script dev/build_libhdfs3.sh in Gluten to help you compile libhdfs3.so. 
+ ### Build with HDFS support To build Gluten with HDFS support, below command is suggested: diff --git a/ep/build-velox/src/setup-centos7.sh b/ep/build-velox/src/setup-centos7.sh index 45880161a4a58..dbac575fbb473 100755 --- a/ep/build-velox/src/setup-centos7.sh +++ b/ep/build-velox/src/setup-centos7.sh @@ -166,17 +166,6 @@ function install_boost { $SUDO ./b2 "-j$(nproc)" -d0 install threading=multi } -function install_libhdfs3 { - cd "${DEPENDENCY_DIR}" - github_checkout apache/hawq master - cd depends/libhdfs3 - sed -i "/FIND_PACKAGE(GoogleTest REQUIRED)/d" ./CMakeLists.txt - sed -i "s/dumpversion/dumpfullversion/" ./CMake/Platform.cmake - sed -i "s/dfs.domain.socket.path\", \"\"/dfs.domain.socket.path\", \"\/var\/lib\/hadoop-hdfs\/dn_socket\"/g" src/common/SessionConfig.cpp - sed -i "s/pos < endOfCurBlock/pos \< endOfCurBlock \&\& pos \- cursor \<\= 128 \* 1024/g" src/client/InputStreamImpl.cpp - cmake_install -} - function install_protobuf { cd "${DEPENDENCY_DIR}" wget https://github.com/protocolbuffers/protobuf/releases/download/v21.4/protobuf-all-21.4.tar.gz From ed6cdb34e45b957b63c38531fd709394fc308c33 Mon Sep 17 00:00:00 2001 From: jkhaliqi <60749291+jkhaliqi@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:46:11 -0800 Subject: [PATCH 4/7] [GLUTEN-8532][VL] Fix parenthesis within macro (#8533) Use parentheses within macros around parameter names --- cpp/core/utils/qat/QatCodec.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/core/utils/qat/QatCodec.cc b/cpp/core/utils/qat/QatCodec.cc index 1a5fc8ea6db1b..e01fa96988bde 100644 --- a/cpp/core/utils/qat/QatCodec.cc +++ b/cpp/core/utils/qat/QatCodec.cc @@ -26,9 +26,9 @@ #include "QatCodec.h" -#define QZ_INIT_FAIL(rc) (QZ_OK != rc && QZ_DUPLICATE != rc) +#define QZ_INIT_FAIL(rc) ((QZ_OK != (rc)) && (QZ_DUPLICATE != (rc))) -#define QZ_SETUP_SESSION_FAIL(rc) (QZ_PARAMS == rc || QZ_NOSW_NO_HW == rc || QZ_NOSW_LOW_MEM == rc) +#define QZ_SETUP_SESSION_FAIL(rc) (QZ_PARAMS == (rc) || QZ_NOSW_NO_HW == (rc) || QZ_NOSW_LOW_MEM == (rc)) namespace gluten { namespace qat { From 9171124483eb4935cace5979d1373d4c5f78ad1b Mon Sep 17 00:00:00 2001 From: Gluten Performance Bot <137994563+GlutenPerfBot@users.noreply.github.com> Date: Wed, 15 Jan 2025 09:05:38 +0800 Subject: [PATCH 5/7] [GLUTEN-6887][VL] Daily Update Velox Version (2025_01_15) (#8536) Upstream Velox's New Commits: 88170133e by rui-mo, feat(fuzzer): Add custom special form signatures for Presto and Spark (12032) 61e737c59 by Pedro Eugenio Rocha Pedreira, fix(plan-builder): Avoid advancing plan id on error (12081) 93523f823 by Deepak Majeti, feat(s3): Support S3 Region (12063) 50f9a5455 by Kevin Wilfong, refactor: Break PrestoSerializer.cpp into components (11922) 2b74a93bc by Pramod Satya, feat: Increase peer rows in partitions generated by window fuzzer (10293) --- ep/build-velox/src/get_velox.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index b3fa3ed5d3aab..f68c9b9980b73 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -17,7 +17,7 @@ set -exu VELOX_REPO=https://github.com/oap-project/velox.git -VELOX_BRANCH=2025_01_14 +VELOX_BRANCH=2025_01_15 VELOX_HOME="" OS=`uname -s` From 0b5a46a23a5a404eb466f40b6bbacb057c7b67a7 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Wed, 15 Jan 2025 10:12:49 +0800 Subject: [PATCH 6/7] [CORE] Use RAS's cost model for legacy transition planner to evaluate cost of transitions (#8527) --- 
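The gist: transition planning now reuses the session RAS cost model instead of a private
transition counter. A condensed sketch of the new contract, simplified from the
GlutenCostModel trait and TransitionGraph.TransitionCostModel changes below (bookkeeping
and assertions omitted):

    trait GlutenCostModel extends CostModel[SparkPlan] {
      def sum(one: Cost, other: Cost): Cost   // cost value of one + other
      def diff(one: Cost, other: Cost): Cost  // cost value of one - other
      def makeZeroCost(): Cost
    }

    // A transition's cost is the RAS-model cost delta it introduces: apply the
    // transition to a dummy leaf plan, then subtract the leaf's own cost.
    val leaf = DummySparkPlan()
    val transited = transition.apply(leaf)
    val transitionCost = rasCostModel.diff(rasCostModel.costOf(transited), rasCostModel.costOf(leaf))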
.../clickhouse/CHListenerApi.scala | 4 +- .../backendsapi/clickhouse/CHRuleApi.scala | 4 + .../backendsapi/velox/VeloxListenerApi.scala | 6 +- .../backendsapi/velox/VeloxRuleApi.scala | 11 +- .../execution/VeloxRoughCostModel2Suite.scala | 65 ----------- .../enumerated/planner/VeloxRasSuite.scala | 25 +++-- .../transition/VeloxTransitionSuite.scala | 4 +- .../enumerated/EnumeratedTransform.scala | 4 +- .../planner/cost/GlutenCostModel.scala | 30 +++++ .../planner/cost/LongCostModel.scala | 42 ++++--- .../planner/cost/LongCosterChain.scala | 2 +- .../columnar/transition/Convention.scala | 32 +++--- .../columnar/transition/ConventionFunc.scala | 22 ---- .../transition/FloydWarshallGraph.scala | 43 +++++--- .../columnar/transition/TransitionGraph.scala | 104 +++++++++++++----- .../extension/injector/GlutenInjector.scala | 7 +- .../transition/FloydWarshallGraphSuite.scala | 13 ++- .../planner/cost/LegacyCoster.scala | 4 +- .../enumerated/planner/cost/RoughCoster.scala | 2 - .../planner/cost/RoughCoster2.scala | 83 -------------- .../columnar/transition/TransitionSuite.scala | 47 +++++--- .../apache/gluten/config/GlutenConfig.scala | 32 +----- 22 files changed, 255 insertions(+), 331 deletions(-) delete mode 100644 backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala create mode 100644 gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/GlutenCostModel.scala delete mode 100644 gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala index fbaf9e37c15f4..48ef66ca74a8f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala @@ -23,6 +23,7 @@ import org.apache.gluten.execution.CHBroadcastBuildSideCache import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.UDFMappings import org.apache.gluten.extension.ExpressionExtensionTrait +import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.jni.JniLibLoader import org.apache.gluten.vectorized.CHNativeExpressionEvaluator @@ -70,7 +71,8 @@ class CHListenerApi extends ListenerApi with Logging { override def onExecutorShutdown(): Unit = shutdown() private def initialize(conf: SparkConf, isDriver: Boolean): Unit = { - // Force batch type initializations. + // Do row / batch type initializations. 
+ Convention.ensureSparkRowAndBatchTypesRegistered() CHBatch.ensureRegistered() SparkDirectoryUtil.init(conf) val libPath = conf.get(GlutenConfig.GLUTEN_LIB_PATH, StringUtils.EMPTY) diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 21ae342a22637..426c88c9073f2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -22,6 +22,7 @@ import org.apache.gluten.config.GlutenConfig import org.apache.gluten.extension._ import org.apache.gluten.extension.columnar._ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast} +import org.apache.gluten.extension.columnar.enumerated.planner.cost.LegacyCoster import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicTransform} import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers} import org.apache.gluten.extension.columnar.rewrite._ @@ -142,6 +143,9 @@ object CHRuleApi { } private def injectRas(injector: RasInjector): Unit = { + // Register legacy coster for transition planner. + injector.injectCoster(_ => LegacyCoster) + // CH backend doesn't work with RAS at the moment. Inject a rule that aborts any // execution calls. injector.injectPreTransform( diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 5d75521b8473e..0453558d1af7a 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -22,6 +22,7 @@ import org.apache.gluten.columnarbatch.VeloxBatch import org.apache.gluten.config.GlutenConfig import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.expression.UDFMappings +import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.init.NativeBackendInitializer import org.apache.gluten.jni.{JniLibLoader, JniWorkspace} import org.apache.gluten.udf.UdfJniWrapper @@ -126,10 +127,11 @@ class VeloxListenerApi extends ListenerApi with Logging { override def onExecutorShutdown(): Unit = shutdown() private def initialize(conf: SparkConf, isDriver: Boolean): Unit = { - // Force batch type initializations. - VeloxBatch.ensureRegistered() + // Do row / batch type initializations. + Convention.ensureSparkRowAndBatchTypesRegistered() ArrowJavaBatch.ensureRegistered() ArrowNativeBatch.ensureRegistered() + VeloxBatch.ensureRegistered() // Register columnar shuffle so can be considered when // `org.apache.spark.shuffle.GlutenShuffleManager` is set as Spark shuffle manager. 
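For reference, the reordered initializations above rely on ensureRegistered() staying
idempotent; this patch moves that guard from Convention.BatchType into
TransitionGraph.Vertex. Condensed from the Vertex changes later in this patch
(synchronization omitted):

    trait Vertex {
      private val initialized = new AtomicBoolean(false)
      // Idempotent: only the first call adds the vertex to the transition graph.
      final def ensureRegistered(): Unit = {
        if (!initialized.compareAndSet(false, true)) return
        Transition.graph.addVertex(this)
        register0()
      }
      protected[this] def register0(): Unit
    }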
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index f3c75cd983187..6c60ab7d537f8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -23,7 +23,7 @@ import org.apache.gluten.extension._ import org.apache.gluten.extension.columnar._ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast} import org.apache.gluten.extension.columnar.enumerated.{RasOffload, RemoveSort} -import org.apache.gluten.extension.columnar.enumerated.planner.cost.{LegacyCoster, RoughCoster, RoughCoster2} +import org.apache.gluten.extension.columnar.enumerated.planner.cost.{LegacyCoster, RoughCoster} import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicTransform} import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers} import org.apache.gluten.extension.columnar.rewrite._ @@ -120,6 +120,10 @@ object VeloxRuleApi { } private def injectRas(injector: RasInjector): Unit = { + // Gluten RAS: Costers. + injector.injectCoster(_ => LegacyCoster) + injector.injectCoster(_ => RoughCoster) + // Gluten RAS: Pre rules. injector.injectPreTransform(_ => RemoveTransitions) injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload) @@ -131,6 +135,7 @@ object VeloxRuleApi { // Gluten RAS: The RAS rule. val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf) + injector.injectRasRule(_ => RemoveSort) val rewrites = Seq( RewriteIn, @@ -139,10 +144,6 @@ object VeloxRuleApi { PullOutPreProject, PullOutPostProject, ProjectColumnPruning) - injector.injectCoster(_ => LegacyCoster) - injector.injectCoster(_ => RoughCoster) - injector.injectCoster(_ => RoughCoster2) - injector.injectRasRule(_ => RemoveSort) val offloads: Seq[RasOffload] = Seq( RasOffload.from[Exchange](OffloadExchange()), RasOffload.from[BaseJoinExec](OffloadJoin()), diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala deleted file mode 100644 index cf61a7323665c..0000000000000 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gluten.execution - -import org.apache.gluten.config.GlutenConfig - -import org.apache.spark.SparkConf -import org.apache.spark.sql.execution.ProjectExec - -class VeloxRoughCostModel2Suite extends VeloxWholeStageTransformerSuite { - override protected val resourcePath: String = "/tpch-data-parquet-velox" - override protected val fileFormat: String = "parquet" - - override def beforeAll(): Unit = { - super.beforeAll() - spark - .range(100) - .selectExpr("cast(id % 3 as int) as c1", "id as c2", "array(id, id + 1) as c3") - .write - .format("parquet") - .saveAsTable("tmp1") - } - - override protected def afterAll(): Unit = { - spark.sql("drop table tmp1") - super.afterAll() - } - - override protected def sparkConf: SparkConf = super.sparkConf - .set(GlutenConfig.RAS_ENABLED.key, "true") - .set(GlutenConfig.RAS_COST_MODEL.key, "rough2") - .set(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key, "false") - - test("fallback trivial project if its neighbor nodes fell back") { - withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") { - runQueryAndCompare("select c1 as c3 from tmp1") { - checkSparkOperatorMatch[ProjectExec] - } - } - } - - test("avoid adding r2c if r2c cost greater than native") { - withSQLConf( - GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false", - GlutenConfig.RAS_ROUGH2_SIZEBYTES_THRESHOLD.key -> "1") { - runQueryAndCompare("select array_contains(c3, 0) as list from tmp1") { - checkSparkOperatorMatch[ProjectExec] - } - } - } -} diff --git a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala index 65d32ebf61626..e7de629b39e30 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala @@ -18,10 +18,10 @@ package org.apache.gluten.extension.columnar.enumerated.planner import org.apache.gluten.config.GlutenConfig import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform -import org.apache.gluten.extension.columnar.enumerated.planner.cost.{LegacyCoster, LongCostModel} +import org.apache.gluten.extension.columnar.enumerated.planner.cost.{GlutenCostModel, LegacyCoster, LongCostModel} import org.apache.gluten.extension.columnar.enumerated.planner.property.Conv -import org.apache.gluten.extension.columnar.transition.ConventionReq -import org.apache.gluten.ras.{Cost, CostModel, Ras} +import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq} +import org.apache.gluten.ras.{Cost, Ras} import org.apache.gluten.ras.RasSuiteBase._ import org.apache.gluten.ras.path.RasPath import org.apache.gluten.ras.property.PropertySet @@ -37,6 +37,11 @@ import org.apache.spark.sql.types.StringType class VeloxRasSuite extends SharedSparkSession { import VeloxRasSuite._ + override protected def beforeAll(): Unit = { + super.beforeAll() + Convention.ensureSparkRowAndBatchTypesRegistered() + } + test("C2R, R2C - basic") { val in = RowUnary(RowLeaf(TRIVIAL_SCHEMA)) val planner = newRas().newPlanner(in) @@ -153,14 +158,14 @@ object VeloxRasSuite { .asInstanceOf[Ras[SparkPlan]] } - private def legacyCostModel(): CostModel[SparkPlan] = { + private def legacyCostModel(): GlutenCostModel = { val registry = LongCostModel.registry() val coster = LegacyCoster registry.register(coster) 
registry.get(coster.kind()) } - private def sessionCostModel(): CostModel[SparkPlan] = { + private def sessionCostModel(): GlutenCostModel = { val transform = EnumeratedTransform.static() transform.costModel } @@ -198,7 +203,7 @@ object VeloxRasSuite { override def shape(): Shape[SparkPlan] = Shapes.fixedHeight(1) } - class UserCostModel1 extends CostModel[SparkPlan] { + class UserCostModel1 extends GlutenCostModel { private val base = legacyCostModel() override def costOf(node: SparkPlan): Cost = node match { case _: RowUnary => base.makeInfCost() @@ -206,9 +211,12 @@ object VeloxRasSuite { } override def costComparator(): Ordering[Cost] = base.costComparator() override def makeInfCost(): Cost = base.makeInfCost() + override def sum(one: Cost, other: Cost): Cost = base.sum(one, other) + override def diff(one: Cost, other: Cost): Cost = base.diff(one, other) + override def makeZeroCost(): Cost = base.makeZeroCost() } - class UserCostModel2 extends CostModel[SparkPlan] { + class UserCostModel2 extends GlutenCostModel { private val base = legacyCostModel() override def costOf(node: SparkPlan): Cost = node match { case _: ColumnarUnary => base.makeInfCost() @@ -216,5 +224,8 @@ object VeloxRasSuite { } override def costComparator(): Ordering[Cost] = base.costComparator() override def makeInfCost(): Cost = base.makeInfCost() + override def sum(one: Cost, other: Cost): Cost = base.sum(one, other) + override def diff(one: Cost, other: Cost): Cost = base.diff(one, other) + override def makeZeroCost(): Cost = base.makeZeroCost() } } diff --git a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala index e14ffd43d82d0..335844782a44d 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala @@ -200,13 +200,13 @@ class VeloxTransitionSuite extends SharedSparkSession { } override protected def beforeAll(): Unit = { - api.onExecutorStart(MockVeloxBackend.mockPluginContext()) super.beforeAll() + api.onExecutorStart(MockVeloxBackend.mockPluginContext()) } override protected def afterAll(): Unit = { - super.afterAll() api.onExecutorShutdown() + super.afterAll() } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala index 34b4005a756d3..59e829e179369 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala @@ -20,11 +20,11 @@ import org.apache.gluten.component.Component import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall import org.apache.gluten.extension.columnar.enumerated.planner.GlutenOptimization +import org.apache.gluten.extension.columnar.enumerated.planner.cost.GlutenCostModel import org.apache.gluten.extension.columnar.enumerated.planner.property.Conv import org.apache.gluten.extension.injector.Injector import org.apache.gluten.extension.util.AdaptiveContext import org.apache.gluten.logging.LogLevelUtil -import org.apache.gluten.ras.CostModel import 
org.apache.gluten.ras.property.PropertySet import org.apache.gluten.ras.rule.RasRule @@ -44,7 +44,7 @@ import org.apache.spark.sql.execution._ * * The feature requires enabling RAS to function. */ -case class EnumeratedTransform(costModel: CostModel[SparkPlan], rules: Seq[RasRule[SparkPlan]]) +case class EnumeratedTransform(costModel: GlutenCostModel, rules: Seq[RasRule[SparkPlan]]) extends Rule[SparkPlan] with LogLevelUtil { diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/GlutenCostModel.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/GlutenCostModel.scala new file mode 100644 index 0000000000000..41e5529d2eba8 --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/GlutenCostModel.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar.enumerated.planner.cost + +import org.apache.gluten.ras.{Cost, CostModel} + +import org.apache.spark.sql.execution.SparkPlan + +trait GlutenCostModel extends CostModel[SparkPlan] { + // Returns cost value of one + other. + def sum(one: Cost, other: Cost): Cost + // Returns cost value of one - other. + def diff(one: Cost, other: Cost): Cost + + def makeZeroCost(): Cost +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala index 393ac35de42f0..0d11541b73ddd 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala @@ -18,34 +18,46 @@ package org.apache.gluten.extension.columnar.enumerated.planner.cost import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.columnar.enumerated.planner.plan.GlutenPlanModel.GroupLeafExec -import org.apache.gluten.ras.{Cost, CostModel} +import org.apache.gluten.ras.Cost import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlan import scala.collection.mutable -abstract class LongCostModel extends CostModel[SparkPlan] { +abstract class LongCostModel extends GlutenCostModel { private val infLongCost = Long.MaxValue + private val zeroLongCost = 0 override def costOf(node: SparkPlan): LongCost = node match { case _: GroupLeafExec => throw new IllegalStateException() case _ => LongCost(longCostOf(node)) } + // Sum with ceil to avoid overflow. 
+ private def safeSum(a: Long, b: Long): Long = { + assert(a >= 0) + assert(b >= 0) + val sum = a + b + if (sum < a || sum < b) Long.MaxValue else sum + } + + override def sum(one: Cost, other: Cost): LongCost = (one, other) match { + case (LongCost(value), LongCost(otherValue)) => LongCost(safeSum(value, otherValue)) + } + + // Returns cost value of one - other. + override def diff(one: Cost, other: Cost): Cost = (one, other) match { + case (LongCost(value), LongCost(otherValue)) => + val d = Math.subtractExact(value, otherValue) + require(d >= zeroLongCost, s"Difference between cost $one and $other should not be negative") + LongCost(d) + } + private def longCostOf(node: SparkPlan): Long = node match { case n => val selfCost = selfLongCostOf(n) - - // Sum with ceil to avoid overflow. - def safeSum(a: Long, b: Long): Long = { - assert(a >= 0) - assert(b >= 0) - val sum = a + b - if (sum < a || sum < b) Long.MaxValue else sum - } - - (n.children.map(longCostOf).toList :+ selfCost).reduce(safeSum) + (n.children.map(longCostOf).toSeq :+ selfCost).reduce[Long](safeSum) } def selfLongCostOf(node: SparkPlan): Long @@ -56,6 +68,7 @@ abstract class LongCostModel extends CostModel[SparkPlan] { } override def makeInfCost(): Cost = LongCost(infLongCost) + override def makeZeroCost(): Cost = LongCost(zeroLongCost) } object LongCostModel extends Logging { @@ -98,11 +111,6 @@ object LongCostModel extends Logging { override def name(): String = "rough" } - /** Compared with rough, rough2 can be more precise to avoid the costly r2c. */ - case object Rough2 extends Kind { - override def name(): String = "rough2" - } - class Registry private[LongCostModel] { private val lookup: mutable.Map[Kind, LongCosterChain.Builder] = mutable.Map() diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCosterChain.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCosterChain.scala index 8b0c8b9f2d8a9..00980e7712a40 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCosterChain.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCosterChain.scala @@ -37,7 +37,7 @@ private class LongCosterChain private (costers: Seq[LongCoster]) extends LongCos case (c @ Some(_), _) => c } - .getOrElse(throw new GlutenException(s"Cost node found for node: $node")) + .getOrElse(throw new GlutenException(s"Cost not found for node: $node")) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala index 0e53875596743..ff0f295852990 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala @@ -19,8 +19,6 @@ package org.apache.gluten.extension.columnar.transition import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan} import org.apache.spark.util.SparkVersionUtil -import java.util.concurrent.atomic.AtomicBoolean - import scala.collection.mutable /** @@ -33,6 +31,13 @@ sealed trait Convention { } object Convention { + def ensureSparkRowAndBatchTypesRegistered(): Unit = { + RowType.None.ensureRegistered() + RowType.VanillaRow.ensureRegistered() + BatchType.None.ensureRegistered() + BatchType.VanillaBatch.ensureRegistered() + } 
+ implicit class ConventionOps(val conv: Convention) extends AnyVal { def isNone: Boolean = { conv.rowType == RowType.None && conv.batchType == BatchType.None @@ -80,10 +85,17 @@ object Convention { } sealed trait RowType extends TransitionGraph.Vertex with Serializable { - Transition.graph.addVertex(this) + import RowType._ + + final protected[this] def register0(): Unit = BatchType.synchronized { + assert(all.add(this)) + } } object RowType { + private val all: mutable.Set[RowType] = mutable.Set() + def values(): Set[RowType] = all.toSet + // None indicates that the plan doesn't support row-based processing. final case object None extends RowType final case object VanillaRow extends RowType @@ -91,24 +103,12 @@ object Convention { trait BatchType extends TransitionGraph.Vertex with Serializable { import BatchType._ - private val initialized: AtomicBoolean = new AtomicBoolean(false) - final def ensureRegistered(): Unit = { - if (!initialized.compareAndSet(false, true)) { - // Already registered. - return - } - register() - } - - final private def register(): Unit = BatchType.synchronized { + final protected[this] def register0(): Unit = BatchType.synchronized { assert(all.add(this)) - Transition.graph.addVertex(this) registerTransitions() } - ensureRegistered() - /** * User batch type could override this method to define transitions from/to this batch type by * calling the subsequent protected APIs. diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala index c4405aeb8d0ac..3105713d989d8 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala @@ -24,7 +24,6 @@ import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec} import org.apache.spark.sql.execution.adaptive.QueryStageExec import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec -import org.apache.spark.util.SparkTestUtil /** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]] from a query plan. */ sealed trait ConventionFunc { @@ -43,33 +42,12 @@ object ConventionFunc { object Empty extends Override } - // For testing, to make things work without a backend loaded. - private var ignoreBackend: Boolean = false - - // Visible for testing. - def ignoreBackend[T](body: => T): T = synchronized { - assert(SparkTestUtil.isTesting) - assert(!ignoreBackend) - ignoreBackend = true - try { - body - } finally { - ignoreBackend = false - } - } - def create(): ConventionFunc = { val batchOverride = newOverride() new BuiltinFunc(batchOverride) } private def newOverride(): Override = { - synchronized { - if (ignoreBackend) { - // For testing - return Override.Empty - } - } // Components should override Backend's convention function. Hence, reversed injection order // is applied. 
val overrides = Component.sorted().reverse.map(_.convFuncOverride()) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraph.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraph.scala index 2a4e1f4225177..b05e939687118 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraph.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraph.scala @@ -30,31 +30,32 @@ trait FloydWarshallGraph[V <: AnyRef, E <: AnyRef] { } object FloydWarshallGraph { - trait Cost { - def +(other: Cost): Cost - } + trait Cost trait CostModel[E <: AnyRef] { def zero(): Cost + def sum(one: Cost, other: Cost): Cost def costOf(edge: E): Cost def costComparator(): Ordering[Cost] } trait Path[E <: AnyRef] { def edges(): Seq[E] - def cost(): Cost + def cost(costModel: CostModel[E]): Cost } - def builder[V <: AnyRef, E <: AnyRef](costModel: CostModel[E]): Builder[V, E] = { - Builder.create(costModel) + def builder[V <: AnyRef, E <: AnyRef](costModelFactory: () => CostModel[E]): Builder[V, E] = { + Builder.create(costModelFactory) } private object Path { - def apply[E <: AnyRef](costModel: CostModel[E], edges: Seq[E]): Path[E] = Impl(edges)(costModel) - private case class Impl[E <: AnyRef](override val edges: Seq[E])(costModel: CostModel[E]) - extends Path[E] { - override val cost: Cost = { - edges.map(costModel.costOf).reduceOption(_ + _).getOrElse(costModel.zero()) + def apply[E <: AnyRef](edges: Seq[E]): Path[E] = Impl(edges) + private case class Impl[E <: AnyRef](override val edges: Seq[E]) extends Path[E] { + override def cost(costModel: CostModel[E]): Cost = { + edges + .map(costModel.costOf) + .reduceOption((c1, c2) => costModel.sum(c1, c2)) + .getOrElse(costModel.zero()) } } } @@ -87,13 +88,14 @@ object FloydWarshallGraph { private object Builder { // Thread safe. 
- private class Impl[V <: AnyRef, E <: AnyRef](costModel: CostModel[E]) extends Builder[V, E] { + private class Impl[V <: AnyRef, E <: AnyRef](costModelFactory: () => CostModel[E]) + extends Builder[V, E] { private val pathTable: mutable.Map[V, mutable.Map[V, Path[E]]] = mutable.Map() private var graph: Option[FloydWarshallGraph[V, E]] = None override def addVertex(v: V): Builder[V, E] = synchronized { assert(!pathTable.contains(v), s"Vertex $v already exists in graph") - pathTable.getOrElseUpdate(v, mutable.Map()).getOrElseUpdate(v, Path(costModel, Nil)) + pathTable.getOrElseUpdate(v, mutable.Map()).getOrElseUpdate(v, Path(Nil)) graph = None this } @@ -103,7 +105,7 @@ object FloydWarshallGraph { assert(pathTable.contains(from), s"Vertex $from not exists in graph") assert(pathTable.contains(to), s"Vertex $to not exists in graph") assert(!hasPath(from, to), s"Path from $from to $to already exists in graph") - pathTable(from) += to -> Path(costModel, Seq(edge)) + pathTable(from) += to -> Path(Seq(edge)) graph = None this } @@ -127,6 +129,7 @@ object FloydWarshallGraph { } private def compile(): FloydWarshallGraph[V, E] = { + val costModel = costModelFactory() val vertices = pathTable.keys for (k <- vertices) { for (i <- vertices) { @@ -134,12 +137,16 @@ object FloydWarshallGraph { if (hasPath(i, k) && hasPath(k, j)) { val pathIk = pathTable(i)(k) val pathKj = pathTable(k)(j) - val newPath = Path(costModel, pathIk.edges() ++ pathKj.edges()) + val newPath = Path(pathIk.edges() ++ pathKj.edges()) if (!hasPath(i, j)) { pathTable(i) += j -> newPath } else { val path = pathTable(i)(j) - if (costModel.costComparator().compare(newPath.cost(), path.cost()) < 0) { + if ( + costModel + .costComparator() + .compare(newPath.cost(costModel), path.cost(costModel)) < 0 + ) { pathTable(i) += j -> newPath } } @@ -151,8 +158,8 @@ object FloydWarshallGraph { } } - def create[V <: AnyRef, E <: AnyRef](costModel: CostModel[E]): Builder[V, E] = { - new Impl(costModel) + def create[V <: AnyRef, E <: AnyRef](costModelFactory: () => CostModel[E]): Builder[V, E] = { + new Impl(costModelFactory) } } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala index ef08a34d56154..8e9744383107d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/TransitionGraph.scala @@ -16,18 +16,43 @@ */ package org.apache.gluten.extension.columnar.transition +import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform +import org.apache.gluten.extension.columnar.transition.Convention.BatchType +import org.apache.gluten.ras.Cost + import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.util.SparkReflectionUtil +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.mutable + object TransitionGraph { trait Vertex { + private val initialized: AtomicBoolean = new AtomicBoolean(false) + + final def ensureRegistered(): Unit = { + if (!initialized.compareAndSet(false, true)) { + // Already registered. 
+ return + } + register() + } + + final private def register(): Unit = BatchType.synchronized { + Transition.graph.addVertex(this) + register0() + } + + protected[this] def register0(): Unit + override def toString: String = SparkReflectionUtil.getSimpleClassName(this.getClass) } type Builder = FloydWarshallGraph.Builder[TransitionGraph.Vertex, Transition] def builder(): Builder = { - FloydWarshallGraph.builder(TransitionCostModel) + FloydWarshallGraph.builder(() => new TransitionCostModel()) } implicit class TransitionGraphOps(val graph: TransitionGraph) { @@ -67,54 +92,77 @@ object TransitionGraph { } } - private case class TransitionCost(count: Int, nodeNames: Seq[String]) - extends FloydWarshallGraph.Cost { - override def +(other: FloydWarshallGraph.Cost): TransitionCost = { - other match { - case TransitionCost(otherCount, otherNodeNames) => - TransitionCost(count + otherCount, nodeNames ++ otherNodeNames) - } - } - } + /** Reuse RAS cost to represent transition cost. */ + private case class TransitionCost(value: Cost, nodeNames: Seq[String]) + extends FloydWarshallGraph.Cost - // TODO: Consolidate transition graph's cost model with RAS cost model. - private object TransitionCostModel extends FloydWarshallGraph.CostModel[Transition] { - override def zero(): TransitionCost = TransitionCost(0, Nil) + /** + * The cost model reuses RAS's cost model to evaluate cost of transitions. + * + * Note the transition graph is built once for all subsequent Spark sessions created on the same + * driver, so any access to Spark dynamic SQL config in RAS cost model will not take effect for + * the transition cost evaluation. Hence, it's not recommended to access Spark dynamic + * configurations in RAS cost model as well. + */ + private class TransitionCostModel() extends FloydWarshallGraph.CostModel[Transition] { + private val rasCostModel = EnumeratedTransform.static().costModel + + override def zero(): TransitionCost = TransitionCost(rasCostModel.makeZeroCost(), Nil) override def costOf(transition: Transition): TransitionCost = { costOf0(transition) } + override def sum( + one: FloydWarshallGraph.Cost, + other: FloydWarshallGraph.Cost): FloydWarshallGraph.Cost = (one, other) match { + case (TransitionCost(c1, p1), TransitionCost(c2, p2)) => + TransitionCost(rasCostModel.sum(c1, c2), p1 ++ p2) + } override def costComparator(): Ordering[FloydWarshallGraph.Cost] = { (x: FloydWarshallGraph.Cost, y: FloydWarshallGraph.Cost) => (x, y) match { - case (TransitionCost(count, nodeNames), TransitionCost(otherCount, otherNodeNames)) => - if (count != otherCount) { - count - otherCount + case (TransitionCost(v1, nodeNames1), TransitionCost(v2, nodeNames2)) => + val diff = rasCostModel.costComparator().compare(v1, v2) + if (diff != 0) { + diff } else { // To make the output order stable. - nodeNames.mkString.hashCode - otherNodeNames.mkString.hashCode + nodeNames1.mkString.hashCode - nodeNames2.mkString.hashCode } } } private def costOf0(transition: Transition): TransitionCost = { val leaf = DummySparkPlan() + val transited = transition.apply(leaf) /** * The calculation considers C2C's cost as half of C2R / R2C's cost. So query planner prefers * C2C than C2R / R2C. 
      */
-      def costOfPlan(plan: SparkPlan): TransitionCost = plan
-        .map {
-          case p if p == leaf => TransitionCost(0, Nil)
-          case node @ RowToColumnarLike(_) => TransitionCost(2, Seq(node.nodeName))
-          case node @ ColumnarToRowLike(_) => TransitionCost(2, Seq(node.nodeName))
-          case node @ ColumnarToColumnarLike(_) => TransitionCost(1, Seq(node.nodeName))
-        }
-        .reduce((l, r) => l + r)
+      def rasCostOfPlan(plan: SparkPlan): Cost = rasCostModel.costOf(plan)
+      def nodeNamesOfPlan(plan: SparkPlan): Seq[String] = {
+        plan.map(_.nodeName).reverse
+      }
+
+      val leafCost = rasCostOfPlan(leaf)
+      val accumulatedCost = rasCostOfPlan(transited)
+      val costDiff = rasCostModel.diff(accumulatedCost, leafCost)
+
+      val leafNodeNames = nodeNamesOfPlan(leaf)
+      val accumulatedNodeNames = nodeNamesOfPlan(transited)
+      require(
+        accumulatedNodeNames.startsWith(leafNodeNames),
+        s"Transition should only add unary nodes on the input plan or leave it unchanged. " +
+          s"Before: $leaf, after: $transited"
+      )
+      val nodeNamesDiff = mutable.ListBuffer[String]()
+      nodeNamesDiff ++= accumulatedNodeNames
+      leafNodeNames.foreach(n => assert(nodeNamesDiff.remove(0) == n))
+      assert(
+        nodeNamesDiff.size == accumulatedNodeNames.size - leafNodeNames.size,
+        s"Dummy leaf node not found in the transited plan: $transited")
 
-      val plan = transition.apply(leaf)
-      val cost = costOfPlan(plan)
-      cost
+      TransitionCost(costDiff, nodeNamesDiff.toSeq)
     }
   }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index 11172a9b36367..23db1c436da88 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -21,9 +21,8 @@ import org.apache.gluten.extension.GlutenColumnarRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.enumerated.{EnumeratedApplier, EnumeratedTransform}
-import org.apache.gluten.extension.columnar.enumerated.planner.cost.{LongCoster, LongCostModel}
+import org.apache.gluten.extension.columnar.enumerated.planner.cost.{GlutenCostModel, LongCoster, LongCostModel}
 import org.apache.gluten.extension.columnar.heuristic.{HeuristicApplier, HeuristicTransform}
-import org.apache.gluten.ras.CostModel
 import org.apache.gluten.ras.rule.RasRule
 
 import org.apache.spark.internal.Logging
@@ -149,7 +148,7 @@ object GlutenInjector {
 
   private def findCostModel(
       registry: LongCostModel.Registry,
-      aliasOrClass: String): CostModel[SparkPlan] = {
+      aliasOrClass: String): GlutenCostModel = {
     if (LongCostModel.Kind.values().contains(aliasOrClass)) {
       val kind = LongCostModel.Kind.values()(aliasOrClass)
       val model = registry.get(kind)
@@ -159,7 +158,7 @@ object GlutenInjector {
       logInfo(s"Using user cost model: $aliasOrClass")
       val ctor = clazz.getDeclaredConstructor()
       ctor.setAccessible(true)
-      val model: CostModel[SparkPlan] = ctor.newInstance()
+      val model: GlutenCostModel = ctor.newInstance().asInstanceOf[GlutenCostModel]
       model
     }
   }
diff --git a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraphSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraphSuite.scala
index 6bc4ab804f1d0..7b60940a1ae2a 100644
--- a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraphSuite.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/FloydWarshallGraphSuite.scala
@@ -36,7 +36,7 @@ class FloydWarshallGraphSuite extends AnyFunSuite {
     val e42 = Edge(3)
 
     val graph = FloydWarshallGraph
-      .builder(CostModel)
+      .builder(() => CostModel)
      .addVertex(v0)
      .addVertex(v1)
      .addVertex(v2)
@@ -87,14 +87,15 @@ private object FloydWarshallGraphSuite {
     }
   }
 
-  private case class LongCost(c: Long) extends FloydWarshallGraph.Cost {
-    override def +(other: FloydWarshallGraph.Cost): FloydWarshallGraph.Cost = other match {
-      case LongCost(o) => LongCost(c + o)
-    }
-  }
+  private case class LongCost(c: Long) extends FloydWarshallGraph.Cost
 
   private object CostModel extends FloydWarshallGraph.CostModel[Edge] {
     override def zero(): FloydWarshallGraph.Cost = LongCost(0)
+    override def sum(
+        one: FloydWarshallGraph.Cost,
+        other: FloydWarshallGraph.Cost): FloydWarshallGraph.Cost = {
+      LongCost(one.asInstanceOf[LongCost].c + other.asInstanceOf[LongCost].c)
+    }
     override def costOf(edge: Edge): FloydWarshallGraph.Cost = LongCost(edge.distance * 10)
     override def costComparator(): Ordering[FloydWarshallGraph.Cost] = Ordering.Long.on {
       case LongCost(c) => c
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LegacyCoster.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LegacyCoster.scala
index 5cf9b87f2ac13..bb89d0035bf8c 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LegacyCoster.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LegacyCoster.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.enumerated.planner.cost
 import org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike, ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ColumnarWriteFilesExec, ProjectExec, RowToColumnarExec, SparkPlan}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, ProjectExec, SparkPlan}
 
 object LegacyCoster extends LongCoster {
   override def kind(): LongCostModel.Kind = LongCostModel.Legacy
@@ -34,8 +34,6 @@ object LegacyCoster {
   private def selfCostOf0(node: SparkPlan): Long = {
     node match {
       case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0
-      case ColumnarToRowExec(_) => 10L
-      case RowToColumnarExec(_) => 10L
       case ColumnarToRowLike(_) => 10L
       case RowToColumnarLike(_) => 10L
       case ColumnarToColumnarLike(_) => 5L
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster.scala
index d2959d46a13cf..ab893265ec42a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster.scala
@@ -42,8 +42,6 @@ object RoughCoster extends LongCoster {
         // Avoid moving computation back to native when transition has complex types in schema.
         // Such transitions are observed to be extremely expensive as of now.
         Long.MaxValue
-      case ColumnarToRowExec(_) => 10L
-      case RowToColumnarExec(_) => 10L
       case ColumnarToRowLike(_) => 10L
       case RowToColumnarLike(_) => 10L
       case ColumnarToColumnarLike(_) => 5L
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
deleted file mode 100644
index e46274a79f69e..0000000000000
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/RoughCoster2.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension.columnar.enumerated.planner.cost
-
-import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike}
-import org.apache.gluten.utils.PlanUtil
-
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, NamedExpression}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
-
-// Since https://github.com/apache/incubator-gluten/pull/7686.
-object RoughCoster2 extends LongCoster {
-  override def kind(): LongCostModel.Kind = LongCostModel.Rough2
-
-  override def selfCostOf(node: SparkPlan): Option[Long] = {
-    Some(selfCostOf0(node))
-  }
-
-  private def selfCostOf0(node: SparkPlan): Long = {
-    val sizeFactor = getSizeFactor(node)
-    val opCost = node match {
-      case ProjectExec(projectList, _) if projectList.forall(isCheapExpression) =>
-        // Make trivial ProjectExec has the same cost as ProjectExecTransform to reduce unnecessary
-        // c2r and r2c.
-        1L
-      case ColumnarToRowExec(_) => 1L
-      case RowToColumnarExec(_) => 1L
-      case ColumnarToRowLike(_) => 1L
-      case RowToColumnarLike(_) =>
-        // If sizeBytes is less than the threshold, the cost of RowToColumnarLike is ignored.
-        if (sizeFactor == 0) 1L else GlutenConfig.get.rasRough2R2cCost
-      case p if PlanUtil.isGlutenColumnarOp(p) => 1L
-      case p if PlanUtil.isVanillaColumnarOp(p) => GlutenConfig.get.rasRough2VanillaCost
-      // Other row ops. Usually a vanilla row op.
-      case _ => GlutenConfig.get.rasRough2VanillaCost
-    }
-    opCost * Math.max(1, sizeFactor)
-  }
-
-  private def getSizeFactor(plan: SparkPlan): Long = {
-    // Get the bytes size that the plan needs to consume.
-    val sizeBytes = plan match {
-      case _: DataSourceScanExec | _: DataSourceV2ScanExecBase => getStatSizeBytes(plan)
-      case _: LeafExecNode => 0L
-      case p => p.children.map(getStatSizeBytes).sum
-    }
-    sizeBytes / GlutenConfig.get.rasRough2SizeBytesThreshold
-  }
-
-  private def getStatSizeBytes(plan: SparkPlan): Long = {
-    plan match {
-      case a: AdaptiveSparkPlanExec => getStatSizeBytes(a.inputPlan)
-      case _ =>
-        plan.logicalLink match {
-          case Some(logicalPlan) => logicalPlan.stats.sizeInBytes.toLong
-          case _ => plan.children.map(getStatSizeBytes).sum
-        }
-    }
-  }
-
-  private def isCheapExpression(ne: NamedExpression): Boolean = ne match {
-    case Alias(_: Attribute, _) => true
-    case _: Attribute => true
-    case _ => false
-  }
-}
diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index fec36ac1acfa1..2c423783fdcc7 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -16,8 +16,12 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
+import org.apache.gluten.backend.Backend
+import org.apache.gluten.component.Component
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
+import org.apache.gluten.extension.columnar.enumerated.planner.cost.LegacyCoster
+import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -28,35 +32,38 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 class TransitionSuite extends SharedSparkSession {
   import TransitionSuite._
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    new DummyBackend().ensureRegistered()
+    Convention.ensureSparkRowAndBatchTypesRegistered()
+    TypeA.ensureRegistered()
+    TypeB.ensureRegistered()
+    TypeC.ensureRegistered()
+    TypeD.ensureRegistered()
+  }
+
   test("Trivial C2R") {
     val in = BatchLeaf(TypeA)
-    val out = ConventionFunc.ignoreBackend {
-      Transitions.insert(in, outputsColumnar = false)
-    }
+    val out = Transitions.insert(in, outputsColumnar = false)
     assert(out == BatchToRow(TypeA, BatchLeaf(TypeA)))
   }
 
   test("Insert C2R") {
     val in = RowUnary(BatchLeaf(TypeA))
-    val out = ConventionFunc.ignoreBackend {
-      Transitions.insert(in, outputsColumnar = false)
-    }
+    val out = Transitions.insert(in, outputsColumnar = false)
     assert(out == RowUnary(BatchToRow(TypeA, BatchLeaf(TypeA))))
   }
 
   test("Insert R2C") {
     val in = BatchUnary(TypeA, RowLeaf())
-    val out = ConventionFunc.ignoreBackend {
-      Transitions.insert(in, outputsColumnar = false)
-    }
+    val out = Transitions.insert(in, outputsColumnar = false)
     assert(out == BatchToRow(TypeA, BatchUnary(TypeA, RowToBatch(TypeA, RowLeaf()))))
   }
 
   test("Insert C2R2C") {
     val in = BatchUnary(TypeA, BatchLeaf(TypeB))
-    val out = ConventionFunc.ignoreBackend {
-      Transitions.insert(in, outputsColumnar = false)
-    }
+    val out = Transitions.insert(in, outputsColumnar = false)
     assert(
       out == BatchToRow(
         TypeA,
@@ -65,9 +72,7 @@ class TransitionSuite extends SharedSparkSession {
 
   test("Insert C2C") {
     val in = BatchUnary(TypeA, BatchLeaf(TypeC))
-    val out = ConventionFunc.ignoreBackend {
-      Transitions.insert(in, outputsColumnar = false)
-    }
+    val out = Transitions.insert(in, outputsColumnar = false)
     assert(
       out ==
         BatchToRow(
           TypeA,
@@ -77,9 +82,7 @@ class TransitionSuite extends SharedSparkSession {
 
   test("No transitions found") {
     val in = BatchUnary(TypeA, BatchLeaf(TypeD))
     assertThrows[GlutenException] {
-      ConventionFunc.ignoreBackend {
-        Transitions.insert(in, outputsColumnar = false)
-      }
+      Transitions.insert(in, outputsColumnar = false)
     }
   }
 }
@@ -145,4 +148,12 @@ object TransitionSuite extends TransitionSuiteBase {
       throw new UnsupportedOperationException()
   }
 
+  class DummyBackend extends Backend {
+    override def name(): String = "dummy-backend"
+    override def buildInfo(): Component.BuildInfo =
+      Component.BuildInfo("DUMMY_BACKEND", "N/A", "N/A", "N/A")
+    override def injectRules(injector: Injector): Unit = {
+      injector.gluten.ras.injectCoster(_ => LegacyCoster)
+    }
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index d4083d5896eb2..eb9071badb145 100644
--- a/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -295,12 +295,6 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def rasCostModel: String = getConf(RAS_COST_MODEL)
 
-  def rasRough2SizeBytesThreshold: Long = getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
-
-  def rasRough2R2cCost: Long = getConf(RAS_ROUGH2_R2C_COST)
-
-  def rasRough2VanillaCost: Long = getConf(RAS_ROUGH2_VANILLA_COST)
-
   def enableVeloxCache: Boolean = getConf(COLUMNAR_VELOX_CACHE_ENABLED)
 
   def veloxMemCacheSize: Long = getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
@@ -1459,32 +1453,12 @@ object GlutenConfig {
 
   val RAS_COST_MODEL =
     buildConf("spark.gluten.ras.costModel")
       .doc(
-        "Experimental: The class name of user-defined cost model that will be used by RAS. If " +
-          "not specified, a legacy built-in cost model that exhaustively offloads computations " +
-          "will be used.")
+        "The class name of user-defined cost model that will be used by Gluten's transition " +
+          "planner as well as by RAS. If not specified, a legacy built-in cost model that " +
+          "exhaustively offloads computations will be used.")
       .stringConf
       .createWithDefaultString("legacy")
 
-  val RAS_ROUGH2_SIZEBYTES_THRESHOLD =
-    buildConf("spark.gluten.ras.rough2.sizeBytesThreshold")
-      .doc(
-        "Experimental: Threshold of the byte size consumed by sparkPlan, coefficient used " +
-          "to calculate cost in RAS rough2 model")
-      .longConf
-      .createWithDefault(1073741824L)
-
-  val RAS_ROUGH2_R2C_COST =
-    buildConf("spark.gluten.ras.rough2.r2c.cost")
-      .doc("Experimental: Cost of RowToVeloxColumnarExec in RAS rough2 model")
-      .longConf
-      .createWithDefault(100L)
-
-  val RAS_ROUGH2_VANILLA_COST =
-    buildConf("spark.gluten.ras.rough2.vanilla.cost")
-      .doc("Experimental: Cost of vanilla spark operater in RAS rough model")
-      .longConf
-      .createWithDefault(20L)
-
   // velox caching options.
   val COLUMNAR_VELOX_CACHE_ENABLED =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.cacheEnabled")

From a96e0d2c644be8b211f3ef09b2c9610724a6daa4 Mon Sep 17 00:00:00 2001
From: Yuan
Date: Wed, 15 Jan 2025 11:18:00 +0800
Subject: [PATCH 7/7] [GLUTEN-8487][VL] adding JDK17 based Centos8 image (#8513) (#8539)

This patch added JDK17 based Centos 8 image
---
 .github/workflows/docker_image.yml                | 11 +++++++++++
 dev/docker/Dockerfile.centos8-dynamic-build-jdk17 | 13 +++++++++++++
 2 files changed, 24 insertions(+)
 create mode 100644 dev/docker/Dockerfile.centos8-dynamic-build-jdk17

diff --git a/.github/workflows/docker_image.yml b/.github/workflows/docker_image.yml
index 58b7549adb70b..727dc3cd057b9 100644
--- a/.github/workflows/docker_image.yml
+++ b/.github/workflows/docker_image.yml
@@ -23,6 +23,8 @@ on:
       - '.github/workflows/docker_image.yml'
       - 'dev/docker/Dockerfile.centos7-static-build'
       - 'dev/docker/Dockerfile.centos8-dynamic-build'
+      - 'dev/docker/Dockerfile.centos8-dynamic-build-jdk11'
+      - 'dev/docker/Dockerfile.centos8-dynamic-build-jdk17'
   schedule:
     - cron: '0 20 * * 0'
 
@@ -84,3 +86,12 @@ jobs:
           file: dev/docker/Dockerfile.centos8-dynamic-build-jdk11
           push: true
           tags: apache/gluten:centos-8-jdk11
+
+      - name: Build and push Docker image Centos8 + JDK17
+        uses: docker/build-push-action@v2
+        with:
+          context: .
+          file: dev/docker/Dockerfile.centos8-dynamic-build-jdk17
+          push: true
+          tags: apache/gluten:centos-8-jdk17
+
diff --git a/dev/docker/Dockerfile.centos8-dynamic-build-jdk17 b/dev/docker/Dockerfile.centos8-dynamic-build-jdk17
new file mode 100644
index 0000000000000..e6817a8d605c0
--- /dev/null
+++ b/dev/docker/Dockerfile.centos8-dynamic-build-jdk17
@@ -0,0 +1,13 @@
+FROM apache/gluten:centos-8
+
+
+RUN yum install -y java-17-openjdk-devel patch wget git perl
+ENV JAVA_HOME=/usr/lib/jvm/java-17-openjdk
+ENV PATH=$JAVA_HOME/bin:$PATH
+
+ENV PATH=${PATH}:/usr/lib/maven/bin
+
+RUN git clone --depth=1 https://github.com/apache/incubator-gluten /opt/gluten
+
+RUN cd /opt/gluten && source /opt/rh/gcc-toolset-11/enable && ./dev/builddeps-veloxbe.sh --run_setup_script=ON build_arrow && rm -rf /opt/gluten
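
A quick way to sanity-check the resulting images (illustrative commands only, not part of the patches; they assume the workflow above has already pushed the tags it defines):

    # Pull the JDK11- and JDK17-based CentOS 8 images added in this series
    docker pull apache/gluten:centos-8-jdk11
    docker pull apache/gluten:centos-8-jdk17

    # Each Dockerfile sets JAVA_HOME and prepends $JAVA_HOME/bin to PATH,
    # so `java -version` should report OpenJDK 11 and 17 respectively
    docker run --rm apache/gluten:centos-8-jdk11 java -version
    docker run --rm apache/gluten:centos-8-jdk17 java -version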