
Commit 7620ebc

feat: Make supported hadoop filesystem schemes configurable (apache#2272)
* feat: Make supported hadoop filesystem schemes configurable
1 parent e17bce3 commit 7620ebc

File tree: 6 files changed (+170, -6 lines)


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 10 additions & 0 deletions
@@ -641,6 +641,16 @@ object CometConf extends ShimCometConf {
     .longConf
     .createWithDefault(3000L)

+  val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"
+
+  val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
+    conf(s"spark.hadoop.$COMET_LIBHDFS_SCHEMES_KEY")
+      .doc(
+        "Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses " +
+          "via libhdfs, separated by commas. Valid only when built with hdfs feature enabled.")
+      .stringConf
+      .createOptional
+
   /** Create a config to enable a specific operator */
   private def createExecEnabledConfig(
       exec: String,
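
The new entry can be set like any other Spark configuration. A minimal sketch, assuming a Comet build with the hdfs feature enabled (the scheme list below is an example, not a default):

import org.apache.spark.SparkConf
import org.apache.comet.CometConf

// Example only: route hdfs:// and webhdfs:// URIs through libhdfs on the native side.
// COMET_LIBHDFS_SCHEMES.key resolves to "spark.hadoop.fs.comet.libhdfs.schemes".
val conf = new SparkConf()
  .set(CometConf.COMET_LIBHDFS_SCHEMES.key, "hdfs,webhdfs")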

common/src/main/scala/org/apache/comet/objectstore/NativeConfig.scala

Lines changed: 13 additions & 4 deletions
@@ -22,8 +22,11 @@ package org.apache.comet.objectstore
 import java.net.URI
 import java.util.Locale

+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration

+import org.apache.comet.CometConf.COMET_LIBHDFS_SCHEMES_KEY
+
 object NativeConfig {

   private val objectStoreConfigPrefixes = Map(
@@ -55,16 +58,22 @@ object NativeConfig {
   def extractObjectStoreOptions(hadoopConf: Configuration, uri: URI): Map[String, String] = {
     val scheme = uri.getScheme.toLowerCase(Locale.ROOT)

+    import scala.collection.JavaConverters._
+    val options = scala.collection.mutable.Map[String, String]()
+
+    // The schemes will use libhdfs
+    val libhdfsSchemes = hadoopConf.get(COMET_LIBHDFS_SCHEMES_KEY)
+    if (StringUtils.isNotBlank(libhdfsSchemes)) {
+      options(COMET_LIBHDFS_SCHEMES_KEY) = libhdfsSchemes
+    }
+
     // Get prefixes for this scheme, return early if none found
     val prefixes = objectStoreConfigPrefixes.get(scheme)
     if (prefixes.isEmpty) {
-      return Map.empty[String, String]
+      return options.toMap
     }

-    import scala.collection.JavaConverters._
-
     // Extract all configurations that match the object store prefixes
-    val options = scala.collection.mutable.Map[String, String]()
     hadoopConf.iterator().asScala.foreach { entry =>
       val key = entry.getKey
       val value = entry.getValue
docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions
@@ -90,4 +90,5 @@ Comet provides the following configuration settings.
 | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
 | spark.comet.shuffle.sizeInBytesMultiplier | Comet reports smaller sizes for shuffle due to using Arrow's columnar memory format and this can result in Spark choosing a different join strategy due to the estimated size of the exchange being smaller. Comet will multiple sizeInBytes by this amount to avoid regressions in join strategy. | 1.0 |
 | spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Arrow columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
+| spark.hadoop.fs.comet.libhdfs.schemes | Defines filesystem schemes (e.g., hdfs, webhdfs) that the native side accesses via libhdfs, separated by commas. Valid only when built with hdfs feature enabled. | |
 <!--END:CONFIG_TABLE-->

native/core/src/parquet/parquet_support.rs

Lines changed: 14 additions & 2 deletions
@@ -336,6 +336,17 @@ fn value_field(entries_field: &FieldRef) -> Option<FieldRef> {
     }
 }

+fn is_hdfs_scheme(url: &Url, object_store_configs: &HashMap<String, String>) -> bool {
+    const COMET_LIBHDFS_SCHEMES_KEY: &str = "fs.comet.libhdfs.schemes";
+    let scheme = url.scheme();
+    if let Some(libhdfs_schemes) = object_store_configs.get(COMET_LIBHDFS_SCHEMES_KEY) {
+        use itertools::Itertools;
+        libhdfs_schemes.split(",").contains(scheme)
+    } else {
+        scheme == "hdfs"
+    }
+}
+
 // Mirrors object_store::parse::parse_url for the hdfs object store
 #[cfg(feature = "hdfs")]
 fn parse_hdfs_url(url: &Url) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
@@ -406,8 +417,9 @@ pub(crate) fn prepare_object_store_with_configs(
 ) -> Result<(ObjectStoreUrl, Path), ExecutionError> {
     let mut url = Url::parse(url.as_str())
         .map_err(|e| ExecutionError::GeneralError(format!("Error parsing URL {url}: {e}")))?;
+    let is_hdfs_scheme = is_hdfs_scheme(&url, object_store_configs);
     let mut scheme = url.scheme();
-    if scheme == "s3a" {
+    if !is_hdfs_scheme && scheme == "s3a" {
         scheme = "s3";
         url.set_scheme("s3").map_err(|_| {
             ExecutionError::GeneralError("Could not convert scheme from s3a to s3".to_string())
@@ -419,7 +431,7 @@ pub(crate) fn prepare_object_store_with_configs(
         &url[url::Position::BeforeHost..url::Position::AfterPort],
     );

-    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if scheme == "hdfs" {
+    let (object_store, object_store_path): (Box<dyn ObjectStore>, Path) = if is_hdfs_scheme {
         parse_hdfs_url(&url)
     } else if scheme == "s3" {
         objectstore::s3::create_store(&url, object_store_configs, Duration::from_secs(300))
FakeHDFSFileSystem.java (new file, package org.apache.comet.hadoop.fs)

Lines changed: 45 additions & 0 deletions
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.hadoop.fs;
+
+import java.net.URI;
+
+import org.apache.hadoop.fs.RawLocalFileSystem;
+
+public class FakeHDFSFileSystem extends RawLocalFileSystem {
+
+  public static final String PREFIX = "fake://fake-bucket";
+
+  public FakeHDFSFileSystem() {
+    // Avoid `URI scheme is not "file"` error on
+    // RawLocalFileSystem$DeprecatedRawLocalFileStatus.getOwner
+    RawLocalFileSystem.useStatIfAvailable();
+  }
+
+  @Override
+  public String getScheme() {
+    return "fake";
+  }
+
+  @Override
+  public URI getUri() {
+    return URI.create(PREFIX);
+  }
+}
ParquetReadFromFakeHadoopFsSuite.scala (new file, package org.apache.comet.parquet)

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet
+
+import java.io.File
+import java.nio.file.Files
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
+import org.apache.spark.sql.comet.CometNativeScanExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.{col, sum}
+
+import org.apache.comet.CometConf
+import org.apache.comet.hadoop.fs.FakeHDFSFileSystem
+
+class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkPlanHelper {
+
+  private var fake_root_dir: File = _
+
+  override protected def sparkConf: SparkConf = {
+    val conf = super.sparkConf
+    conf.set("spark.hadoop.fs.fake.impl", "org.apache.comet.hadoop.fs.FakeHDFSFileSystem")
+    conf.set("spark.hadoop.fs.defaultFS", FakeHDFSFileSystem.PREFIX)
+    conf.set(CometConf.COMET_LIBHDFS_SCHEMES.key, "fake,hdfs")
+  }
+
+  override def beforeAll(): Unit = {
+    // Initialize fake root dir
+    fake_root_dir = Files.createTempDirectory(s"comet_fake_${UUID.randomUUID().toString}").toFile
+    // Initialize Spark session
+    super.beforeAll()
+  }
+
+  protected override def afterAll(): Unit = {
+    if (fake_root_dir != null) FileUtils.deleteDirectory(fake_root_dir)
+    super.afterAll()
+  }
+
+  private def writeTestParquetFile(filePath: String): Unit = {
+    val df = spark.range(0, 1000)
+    df.write.format("parquet").mode(SaveMode.Overwrite).save(filePath)
+  }
+
+  private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = {
+    val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
+      p
+    }
+    assert(scans.size == 1)
+    assert(
+      scans.head.nativeOp.getNativeScan
+        .getFilePartitions(0)
+        .getPartitionedFile(0)
+        .getFilePath
+        .startsWith(FakeHDFSFileSystem.PREFIX))
+  }
+
+  test("test native_datafusion scan on fake fs") {
+    val testFilePath =
+      s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet"
+    writeTestParquetFile(testFilePath)
+    withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
+      val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
+      assertCometNativeScanOnFakeFs(df)
+      assert(df.first().getLong(0) == 499500)
+    }
+  }
+}
