From 1496ab1e047461b0c34605553a91fcf91c406cdc Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Fri, 23 May 2025 13:55:49 +0000
Subject: [PATCH] Respect user-provided basePath in streaming file source reads

---
 .../streaming/runtime/FileStreamSource.scala  |  7 ++++---
 .../sql/streaming/FileStreamSourceSuite.scala | 20 +++++++++++++++++++
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
index d5503f1c247da..75bec35476a41 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/FileStreamSource.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.classic.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.connector.read.streaming
 import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, FileIndexOptions, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.streaming.{Offset, Source}
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
 import org.apache.spark.sql.internal.SQLConf
@@ -75,8 +75,9 @@ class FileStreamSource(

   private val optionsForInnerDataSource = sourceOptions.optionMapWithoutPath ++ {
     val pathOption =
-      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
-        Map("basePath" -> path)
+      if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path") &&
+        !CaseInsensitiveMap(options).contains(FileIndexOptions.BASE_PATH_PARAM)) {
+        Map(FileIndexOptions.BASE_PATH_PARAM -> path)
       } else {
         Map()
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 15a0f048dd8a5..fb76043096cfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2621,6 +2621,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("SPARK-50603: respect user-provided basePath without globbing") {
+    withTempDirs { case (dir, tmp) =>
+      val partitionFooSubDir = new File(dir, "partition=foo")
+      partitionFooSubDir.mkdir()
+
+      val schema = new StructType().add("value", StringType).add("partition", StringType)
+      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}/partition=foo",
+        Some(schema), Map("basePath" -> dir.getCanonicalPath()))
+      testStream(fileStream)(
+        // Add data to partition dir
+        AddTextFileData("{'value': 'abc'}", partitionFooSubDir, tmp),
+        CheckAnswer(("abc", "foo")),
+
+        // Add more data to same partition=foo sub dir
+        AddTextFileData("{'value': 'def'}", partitionFooSubDir, tmp),
+        CheckAnswer(("abc", "foo"), ("def", "foo"))
+      )
+    }
+  }
 }

 @SlowSQLTest