Adds basic geoparquet support #94

Merged (2 commits) on Jan 20, 2025
Changes from all commits

2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered by default.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -41,6 +41,8 @@ parquet = {version = "53", default-features = false, features = [
]}
pgrx = "=0.12.9"
rust-ini = "0.21"
serde = "1"
serde_json = "1"
tokio = {version = "1", default-features = false, features = ["rt", "time", "macros"]}
url = "2"

2 changes: 1 addition & 1 deletion README.md
@@ -294,7 +294,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
> - (1) A `numeric` with unspecified precision and scale is allowed by Postgres. Such values are represented with a default precision (38) and scale (9) instead of being written as strings. You get a runtime error if your table tries to read or write a numeric value that does not fit the default precision and scale (29 integral digits before the decimal point, 9 digits after the decimal point).
> - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
> - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB` when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, as specified by the [GeoParquet spec](https://geoparquet.org/releases/v1.1.0/), when the `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
> - (5) `crunchy_map` is dependent on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with `MAP` logical type when `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.

> [!WARNING]
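As illustrative context (not part of the diff): per the test added below, a file written from a single `geometry(point)` column named `a` carries a `geo` footer entry shaped roughly like the following sketch. The field names come from the GeoParquet 1.1.0 spec and the test assertions in `copy_type_roundtrip.rs`; the `main` scaffold is purely for illustration, and real files may carry additional spec fields.

```rust
use serde_json::json;

fn main() {
    // Hypothetical illustration of the "geo" key-value metadata emitted
    // for a single geometry(point) column named "a".
    let geo = json!({
        "version": "1.1.0",
        "primary_column": "a",
        "columns": {
            "a": { "encoding": "WKB", "geometry_types": ["Point"] }
        }
    });
    println!("{geo}");
}
```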
51 changes: 35 additions & 16 deletions src/arrow_parquet/parquet_writer.rs
@@ -5,8 +5,9 @@ use arrow_schema::SchemaRef;
use parquet::{
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
file::properties::{EnabledStatistics, WriterProperties},
format::KeyValue,
};
use pgrx::{heap_tuple::PgHeapTuple, pg_sys::RECORDOID, AllocatedByRust, PgTupleDesc};
use pgrx::{heap_tuple::PgHeapTuple, AllocatedByRust, PgTupleDesc};
use url::Url;

use crate::{
@@ -18,7 +19,10 @@ use crate::{
uri_utils::parquet_writer_from_uri,
},
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
type_compat::{geometry::reset_postgis_context, map::reset_map_context},
type_compat::{
geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
map::reset_map_context,
},
PG_BACKEND_TOKIO_RUNTIME,
};

@@ -42,25 +46,11 @@ impl ParquetWriterContext {
compression_level: i32,
tupledesc: &PgTupleDesc,
) -> ParquetWriterContext {
debug_assert!(tupledesc.oid() == RECORDOID);

// Postgis and Map contexts are used throughout writing the parquet file.
// We need to reset them to avoid reading the stale data. (e.g. extension could be dropped)
reset_postgis_context();
reset_map_context();

let writer_props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_compression(
PgParquetCompressionWithLevel {
compression,
compression_level,
}
.into(),
)
.set_created_by("pg_parquet".to_string())
.build();

let attributes = collect_attributes_for(CollectAttributesFor::CopyTo, tupledesc);

pgrx::debug2!(
@@ -71,6 +61,8 @@ impl ParquetWriterContext {
let schema = parse_arrow_schema_from_attributes(&attributes);
let schema = Arc::new(schema);

let writer_props = Self::writer_props(tupledesc, compression, compression_level);

let parquet_writer = parquet_writer_from_uri(&uri, schema.clone(), writer_props);

let attribute_contexts =
@@ -83,6 +75,33 @@ impl ParquetWriterContext {
}
}

fn writer_props(
tupledesc: &PgTupleDesc,
compression: PgParquetCompression,
compression_level: i32,
) -> WriterProperties {
let compression = PgParquetCompressionWithLevel {
compression,
compression_level,
};

let mut writer_props_builder = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
.set_compression(compression.into())
.set_created_by("pg_parquet".to_string());

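// Attach GeoParquet "geo" key-value metadata when the tupledesc
// contains postgis geometry columns (the helper yields None otherwise).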
let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);

if geometry_columns_metadata_value.is_some() {
let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);

writer_props_builder =
writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
}

writer_props_builder.build()
}
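
The body of `geoparquet_metadata_json_from_tupledesc` lives in `src/type_compat/geometry.rs` and is not part of this diff. Below is a minimal standalone sketch of the shape it plausibly has, based on the test assertions further down (version `1.1.0`, `primary_column` set to the first geometry column, one `columns` entry per geometry attribute). The `(name, geometry type)` pairs stand in for the real `PgTupleDesc` inspection and are an assumption:

```rust
use std::collections::HashMap;

use serde::Serialize;

#[derive(Serialize)]
struct GeoColumn {
    encoding: &'static str,      // always "WKB" here
    geometry_types: Vec<String>, // e.g. ["Point"]
}

#[derive(Serialize)]
struct GeoMetadata {
    version: &'static str, // GeoParquet spec version
    primary_column: String,
    columns: HashMap<String, GeoColumn>,
}

// Simplified stand-in: the real helper inspects a `PgTupleDesc` and resolves
// the postgis `geometry` type; here the result of that inspection is passed
// in directly as (column name, geometry type name) pairs.
fn geoparquet_metadata_json(geometry_columns: &[(&str, &str)]) -> Option<String> {
    // No geometry columns: no "geo" metadata entry is written.
    let first = geometry_columns.first()?;

    let columns: HashMap<String, GeoColumn> = geometry_columns
        .iter()
        .map(|(name, geometry_type)| {
            (
                name.to_string(),
                GeoColumn {
                    encoding: "WKB",
                    geometry_types: vec![geometry_type.to_string()],
                },
            )
        })
        .collect();

    let metadata = GeoMetadata {
        version: "1.1.0",
        primary_column: first.0.to_string(),
        columns,
    };

    Some(serde_json::to_string(&metadata).expect("geo metadata serializes"))
}
```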

pub(crate) fn write_new_row_group(
&mut self,
tuples: Vec<Option<PgHeapTuple<AllocatedByRust>>>,
117 changes: 115 additions & 2 deletions src/pgrx_tests/copy_type_roundtrip.rs
@@ -8,18 +8,20 @@ mod tests {
LOCAL_TEST_FILE_PATH,
};
use crate::type_compat::fallback_to_text::FallbackToText;
use crate::type_compat::geometry::Geometry;
use crate::type_compat::geometry::{
Geometry, GeometryColumnsMetadata, GeometryEncoding, GeometryType,
};
use crate::type_compat::map::Map;
use crate::type_compat::pg_arrow_type_conversions::{
DEFAULT_UNBOUNDED_NUMERIC_PRECISION, DEFAULT_UNBOUNDED_NUMERIC_SCALE,
};
use pgrx::pg_sys::Oid;
use pgrx::pg_test;
use pgrx::{
composite_type,
datum::{Date, Time, TimeWithTimeZone, Timestamp, TimestampWithTimeZone},
AnyNumeric, Spi,
};
use pgrx::{pg_test, JsonB};

#[pg_test]
fn test_int2() {
@@ -946,6 +948,117 @@ mod tests {
test_table.assert_expected_and_result_rows();
}

#[pg_test]
fn test_geometry_geoparquet_metadata() {
// Skip the test if postgis extension is not available
if !extension_exists("postgis") {
return;
}

let query = "DROP EXTENSION IF EXISTS postgis; CREATE EXTENSION postgis;";
Spi::run(query).unwrap();

let copy_to_query = format!(
"COPY (SELECT ST_GeomFromText('POINT(1 1)')::geometry(point) as a,
ST_GeomFromText('LINESTRING(0 0, 1 1)')::geometry(linestring) as b,
ST_GeomFromText('POLYGON((0 0, 1 1, 2 2, 0 0))')::geometry(polygon) as c,
ST_GeomFromText('MULTIPOINT((0 0), (1 1))')::geometry(multipoint) as d,
ST_GeomFromText('MULTILINESTRING((0 0, 1 1), (2 2, 3 3))')::geometry(multilinestring) as e,
ST_GeomFromText('MULTIPOLYGON(((0 0, 1 1, 2 2, 0 0)), ((3 3, 4 4, 5 5, 3 3)))')::geometry(multipolygon) as f,
ST_GeomFromText('GEOMETRYCOLLECTION(POINT(1 1), LINESTRING(0 0, 1 1))')::geometry(geometrycollection) as g
)
TO '{LOCAL_TEST_FILE_PATH}' WITH (format parquet);",
);
Spi::run(copy_to_query.as_str()).unwrap();

// Check geoparquet metadata
let geoparquet_metadata_query = format!(
"select encode(value, 'escape')::jsonb
from parquet.kv_metadata('{LOCAL_TEST_FILE_PATH}')
where encode(key, 'escape') = 'geo';",
);
let geoparquet_metadata_json = Spi::get_one::<JsonB>(geoparquet_metadata_query.as_str())
.unwrap()
.unwrap();

let geoparquet_metadata: GeometryColumnsMetadata =
serde_json::from_value(geoparquet_metadata_json.0).unwrap();

// assert common metadata
assert_eq!(geoparquet_metadata.version, "1.1.0");
assert_eq!(geoparquet_metadata.primary_column, "a");

// point
assert_eq!(
geoparquet_metadata.columns.get("a").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("a").unwrap().geometry_types,
vec![GeometryType::Point]
);

// linestring
assert_eq!(
geoparquet_metadata.columns.get("b").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("b").unwrap().geometry_types,
vec![GeometryType::LineString]
);

// polygon
assert_eq!(
geoparquet_metadata.columns.get("c").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("c").unwrap().geometry_types,
vec![GeometryType::Polygon]
);

// multipoint
assert_eq!(
geoparquet_metadata.columns.get("d").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("d").unwrap().geometry_types,
vec![GeometryType::MultiPoint]
);

// multilinestring
assert_eq!(
geoparquet_metadata.columns.get("e").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("e").unwrap().geometry_types,
vec![GeometryType::MultiLineString]
);

// multipolygon
assert_eq!(
geoparquet_metadata.columns.get("f").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("f").unwrap().geometry_types,
vec![GeometryType::MultiPolygon]
);

// geometrycollection
assert_eq!(
geoparquet_metadata.columns.get("g").unwrap().encoding,
GeometryEncoding::WKB
);
assert_eq!(
geoparquet_metadata.columns.get("g").unwrap().geometry_types,
vec![GeometryType::GeometryCollection]
);
}
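
`GeometryColumnsMetadata`, `GeometryEncoding`, and `GeometryType` are defined in `src/type_compat/geometry.rs`, outside this diff. Based solely on how the test above uses them, a plausible minimal sketch follows; the per-column struct name and any serde attributes in the real code are assumptions:

```rust
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize)]
pub(crate) struct GeometryColumnsMetadata {
    pub(crate) version: String,
    pub(crate) primary_column: String,
    pub(crate) columns: HashMap<String, GeometryColumnMetadata>,
}

// The per-column struct is not named in this diff;
// "GeometryColumnMetadata" is a placeholder.
#[derive(Deserialize)]
pub(crate) struct GeometryColumnMetadata {
    pub(crate) encoding: GeometryEncoding,
    pub(crate) geometry_types: Vec<GeometryType>,
}

// Unit variants deserialize from their names, matching the
// "WKB" and geometry type strings in the GeoParquet JSON.
#[derive(Deserialize, Debug, PartialEq)]
pub(crate) enum GeometryEncoding {
    WKB,
}

#[derive(Deserialize, Debug, PartialEq)]
pub(crate) enum GeometryType {
    Point,
    LineString,
    Polygon,
    MultiPoint,
    MultiLineString,
    MultiPolygon,
    GeometryCollection,
}
```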

#[pg_test]
fn test_complex_composite() {
Spi::run("CREATE TYPE dog AS (name text, age int);").unwrap();