Change FileSink.object_store_url to Url (#248)
Co-authored-by: svranesevic <[email protected]>
svranesevic authored Jul 1, 2024
1 parent 88293ad · commit 99915ac
Showing 9 changed files with 30 additions and 17 deletions.
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/file_format/arrow.rs
@@ -209,7 +209,8 @@ impl DataSink for ArrowFileSink {
     ) -> Result<u64> {
         let object_store = context
             .runtime_env()
-            .object_store(&self.config.object_store_url)?;
+            .object_store_registry
+            .get_store(&self.config.object_store_url)?;
 
         let part_col = if !self.config.table_partition_cols.is_empty() {
             Some(self.config.table_partition_cols.clone())
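
Note: the sinks previously went through the `RuntimeEnv::object_store` convenience method, which takes `impl AsRef<Url>`; that bound is satisfied by `ObjectStoreUrl` but, absent a reflexive `AsRef<Url>` impl on `Url` itself, not by the new field type, hence the direct registry lookup above. A minimal sketch of the new pattern, assuming the `RuntimeEnv` and `ObjectStoreRegistry` APIs of this DataFusion vintage:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::object_store::ObjectStoreRegistry;
use datafusion::execution::runtime_env::RuntimeEnv;
use object_store::ObjectStore;
use url::Url;

/// Mirrors the lookup the diff introduces: resolve a store straight from
/// the registry, keyed by a plain `Url`.
fn resolve_store(env: &RuntimeEnv, url: &Url) -> Result<Arc<dyn ObjectStore>> {
    env.object_store_registry.get_store(url)
}

fn main() -> Result<()> {
    // The default registry pre-registers a LocalFileSystem store under file://.
    let env = RuntimeEnv::default();
    let url = Url::parse("file:///").expect("static URL is valid");
    let _store = resolve_store(&env, &url)?;
    Ok(())
}
```
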
8 changes: 5 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -653,7 +653,8 @@ impl DataSink for ParquetSink {
 
         let object_store = context
             .runtime_env()
-            .object_store(&self.config.object_store_url)?;
+            .object_store_registry
+            .get_store(&self.config.object_store_url)?;
 
         let parquet_opts = &self.parquet_options;
         let allow_single_file_parallelism =
@@ -1136,6 +1137,7 @@ mod tests {
     use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex};
     use parquet::file::page_index::index::Index;
     use tokio::fs::File;
+    use url::Url;
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -1844,7 +1846,7 @@ mod tests {
         let object_store_url = ObjectStoreUrl::local_filesystem();
 
         let file_sink_config = FileSinkConfig {
-            object_store_url: object_store_url.clone(),
+            object_store_url: AsRef::<Url>::as_ref(&object_store_url).clone(),
             file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
@@ -1938,7 +1940,7 @@ mod tests {
 
         // set file config to include partitioning on field_a
         let file_sink_config = FileSinkConfig {
-            object_store_url: object_store_url.clone(),
+            object_store_url: AsRef::<Url>::as_ref(&object_store_url).clone(),
             file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
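
The fully qualified `AsRef::<Url>::as_ref(..)` spelling in these tests is presumably needed because `ObjectStoreUrl` exposes more than one `AsRef` target (it also hands out `&str`), so a bare `.as_ref()` would be ambiguous. A small self-contained sketch of the conversion, under that assumption about the trait impls:

```rust
use datafusion::datasource::object_store::ObjectStoreUrl;
use url::Url;

fn main() {
    let store_url = ObjectStoreUrl::local_filesystem();

    // Fully qualified syntax pins the AsRef<Url> impl; a bare `as_ref()`
    // could also resolve to AsRef<str> (assumption noted above).
    let url: Url = AsRef::<Url>::as_ref(&store_url).clone();
    assert_eq!(url.scheme(), "file");
}
```
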
@@ -209,7 +209,8 @@ pub(crate) async fn stateless_multipart_put(
 ) -> Result<u64> {
     let object_store = context
         .runtime_env()
-        .object_store(&config.object_store_url)?;
+        .object_store_registry
+        .get_store(&config.object_store_url)?;
 
     let base_output_path = &config.table_paths[0];
     let part_cols = if !config.table_partition_cols.is_empty() {
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
@@ -63,6 +63,7 @@ use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::ObjectStore;
+use url::Url;
 
 /// Configuration for creating a [`ListingTable`]
 #[derive(Debug, Clone)]
@@ -890,8 +891,9 @@ impl TableProvider for ListingTable {
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
 
         // Sink related option, apart from format
+        let object_store_url: &Url = self.table_paths()[0].as_ref();
         let config = FileSinkConfig {
-            object_store_url: self.table_paths()[0].object_store(),
+            object_store_url: object_store_url.clone(),
             table_paths: self.table_paths().clone(),
             file_groups,
             output_schema: self.schema(),
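
One semantic wrinkle worth noting: `.object_store()` derived a scheme-and-authority `ObjectStoreUrl`, while `AsRef<Url>` on `ListingTableUrl` appears to hand back the full table URL, path included. Assuming the default registry keys stores by scheme and authority only, the lookup still resolves the same store. A sketch of the borrow-then-clone step used above, with a hypothetical table location:

```rust
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::Result;
use url::Url;

fn main() -> Result<()> {
    // Hypothetical table path, for illustration only.
    let table_path = ListingTableUrl::parse("file:///data/warehouse/t1/")?;

    // Borrow the full URL (path included), then clone it into the config,
    // as the new insert_into code does.
    let object_store_url: &Url = table_path.as_ref();
    assert_eq!(object_store_url.path(), "/data/warehouse/t1/");
    Ok(())
}
```
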
8 changes: 3 additions & 5 deletions datafusion/core/src/datasource/physical_plan/mod.rs
@@ -54,10 +54,7 @@ use super::listing::ListingTableUrl;
 use crate::error::Result;
 use crate::physical_plan::{DisplayAs, DisplayFormatType};
 use crate::{
-    datasource::{
-        listing::{FileRange, PartitionedFile},
-        object_store::ObjectStoreUrl,
-    },
+    datasource::listing::{FileRange, PartitionedFile},
     physical_plan::display::{display_orderings, ProjectSchemaDisplay},
 };
 
@@ -68,12 +65,13 @@ use datafusion_physical_expr::PhysicalSortExpr;
 use futures::StreamExt;
 use log::debug;
 use object_store::{path::Path, GetOptions, GetRange, ObjectMeta, ObjectStore};
+use url::Url;
 
 /// The base configurations to provide when creating a physical plan for
 /// writing to any given file format.
 pub struct FileSinkConfig {
     /// Object store URL, used to get an ObjectStore instance
-    pub object_store_url: ObjectStoreUrl,
+    pub object_store_url: Url,
     /// A vector of [`PartitionedFile`] structs, each representing a file partition
     pub file_groups: Vec<PartitionedFile>,
     /// Vector of partition paths
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -103,6 +103,7 @@ use itertools::{multiunzip, Itertools};
 use log::{debug, trace};
 use sqlparser::ast::NullTreatment;
 use tokio::sync::Mutex;
+use url::Url;
 
 fn create_function_physical_name(
     fun: &str,
@@ -765,7 +766,7 @@ impl DefaultPhysicalPlanner {
             }) => {
                 let input_exec = children.one()?;
                 let parsed_url = ListingTableUrl::parse(output_url)?;
-                let object_store_url = parsed_url.object_store();
+                let object_store_url: &Url = parsed_url.as_ref();
 
                 let schema: Schema = (**input.schema()).clone().into();

@@ -779,7 +780,7 @@
 
                 // Set file sink related options
                 let config = FileSinkConfig {
-                    object_store_url,
+                    object_store_url: object_store_url.clone(),
                     table_paths: vec![parsed_url],
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
1 change: 1 addition & 0 deletions datafusion/proto/Cargo.toml
@@ -56,6 +56,7 @@ pbjson = { version = "0.6.0", optional = true }
 prost = "0.12.0"
 serde = { version = "1.0", optional = true }
 serde_json = { workspace = true, optional = true }
+url = { workspace = true }
 
 [dev-dependencies]
 datafusion-functions = { workspace = true, default-features = true }
5 changes: 4 additions & 1 deletion datafusion/proto/src/physical_plan/from_proto.rs
@@ -641,8 +641,11 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
                 Ok((name.clone(), data_type))
             })
             .collect::<Result<Vec<_>>>()?;
+
+        let object_store_url = url::Url::parse(&conf.object_store_url)
+            .map_err(|e| DataFusionError::Internal(format!("{e}")))?;
         Ok(Self {
-            object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
+            object_store_url,
             file_groups,
             table_paths,
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
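
`url::Url::parse` returns a `url::ParseError` rather than the `DataFusionError` that the replaced `ObjectStoreUrl::parse` produced directly, hence the explicit `map_err`. A minimal reproduction of that conversion:

```rust
use datafusion::error::{DataFusionError, Result};
use url::Url;

// Same wrapping as the diff: url::ParseError has no automatic conversion
// into DataFusionError, so it is stringified into an Internal error.
fn parse_object_store_url(raw: &str) -> Result<Url> {
    Url::parse(raw).map_err(|e| DataFusionError::Internal(format!("{e}")))
}

fn main() -> Result<()> {
    assert_eq!(parse_object_store_url("file:///")?.scheme(), "file");
    assert!(parse_object_store_url("not a url").is_err());
    Ok(())
}
```
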
10 changes: 7 additions & 3 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -83,6 +83,7 @@ use datafusion_proto::physical_plan::{
     AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
 };
 use datafusion_proto::protobuf;
+use url::Url;
 
 /// Perform a serde roundtrip and assert that the string representation of the before and after plans
 /// are identical. Note that this often isn't sufficient to guarantee that no information is
@@ -885,9 +886,10 @@ fn roundtrip_json_sink() -> Result<()> {
     let field_b = Field::new("plan", DataType::Utf8, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
     let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
+    let object_store_url = ObjectStoreUrl::local_filesystem();
 
     let file_sink_config = FileSinkConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
+        object_store_url: AsRef::<Url>::as_ref(&object_store_url).clone(),
         file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
@@ -920,9 +922,10 @@ fn roundtrip_csv_sink() -> Result<()> {
     let field_b = Field::new("plan", DataType::Utf8, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
     let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
+    let object_store_url = ObjectStoreUrl::local_filesystem();
 
     let file_sink_config = FileSinkConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
+        object_store_url: AsRef::<Url>::as_ref(&object_store_url).clone(),
         file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
@@ -978,9 +981,10 @@ fn roundtrip_parquet_sink() -> Result<()> {
     let field_b = Field::new("plan", DataType::Utf8, false);
     let schema = Arc::new(Schema::new(vec![field_a, field_b]));
     let input = Arc::new(PlaceholderRowExec::new(schema.clone()));
+    let object_store_url = ObjectStoreUrl::local_filesystem();
 
     let file_sink_config = FileSinkConfig {
-        object_store_url: ObjectStoreUrl::local_filesystem(),
+        object_store_url: AsRef::<Url>::as_ref(&object_store_url).clone(),
         file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
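
These roundtrips now exercise the `Url`-typed field end to end. The protobuf field stays a string, so the serialization relies on a `Url` surviving a to-string/parse round trip; that `to_proto` stores `url.to_string()` is an inference from the `from_proto` side above, not shown in this diff. A tiny self-contained check of the property:

```rust
use url::Url;

fn main() -> Result<(), url::ParseError> {
    let url = Url::parse("file:///")?;

    // Inferred encoding: the proto message holds a string, and from_proto
    // parses it back with Url::parse.
    let encoded = url.to_string();
    let decoded = Url::parse(&encoded)?;
    assert_eq!(url, decoded);
    Ok(())
}
```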
