Skip to content

Commit

Permalink
[GLUTEN-8330][VL] Improve conversion of viewfs paths to hdfs paths (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyum authored Dec 26, 2024
1 parent 077ba52 commit 5d1d0ba
Showing 3 changed files with 93 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -49,8 +49,10 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils

import scala.collection.mutable
import scala.util.control.Breaks.breakable

class VeloxBackend extends SubstraitBackend {
@@ -107,17 +109,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (filteredRootPaths.nonEmpty) {
val resolvedPaths =
if (
GlutenConfig.getConf.enableHdfsViewfs && filteredRootPaths.head.startsWith("viewfs")
) {
// Convert the viewfs path to hdfs path.
filteredRootPaths.map {
viewfsPath =>
val viewPath = new Path(viewfsPath)
val viewFileSystem =
FileSystem.get(viewPath.toUri, serializableHadoopConf.get.value)
viewFileSystem.resolvePath(viewPath).toString
}
if (GlutenConfig.getConf.enableHdfsViewfs) {
ViewFileSystemUtils.convertViewfsToHdfs(
filteredRootPaths,
mutable.Map.empty[String, String],
serializableHadoopConf.get.value)
} else {
filteredRootPaths
}
Original file line number Diff line number Diff line change
@@ -46,7 +46,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

import com.google.common.collect.Lists
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.viewfs.ViewFileSystemUtils

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -377,26 +377,16 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanSplitInfos =
getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
if (GlutenConfig.getConf.enableHdfsViewfs) {
val viewfsToHdfsCache: mutable.Map[String, String] = mutable.Map.empty
allScanSplitInfos.foreach {
splitInfos =>
splitInfos.foreach {
case splitInfo: LocalFilesNode =>
val paths = splitInfo.getPaths.asScala
if (paths.nonEmpty && paths.exists(_.startsWith("viewfs"))) {
// Convert the viewfs path into hdfs
val newPaths = paths.map {
viewfsPath =>
var finalPath = viewfsPath
while (finalPath.startsWith("viewfs")) {
val viewPath = new Path(finalPath)
val viewFileSystem =
FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
finalPath = viewFileSystem.resolvePath(viewPath).toString
}
finalPath
}
splitInfo.setPaths(newPaths.asJava)
}
val newPaths = ViewFileSystemUtils.convertViewfsToHdfs(
splitInfo.getPaths.asScala.toSeq,
viewfsToHdfsCache,
serializableHadoopConf.value)
splitInfo.setPaths(newPaths.asJava)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.mutable

object ViewFileSystemUtils {

  /**
   * Resolve a single viewfs path to the path on its backing file system.
   * Similar to ViewFileSystem.resolvePath, but reads the mount table directly
   * so it does not make RPC calls.
   *
   * @param f
   *   the viewfs path to resolve
   * @param hadoopConfig
   *   Hadoop configuration used to look up the file system
   * @return
   *   the resolved hdfs path, or `f` unchanged when it points at an internal
   *   mount-table directory
   */
  def convertViewfsToHdfs(f: String, hadoopConfig: Configuration): String = {
    val path = new Path(f)
    FileSystem.get(path.toUri, hadoopConfig) match {
      case viewFs: ViewFileSystem =>
        // The mount table (fsState) is private in ViewFileSystem, so reach it
        // reflectively instead of issuing an RPC via resolvePath.
        val field = viewFs.getClass.getDeclaredField("fsState")
        field.setAccessible(true)
        val mountTable = field.get(viewFs).asInstanceOf[InodeTree[FileSystem]]
        val resolved = mountTable.resolve(f, true)
        if (resolved.isInternalDir) {
          // Internal mount-table directory: there is no target fs to map to.
          f
        } else {
          Path
            .mergePaths(new Path(resolved.targetFileSystem.getUri), resolved.remainingPath)
            .toString
        }
      case fs =>
        // Not a viewfs file system; fall back to the standard resolution.
        fs.resolvePath(path).toString
    }
  }

  /**
   * Convert a sequence of viewfs path to a sequence of hdfs path.
   * @param paths
   *   sequence of viewfs path
   * @param viewfsToHdfsCache
   *   A map use to cache converted paths
   * @param hadoopConfig
   *   Hadoop configuration
   * @return
   *   sequence of hdfs path
   */
  def convertViewfsToHdfs(
      paths: Seq[String],
      viewfsToHdfsCache: mutable.Map[String, String],
      hadoopConfig: Configuration): Seq[String] = {
    paths.map {
      p =>
        if (!p.startsWith("viewfs")) {
          // Non-viewfs paths pass through untouched.
          p
        } else {
          // Cache resolution per parent directory: sibling files share the
          // same mount-point mapping, so one lookup covers all of them.
          val components = p.split(Path.SEPARATOR)
          val parent = components.init.mkString(Path.SEPARATOR)
          val resolvedParent =
            viewfsToHdfsCache.getOrElseUpdate(parent, convertViewfsToHdfs(parent, hadoopConfig))
          resolvedParent + Path.SEPARATOR + components.last
        }
    }
  }
}

0 comments on commit 5d1d0ba

Please sign in to comment.