
Commit e0df256

support connection string
1 parent 6102ba9 commit e0df256

10 files changed: +481 −251 lines

README.md

Lines changed: 1 addition & 1 deletion
@@ -191,7 +191,6 @@ Alternatively, you can use the following environment variables when starting pos

 Supported S3 uri formats are shown below:
 - s3:// \<bucket\> / \<path\>
-- s3a:// \<bucket\> / \<path\>
 - https:// \<bucket\>.s3.amazonaws.com / \<path\>
 - https:// s3.amazonaws.com / \<bucket\> / \<path\>

@@ -209,6 +208,7 @@ key = Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/
 Alternatively, you can use the following environment variables when starting postgres to configure the Azure Blob Storage client:
 - `AZURE_STORAGE_ACCOUNT`: the storage account name of the Azure Blob
 - `AZURE_STORAGE_KEY`: the storage key of the Azure Blob
+- `AZURE_STORAGE_CONNECTION_STRING`: the connection string for the Azure Blob (this can be set instead of specifying account name and key)
 - `AZURE_STORAGE_SAS_TOKEN`: the storage SAS token for the Azure Blob
 - `AZURE_STORAGE_ENDPOINT`: the endpoint **(only via environment variables)**
 - `AZURE_CONFIG_FILE`: an alternative location for the config file **(only via environment variables)**
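
For illustration, a connection string bundles the account name and key (and optionally the endpoint) that would otherwise be set separately; with placeholder values it looks like:

    AZURE_STORAGE_CONNECTION_STRING='DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>;EndpointSuffix=core.windows.net'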

src/arrow_parquet/parquet_reader.rs

Lines changed: 2 additions & 1 deletion
@@ -25,6 +25,7 @@ use crate::{
     },
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{geometry::reset_postgis_context, map::reset_map_context},
+    PG_BACKEND_TOKIO_RUNTIME,
 };

 use super::{

@@ -33,7 +34,7 @@ use super::{
     schema_parser::{
         ensure_file_schema_match_tupledesc_schema, parse_arrow_schema_from_attributes,
     },
-    uri_utils::{parquet_reader_from_uri, PG_BACKEND_TOKIO_RUNTIME},
+    uri_utils::parquet_reader_from_uri,
 };

 pub(crate) struct ParquetReaderContext {

src/arrow_parquet/parquet_writer.rs

Lines changed: 2 additions & 1 deletion
@@ -15,10 +15,11 @@ use crate::{
         schema_parser::{
             parquet_schema_string_from_attributes, parse_arrow_schema_from_attributes,
         },
-        uri_utils::{parquet_writer_from_uri, PG_BACKEND_TOKIO_RUNTIME},
+        uri_utils::parquet_writer_from_uri,
     },
     pgrx_utils::{collect_attributes_for, CollectAttributesFor},
     type_compat::{geometry::reset_postgis_context, map::reset_map_context},
+    PG_BACKEND_TOKIO_RUNTIME,
 };

 use super::pg_to_arrow::{

src/arrow_parquet/uri_utils.rs

Lines changed: 8 additions & 235 deletions
@@ -1,20 +1,6 @@
-use std::{
-    panic,
-    sync::{Arc, LazyLock},
-};
+use std::{panic, sync::Arc};

 use arrow::datatypes::SchemaRef;
-use aws_config::BehaviorVersion;
-use aws_credential_types::provider::ProvideCredentials;
-use home::home_dir;
-use ini::Ini;
-use object_store::{
-    aws::{AmazonS3, AmazonS3Builder},
-    azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder},
-    local::LocalFileSystem,
-    path::Path,
-    ObjectStore, ObjectStoreScheme,
-};
 use parquet::{
     arrow::{
         arrow_to_parquet_schema,
@@ -29,229 +15,16 @@ use pgrx::{
     ereport,
     pg_sys::{get_role_oid, has_privs_of_role, superuser, AsPgCStr, GetUserId},
 };
-use tokio::runtime::Runtime;
 use url::Url;

-use crate::arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE;
+use crate::{
+    arrow_parquet::parquet_writer::DEFAULT_ROW_GROUP_SIZE, object_store::create_object_store,
+    PG_BACKEND_TOKIO_RUNTIME,
+};

 const PARQUET_OBJECT_STORE_READ_ROLE: &str = "parquet_object_store_read";
 const PARQUET_OBJECT_STORE_WRITE_ROLE: &str = "parquet_object_store_write";

-// PG_BACKEND_TOKIO_RUNTIME creates a tokio runtime that uses the current thread
-// to run the tokio reactor. This uses the same thread that is running the Postgres backend.
-pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
-    tokio::runtime::Builder::new_current_thread()
-        .enable_all()
-        .build()
-        .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e))
-});
-
-fn parse_azure_blob_container(uri: &Url) -> Option<String> {
-    let host = uri.host_str()?;
-
-    // az(ure)://{container}/key
-    if uri.scheme() == "az" || uri.scheme() == "azure" {
-        return Some(host.to_string());
-    }
-    // https://{account}.blob.core.windows.net/{container}
-    else if host.ends_with(".blob.core.windows.net") {
-        let path_segments: Vec<&str> = uri.path_segments()?.collect();
-
-        // Container name is the first part of the path
-        return Some(
-            path_segments
-                .first()
-                .expect("unexpected error during parsing azure blob uri")
-                .to_string(),
-        );
-    }
-
-    None
-}
-
-fn parse_s3_bucket(uri: &Url) -> Option<String> {
-    let host = uri.host_str()?;
-
-    // s3(a)://{bucket}/key
-    if uri.scheme() == "s3" || uri.scheme() == "s3a" {
-        return Some(host.to_string());
-    }
-    // https://s3.amazonaws.com/{bucket}/key
-    else if host == "s3.amazonaws.com" {
-        let path_segments: Vec<&str> = uri.path_segments()?.collect();
-
-        // Bucket name is the first part of the path
-        return Some(
-            path_segments
-                .first()
-                .expect("unexpected error during parsing s3 uri")
-                .to_string(),
-        );
-    }
-    // https://{bucket}.s3.amazonaws.com/key
-    else if host.ends_with(".s3.amazonaws.com") {
-        let bucket_name = host.split('.').next()?;
-        return Some(bucket_name.to_string());
-    }
-
-    None
-}
-
-fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
-    let (scheme, path) =
-        ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unrecognized uri {}", uri));
-
-    // object_store crate can recognize a bunch of different schemes and paths, but we only support
-    // local, azure, and s3 schemes with a subset of all supported paths.
-    match scheme {
-        ObjectStoreScheme::AmazonS3 => {
-            let bucket_name = parse_s3_bucket(uri).unwrap_or_else(|| {
-                panic!("unsupported s3 uri: {}", uri);
-            });
-
-            let storage_container = PG_BACKEND_TOKIO_RUNTIME
-                .block_on(async { Arc::new(get_s3_object_store(&bucket_name).await) });
-
-            (storage_container, path)
-        }
-        ObjectStoreScheme::MicrosoftAzure => {
-            let container_name = parse_azure_blob_container(uri).unwrap_or_else(|| {
-                panic!("unsupported azure blob storage uri: {}", uri);
-            });
-
-            let storage_container = PG_BACKEND_TOKIO_RUNTIME
-                .block_on(async { Arc::new(get_azure_object_store(&container_name).await) });
-
-            (storage_container, path)
-        }
-        ObjectStoreScheme::Local => {
-            let uri = uri_as_string(uri);
-
-            if !copy_from {
-                // create or overwrite the local file
-                std::fs::OpenOptions::new()
-                    .write(true)
-                    .truncate(true)
-                    .create(true)
-                    .open(&uri)
-                    .unwrap_or_else(|e| panic!("{}", e));
-            }
-
-            let storage_container = Arc::new(LocalFileSystem::new());
-
-            let path = Path::from_filesystem_path(&uri).unwrap_or_else(|e| panic!("{}", e));
-
-            (storage_container, path)
-        }
-        _ => {
-            panic!("unsupported scheme {} in uri {}", uri.scheme(), uri);
-        }
-    }
-}
-
-// get_s3_object_store creates an AmazonS3 object store with the given bucket name.
-// It is configured by environment variables and aws config files as fallback method.
-// We need to read the config files to make the fallback method work since object_store
-// does not provide a way to read them. Currently, we only support to extract
-// "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", "AWS_ENDPOINT_URL",
-// and "AWS_REGION" from the config files.
-async fn get_s3_object_store(bucket_name: &str) -> AmazonS3 {
-    let mut aws_s3_builder = AmazonS3Builder::from_env().with_bucket_name(bucket_name);
-
-    // first tries environment variables and then the config files
-    let sdk_config = aws_config::defaults(BehaviorVersion::v2024_03_28())
-        .load()
-        .await;
-
-    if let Some(credential_provider) = sdk_config.credentials_provider() {
-        if let Ok(credentials) = credential_provider.provide_credentials().await {
-            // AWS_ACCESS_KEY_ID
-            aws_s3_builder = aws_s3_builder.with_access_key_id(credentials.access_key_id());
-
-            // AWS_SECRET_ACCESS_KEY
-            aws_s3_builder = aws_s3_builder.with_secret_access_key(credentials.secret_access_key());
-
-            if let Some(token) = credentials.session_token() {
-                // AWS_SESSION_TOKEN
-                aws_s3_builder = aws_s3_builder.with_token(token);
-            }
-        }
-    }
-
-    // AWS_ENDPOINT_URL
-    if let Some(aws_endpoint_url) = sdk_config.endpoint_url() {
-        aws_s3_builder = aws_s3_builder.with_endpoint(aws_endpoint_url);
-    }
-
-    // AWS_REGION
-    if let Some(aws_region) = sdk_config.region() {
-        aws_s3_builder = aws_s3_builder.with_region(aws_region.as_ref());
-    }
-
-    aws_s3_builder.build().unwrap_or_else(|e| panic!("{}", e))
-}
-
-async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure {
-    let mut azure_builder = MicrosoftAzureBuilder::from_env().with_container_name(container_name);
-
-    // ~/.azure/config
-    let azure_config_file_path = std::env::var("AZURE_CONFIG_FILE").unwrap_or(
-        home_dir()
-            .expect("failed to get home directory")
-            .join(".azure")
-            .join("config")
-            .to_str()
-            .expect("failed to convert path to string")
-            .to_string(),
-    );
-
-    let azure_config_content = Ini::load_from_file(&azure_config_file_path).ok();
-
-    // storage account
-    let azure_blob_account = match std::env::var("AZURE_STORAGE_ACCOUNT") {
-        Ok(account) => Some(account),
-        Err(_) => azure_config_content
-            .as_ref()
-            .and_then(|ini| ini.section(Some("storage")))
-            .and_then(|section| section.get("account"))
-            .map(|account| account.to_string()),
-    };
-
-    if let Some(azure_blob_account) = azure_blob_account {
-        azure_builder = azure_builder.with_account(azure_blob_account);
-    }
-
-    // storage key
-    let azure_blob_key = match std::env::var("AZURE_STORAGE_KEY") {
-        Ok(key) => Some(key),
-        Err(_) => azure_config_content
-            .as_ref()
-            .and_then(|ini| ini.section(Some("storage")))
-            .and_then(|section| section.get("key"))
-            .map(|key| key.to_string()),
-    };
-
-    if let Some(azure_blob_key) = azure_blob_key {
-        azure_builder = azure_builder.with_access_key(azure_blob_key);
-    }
-
-    // sas token
-    let azure_blob_sas_token = match std::env::var("AZURE_STORAGE_SAS_TOKEN") {
-        Ok(token) => Some(token),
-        Err(_) => azure_config_content
-            .as_ref()
-            .and_then(|ini| ini.section(Some("storage")))
-            .and_then(|section| section.get("sas_token"))
-            .map(|token| token.to_string()),
-    };
-
-    if let Some(azure_blob_sas_token) = azure_blob_sas_token {
-        azure_builder = azure_builder.with_config(AzureConfigKey::SasKey, azure_blob_sas_token);
-    }
-
-    azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
-}
-
 pub(crate) fn parse_uri(uri: &str) -> Url {
     if !uri.contains("://") {
         // local file
@@ -285,7 +58,7 @@ pub(crate) fn parquet_schema_from_uri(uri: &Url) -> SchemaDescriptor {

 pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {
     let copy_from = true;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = create_object_store(uri, copy_from);

     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -308,7 +81,7 @@ pub(crate) fn parquet_metadata_from_uri(uri: &Url) -> Arc<ParquetMetaData> {

 pub(crate) fn parquet_reader_from_uri(uri: &Url) -> ParquetRecordBatchStream<ParquetObjectReader> {
     let copy_from = true;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = create_object_store(uri, copy_from);

     PG_BACKEND_TOKIO_RUNTIME.block_on(async {
         let object_store_meta = parquet_object_store
@@ -340,7 +113,7 @@ pub(crate) fn parquet_writer_from_uri(
     writer_props: WriterProperties,
 ) -> AsyncArrowWriter<ParquetObjectWriter> {
     let copy_from = false;
-    let (parquet_object_store, location) = object_store_with_location(uri, copy_from);
+    let (parquet_object_store, location) = create_object_store(uri, copy_from);

     let parquet_object_writer = ParquetObjectWriter::new(parquet_object_store, location);

src/lib.rs

Lines changed: 13 additions & 0 deletions
@@ -1,8 +1,12 @@
+use std::sync::LazyLock;
+
 use parquet_copy_hook::hook::{init_parquet_copy_hook, ENABLE_PARQUET_COPY_HOOK};
 use parquet_copy_hook::pg_compat::MarkGUCPrefixReserved;
 use pgrx::{prelude::*, GucContext, GucFlags, GucRegistry};
+use tokio::runtime::Runtime;

 mod arrow_parquet;
+mod object_store;
 mod parquet_copy_hook;
 mod parquet_udfs;
 #[cfg(any(test, feature = "pg_test"))]

@@ -20,6 +24,15 @@ pgrx::pg_module_magic!();

 extension_sql_file!("../sql/bootstrap.sql", name = "role_setup", bootstrap);

+// PG_BACKEND_TOKIO_RUNTIME creates a tokio runtime that uses the current thread
+// to run the tokio reactor. This uses the same thread that is running the Postgres backend.
+pub(crate) static PG_BACKEND_TOKIO_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap_or_else(|e| panic!("failed to create tokio runtime: {}", e))
+});
+
 #[pg_guard]
 pub extern "C" fn _PG_init() {
     GucRegistry::define_bool_guc(
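
For context, the relocated runtime keeps the same usage pattern shown in uri_utils.rs above: async object_store calls are driven to completion on the Postgres backend thread via `block_on`. A minimal sketch of that pattern (the helper name is illustrative, not part of this commit):

```rust
use object_store::{path::Path, ObjectMeta, ObjectStore};

use crate::PG_BACKEND_TOKIO_RUNTIME;

// Block the backend thread on an async object store call, mirroring how the
// parquet reader/writer helpers in uri_utils.rs use the runtime after this refactor.
fn object_meta_blocking(store: &dyn ObjectStore, location: &Path) -> ObjectMeta {
    PG_BACKEND_TOKIO_RUNTIME.block_on(async {
        store
            .head(location)
            .await
            .unwrap_or_else(|e| panic!("failed to get object metadata: {}", e))
    })
}
```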

0 commit comments
