Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Support for COPY TO/FROM Google Cloud Storage #61

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8553677
Adds support for COPY TO/FROM Azure Blob Storage
aykut-bozkurt Oct 23, 2024
f187bc4
Adds support for COPY TO/FROM Google Cloud Storage
aykut-bozkurt Oct 23, 2024
7664102
Merge branch 'main' into aykut/azure-blob-storage
aykut-bozkurt Dec 16, 2024
f1b7114
configure test endpoint via env var
aykut-bozkurt Dec 16, 2024
31f6701
merge
aykut-bozkurt Dec 16, 2024
4cb8eb1
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Dec 16, 2024
f3a31d0
no curl in fake-gcs
aykut-bozkurt Dec 16, 2024
0c65a3b
ci uri fix
aykut-bozkurt Dec 16, 2024
d76bf55
ci uri fix
aykut-bozkurt Dec 16, 2024
74df5b2
update readme
aykut-bozkurt Dec 16, 2024
841f5ec
Merge branch 'main' into aykut/azure-blob-storage
aykut-bozkurt Dec 24, 2024
a8df29d
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Dec 24, 2024
6102ba9
improve coverage
aykut-bozkurt Jan 3, 2025
3af0a78
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 3, 2025
24ff523
support connection string
aykut-bozkurt Jan 6, 2025
5f19d59
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
e0df256
support connection string
aykut-bozkurt Jan 6, 2025
dc181dd
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
e9d0cdd
support connection string
aykut-bozkurt Jan 6, 2025
88080dd
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 6, 2025
d5f5ef2
- support azure bearer token via client secret
aykut-bozkurt Jan 10, 2025
0016a3f
Merge branch 'aykut/azure-blob-storage' into aykut/google-cloud-storage
aykut-bozkurt Jan 10, 2025
5955d57
google config to pass to object store
aykut-bozkurt Jan 10, 2025
60103e7
Merge branch 'main' into aykut/google-cloud-storage
aykut-bozkurt Jan 10, 2025
dd3d157
fix ci for fake google storage server
aykut-bozkurt Jan 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adds support for COPY TO/FROM Google Cloud Storage
Supports following Google Cloud Storage uri forms:
- gs:// \<bucket\> / \<path\>

**Configuration**

The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
  "gcs_base_url": "https://storage.googleapis.com",
  "disable_oauth": false,
  "client_email": "...",
  "private_key_id": "...",
  "private_key": "..."
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file

Closes #62
  • Loading branch information
aykut-bozkurt committed Dec 3, 2024
commit f187bc41ef791cef822c65d9ef500f675a863c39
3 changes: 3 additions & 0 deletions .devcontainer/.env
Original file line number Diff line number Diff line change
@@ -14,6 +14,9 @@ AZURE_TEST_CONTAINER_NAME=testcontainer
AZURE_TEST_READ_ONLY_SAS="se=2100-05-05&sp=r&sv=2022-11-02&sr=c&sig=YMPFnAHKe9y0o3hFegncbwQTXtAyvsJEgPB2Ne1b9CQ%3D"
AZURE_TEST_READ_WRITE_SAS="se=2100-05-05&sp=rcw&sv=2022-11-02&sr=c&sig=TPz2jEz0t9L651t6rTCQr%2BOjmJHkM76tnCGdcyttnlA%3D"

# GCS tests
GOOGLE_TEST_BUCKET=testbucket

# Others
RUST_TEST_THREADS=1
PG_PARQUET_TEST=true
2 changes: 2 additions & 0 deletions .devcontainer/create-test-buckets.sh
Original file line number Diff line number Diff line change
@@ -3,3 +3,5 @@
aws --endpoint-url http://localhost:9000 s3 mb s3://$AWS_S3_TEST_BUCKET

az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"
15 changes: 15 additions & 0 deletions .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ services:
- ${USERPROFILE}${HOME}/.gitconfig:/home/rust/.gitconfig:ro
- ${USERPROFILE}${HOME}/.aws:/home/rust/.aws:rw
- ${USERPROFILE}${HOME}/.azure:/home/rust/.azure:rw
- ${USERPROFILE}${HOME}/.config/gcloud:/home/rust/.config/gcloud:rw

env_file:
- .env
@@ -20,6 +21,7 @@ services:
depends_on:
- minio
- azurite
- fake-gcs-server

minio:
image: minio/minio
@@ -45,3 +47,16 @@ services:
interval: 6s
timeout: 2s
retries: 3

fake-gcs-server:
image: tustvold/fake-gcs-server
env_file:
- .env
network_mode: host
command: -scheme http -public-host localhost:4443
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "http://localhost:4443"]
interval: 6s
timeout: 2s
retries: 3
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -132,6 +132,17 @@ jobs:

az storage container create -n $AZURE_TEST_CONTAINER_NAME --connection-string $AZURE_STORAGE_CONNECTION_STRING

- name: Start fake-gcs-server for Google Cloud Storage emulator tests
run: |
docker run -d --env-file .devcontainer/.env -p 4443:4443 tustvold/fake-gcs-server -scheme http -filesystem-root /tmp/gcs -public-host localhost:4443

while ! nc -z localhost 4443; do
echo "Waiting for localhost:4443..."
sleep 1
done

curl -v -X POST --data-binary "{\"name\":\"$GOOGLE_TEST_BUCKET\"}" -H "Content-Type: application/json" "http://localhost:4443/storage/v1/b"

- name: Run tests
run: |
# Run tests with coverage tool
4 changes: 2 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -5,6 +5,6 @@
"rust-analyzer.checkOnSave": true,
"editor.inlayHints.enabled": "offUnlessPressed",
"files.watcherExclude": {
"**/target/**": true
}
"**/target/**": true
}
}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ aws-config = { version = "1.5", default-features = false, features = ["rustls"]}
aws-credential-types = {version = "1.2", default-features = false}
futures = "0.3"
home = "0.5"
object_store = {version = "0.11", default-features = false, features = ["aws", "azure"]}
object_store = {version = "0.11", default-features = false, features = ["aws", "azure", "gcp"]}
once_cell = "1"
parquet = {version = "53", default-features = false, features = [
"arrow",
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -213,6 +213,28 @@ Supported Azure Blob Storage uri formats are shown below:
- azure:// \<container\> / \<path\>
- https:// \<account\>.blob.core.windows.net / \<container\>

#### Google Cloud Storage

The simplest way to configure object storage is by creating a json config file like `/tmp/gcs.json`:

```bash
$ cat /tmp/gcs.json
{
"gcs_base_url": "http://localhost:4443",
"disable_oauth": true,
"client_email": "",
"private_key_id": "",
"private_key": ""
}
```

Alternatively, you can use the following environment variables when starting postgres to configure the Google Cloud Storage client:
- `GOOGLE_SERVICE_ACCOUNT_KEY`: json serialized service account key
- `GOOGLE_SERVICE_ACCOUNT_PATH`: an alternative location for the config file

Supported Google Cloud Storage uri formats are shown below:
- gs:// \<bucket\> / \<path\>

## Copy Options
`pg_parquet` supports the following options in the `COPY TO` command:
- `format parquet`: you need to specify this option to read or write Parquet files which does not end with `.parquet[.<compression>]` extension,
52 changes: 50 additions & 2 deletions src/arrow_parquet/uri_utils.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use ini::Ini;
use object_store::{
aws::{AmazonS3, AmazonS3Builder},
azure::{AzureConfigKey, MicrosoftAzure, MicrosoftAzureBuilder},
gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder},
local::LocalFileSystem,
path::Path,
ObjectStore, ObjectStoreScheme,
@@ -96,6 +97,17 @@ fn parse_s3_bucket(uri: &Url) -> Option<String> {
None
}

/// Extracts the bucket name from a Google Cloud Storage uri.
///
/// Only the `gs://{bucket}/{key}` form is recognized; any other scheme
/// yields `None`, as does a uri that has no host component.
fn parse_gcs_bucket(uri: &Url) -> Option<String> {
    match uri.scheme() {
        // gs://{bucket}/key — the host position carries the bucket name
        "gs" => uri.host_str().map(str::to_string),
        _ => None,
    }
}

fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStore>, Path) {
let (scheme, path) =
ObjectStoreScheme::parse(uri).unwrap_or_else(|_| panic!("unsupported uri {}", uri));
@@ -121,6 +133,16 @@ fn object_store_with_location(uri: &Url, copy_from: bool) -> (Arc<dyn ObjectStor

(storage_container, path)
}
ObjectStoreScheme::GoogleCloudStorage => {
let bucket_name = parse_gcs_bucket(uri).unwrap_or_else(|| {
panic!("failed to parse bucket name from uri: {}", uri);
});

let storage_container = PG_BACKEND_TOKIO_RUNTIME
.block_on(async { Arc::new(get_gcs_object_store(&bucket_name).await) });

(storage_container, path)
}
ObjectStoreScheme::Local => {
let uri = uri_as_string(uri);

@@ -262,6 +284,25 @@ async fn get_azure_object_store(container_name: &str) -> MicrosoftAzure {
azure_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

/// Builds a `GoogleCloudStorage` client for the given bucket.
///
/// Configuration is picked up from the environment via
/// `GoogleCloudStorageBuilder::from_env` (e.g. `GOOGLE_SERVICE_ACCOUNT_KEY`
/// or `GOOGLE_SERVICE_ACCOUNT_PATH`, per the builder's documented env vars).
///
/// Panics if the builder cannot construct the client.
async fn get_gcs_object_store(bucket_name: &str) -> GoogleCloudStorage {
    let mut gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(bucket_name);

    if is_testing() {
        // Tests run against the local fake-gcs-server emulator: point the
        // client at http://localhost:4443 and disable OAuth, since the
        // emulator performs no authentication.
        gcs_builder = gcs_builder.with_service_account_key(
            "{
\"gcs_base_url\": \"http://localhost:4443\",
\"disable_oauth\": true,
\"client_email\": \"\",
\"private_key_id\": \"\",
\"private_key\": \"\"
}",
        );
    }

    gcs_builder.build().unwrap_or_else(|e| panic!("{}", e))
}

fn is_testing() -> bool {
std::env::var("PG_PARQUET_TEST").is_ok()
}
@@ -284,13 +325,20 @@ pub(crate) fn parse_uri(uri: &str) -> Url {
} else if scheme == ObjectStoreScheme::MicrosoftAzure {
parse_azure_blob_container(&uri).unwrap_or_else(|| {
panic!(
"failed to parse container name from azure blob storage uri {}",
"failed to parse container name from Azure Blob Storage uri {}",
uri
)
});
} else if scheme == ObjectStoreScheme::GoogleCloudStorage {
parse_gcs_bucket(&uri).unwrap_or_else(|| {
panic!(
"failed to parse bucket name from Google Cloud Storage uri {}",
uri
)
});
} else {
panic!(
"unsupported uri {}. Only Azure and S3 uris are supported.",
"unsupported uri {}. Only Azure Blob Storage, S3 and Google Cloud Storage uris are supported.",
uri
);
};
45 changes: 41 additions & 4 deletions src/pgrx_tests/object_store.rs
Original file line number Diff line number Diff line change
@@ -338,7 +338,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
@@ -365,7 +365,7 @@ mod tests {
);

let copy_to_command = format!(
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);;",
"COPY (SELECT i FROM generate_series(1,10) i) TO '{}' WITH (format parquet);",
azure_blob_uri
);
Spi::run(copy_to_command.as_str()).unwrap();
@@ -411,10 +411,47 @@ mod tests {
}

#[pg_test]
#[should_panic(expected = "unsupported uri gs://testbucket")]
fn test_gcs_from_env() {
let test_bucket_name: String =
std::env::var("GOOGLE_TEST_BUCKET").expect("GOOGLE_TEST_BUCKET not found");

let gcs_uri = format!("gs://{}/pg_parquet_test.parquet", test_bucket_name);

let test_table = TestTable::<i32>::new("int4".into()).with_uri(gcs_uri);

test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_write_wrong_bucket() {
    // COPY TO a bucket that does not exist must surface the storage 404.
    // Renamed local from `s3_uri`: this is a GCS uri, consistent with
    // test_gcs_read_wrong_bucket.
    let gcs_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

    let copy_to_command = format!(
        "COPY (SELECT i FROM generate_series(1,10) i) TO '{}';",
        gcs_uri
    );
    Spi::run(copy_to_command.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "404 Not Found")]
fn test_gcs_read_wrong_bucket() {
    // COPY FROM a bucket that does not exist must surface the storage 404.
    let missing_bucket_uri = "gs://randombucketwhichdoesnotexist/pg_parquet_test.parquet";

    Spi::run("CREATE TABLE test_table (a int);").unwrap();

    let copy_from = format!("COPY test_table FROM '{}';", missing_bucket_uri);
    Spi::run(copy_from.as_str()).unwrap();
}

#[pg_test]
#[should_panic(expected = "unsupported uri http://testbucket")]
fn test_unsupported_uri() {
let test_table =
TestTable::<i32>::new("int4".into()).with_uri("gs://testbucket".to_string());
TestTable::<i32>::new("int4".into()).with_uri("http://testbucket".to_string());
test_table.insert("INSERT INTO test_expected (a) VALUES (1), (2), (null);");
test_table.assert_expected_and_result_rows();
}