3 changes: 2 additions & 1 deletion README.md
@@ -423,6 +423,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
| `time` | INT64 | TIME_MICROS |
| `timetz`(3) | INT64 | TIME_MICROS |
| `geometry`(4) | BYTE_ARRAY | |
| `geography`(4) | BYTE_ARRAY | |

### Nested Types
| PostgreSQL Type | Parquet Physical Type | Logical Type |
@@ -440,7 +441,7 @@ There is currently only one GUC parameter to enable/disable the `pg_parquet`:
> - (1) `numeric` with unspecified precision and scale is allowed by Postgres. These values are represented with a default precision (38) and scale (9) instead of being written as strings. You get a runtime error if your table tries to read or write a numeric value that does not fit the default precision and scale (29 integral digits before the decimal point, 9 digits after the decimal point).
> - (2) The `date` type is represented according to `Unix epoch` when writing to Parquet files. It is converted back according to `PostgreSQL epoch` when reading from Parquet files.
> - (3) The `timestamptz` and `timetz` types are adjusted to `UTC` when writing to Parquet files. They are converted back with `UTC` timezone when reading from Parquet files.
> - (4) The `geometry` type is represented as `BYTE_ARRAY` encoded as `WKB`, specified by [geoparquet spec](https://geoparquet.org/releases/v1.1.0/), when `postgis` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with `STRING` logical type.
> - (4) The `geometry` and `geography` types are represented as `BYTE_ARRAY` encoded as `WKB`, as specified by the [geoparquet spec](https://geoparquet.org/releases/v1.1.0/), when the `postgis` extension is created. Otherwise, they are represented as `BYTE_ARRAY` with the `STRING` logical type. Orientation and edges metadata are written when the `postgis_sfcgal` extension is created.
> - (5) `crunchy_map` depends on functionality provided by [Crunchy Bridge](https://www.crunchydata.com/products/crunchy-bridge). The `crunchy_map` type is represented as `GROUP` with the `MAP` logical type when the `crunchy_map` extension is created. Otherwise, it is represented as `BYTE_ARRAY` with the `STRING` logical type.
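
To make note (4) concrete, here is a minimal sketch of the WKB mapping, not pg_parquet's actual writer code: a 2D point is serialized as little-endian WKB and stored as a plain Arrow binary value (SQL `NULL` becomes an Arrow null), which is what lands in the Parquet `BYTE_ARRAY` column.

```rust
use arrow::array::{Array, BinaryArray};

// Hand-rolled WKB encoding of a 2D point: byte-order marker,
// geometry type tag, then the coordinates, all little-endian.
fn wkb_point(x: f64, y: f64) -> Vec<u8> {
    let mut buf = Vec::with_capacity(21);
    buf.push(0x01); // 0x01 = little-endian byte order
    buf.extend_from_slice(&1u32.to_le_bytes()); // geometry type 1 = Point
    buf.extend_from_slice(&x.to_le_bytes());
    buf.extend_from_slice(&y.to_le_bytes());
    buf
}

fn main() {
    let wkb = wkb_point(28.97, 41.01);
    // geometry/geography cells end up in a BinaryArray (Parquet BYTE_ARRAY);
    // a SQL NULL maps to a null slot in the array.
    let array = BinaryArray::from_opt_vec(vec![Some(wkb.as_slice()), None]);
    assert_eq!(array.value(0).len(), 21);
    assert!(array.is_null(1));
}
```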

> [!WARNING]
12 changes: 11 additions & 1 deletion src/arrow_parquet/arrow_to_pg.rs
@@ -20,7 +20,7 @@ use crate::{
},
type_compat::{
fallback_to_text::{reset_fallback_to_text_context, FallbackToText},
geometry::{is_postgis_geometry_type, Geometry},
geometry::{Geography, Geometry},
map::{is_map_type, reset_map_type_context, Map},
},
};
@@ -34,6 +34,7 @@ pub(crate) mod date;
pub(crate) mod fallback_to_text;
pub(crate) mod float4;
pub(crate) mod float8;
pub(crate) mod geography;
pub(crate) mod geometry;
pub(crate) mod int2;
pub(crate) mod int4;
@@ -127,6 +128,8 @@ fn to_pg_nonarray_datum(
DataType::Binary => {
if attribute_context.is_geometry() {
to_pg_datum!(BinaryArray, Geometry, primitive_array, attribute_context)
} else if attribute_context.is_geography() {
to_pg_datum!(BinaryArray, Geography, primitive_array, attribute_context)
} else {
to_pg_datum!(BinaryArray, Vec<u8>, primitive_array, attribute_context)
}
@@ -271,6 +274,13 @@ fn to_pg_array_datum(
list_array,
element_context
)
} else if element_context.is_geography() {
to_pg_datum!(
BinaryArray,
Vec<Option<Geography>>,
list_array,
element_context
)
} else {
to_pg_datum!(
BinaryArray,
18 changes: 16 additions & 2 deletions src/arrow_parquet/arrow_to_pg/context.rs
@@ -6,11 +6,14 @@ use pgrx::{
PgTupleDesc,
};

use crate::type_compat::pg_arrow_type_conversions::extract_precision_and_scale_from_numeric_typmod;
use crate::type_compat::{
geometry::{is_postgis_geography_type, is_postgis_geometry_type},
pg_arrow_type_conversions::extract_precision_and_scale_from_numeric_typmod,
};

use super::{
array_element_typoid, collect_attributes_for, domain_array_base_elem_type, is_array_type,
is_composite_type, is_map_type, is_postgis_geometry_type, tuple_desc, CollectAttributesFor,
is_composite_type, is_map_type, tuple_desc, CollectAttributesFor,
};

// ArrowToPgAttributeContext contains the information needed to convert an Arrow array
@@ -100,6 +103,7 @@ impl ArrowToPgAttributeContext {
pub(crate) enum ArrowToPgAttributeTypeContext {
Primitive {
is_geometry: bool,
is_geography: bool,
precision: Option<u32>,
scale: Option<u32>,
timezone: Option<String>,
@@ -144,13 +148,16 @@ impl ArrowToPgAttributeTypeContext {

let is_geometry = is_postgis_geometry_type(typoid);

let is_geography = is_postgis_geography_type(typoid);

let timezone = match &data_type {
DataType::Timestamp(_, Some(timezone)) => Some(timezone.to_string()),
_ => None,
};

Self::Primitive {
is_geometry,
is_geography,
precision,
scale,
timezone,
@@ -278,6 +285,13 @@ impl ArrowToPgAttributeTypeContext {
_ => false,
}
}

pub(crate) fn is_geography(&self) -> bool {
match &self {
ArrowToPgAttributeTypeContext::Primitive { is_geography, .. } => *is_geography,
_ => false,
}
}
}

pub(crate) fn collect_arrow_to_pg_attribute_contexts(
32 changes: 32 additions & 0 deletions src/arrow_parquet/arrow_to_pg/geography.rs
@@ -0,0 +1,32 @@
use arrow::array::{Array, BinaryArray};

use crate::type_compat::geometry::Geography;

use super::{ArrowArrayToPgType, ArrowToPgAttributeContext};

// Geography
impl ArrowArrayToPgType<Geography> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Geography> {
if self.is_null(0) {
None
} else {
Some(self.value(0).to_vec().into())
}
}
}

// Geography[]
impl ArrowArrayToPgType<Vec<Option<Geography>>> for BinaryArray {
fn to_pg_type(self, _context: &ArrowToPgAttributeContext) -> Option<Vec<Option<Geography>>> {
let mut vals = vec![];
for val in self.iter() {
if let Some(val) = val {
vals.push(Some(val.to_vec().into()));
} else {
vals.push(None);
}
}

Some(vals)
}
}
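
The `to_vec().into()` conversions above assume that `Geography` is a thin wrapper over WKB bytes with a `From<Vec<u8>>` impl. A minimal sketch of such a wrapper follows (an assumption; the real definition lives in `src/type_compat/geometry.rs` and may carry more machinery):

```rust
// Hypothetical stand-in for the Geography type used above: a newtype
// over raw WKB bytes providing the From<Vec<u8>> conversion that the
// `.into()` calls rely on.
#[derive(Debug, Clone, PartialEq)]
pub struct Geography(pub Vec<u8>);

impl From<Vec<u8>> for Geography {
    fn from(wkb: Vec<u8>) -> Self {
        Geography(wkb)
    }
}

fn main() {
    // Any byte buffer works for the sketch; real values hold full WKB.
    let geog: Geography = vec![0x01, 0x01, 0x00, 0x00, 0x00].into();
    assert_eq!(geog.0.len(), 5);
}
```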
62 changes: 41 additions & 21 deletions src/arrow_parquet/parquet_writer.rs
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{cell::RefCell, rc::Rc, sync::Arc};

use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
@@ -24,7 +24,7 @@ use crate::{
parquet_copy_hook::copy_to_split_dest_receiver::CopyToParquetOptions,
pgrx_utils::{collect_attributes_for, CollectAttributesFor},
type_compat::{
geometry::{geoparquet_metadata_json_from_tupledesc, reset_postgis_context},
geometry::{geoparquet_metadata_to_json, reset_postgis_context, GeoparquetMetadata},
map::reset_map_context,
},
PG_BACKEND_TOKIO_RUNTIME,
@@ -43,6 +43,7 @@ pub(crate) struct ParquetWriterContext {
parquet_writer: AsyncArrowWriter<ParquetObjectWriter>,
schema: SchemaRef,
attribute_contexts: Vec<PgToArrowAttributeContext>,
geoparquet_metadata: Rc<RefCell<GeoparquetMetadata>>,
options: CopyToParquetOptions,
}

@@ -71,44 +72,41 @@ impl ParquetWriterContext {

let schema = Arc::new(schema);

let writer_props = Self::writer_props(tupledesc, options);
let writer_props = Self::writer_props(options);

let parquet_writer = parquet_writer_from_uri(&uri_info, schema.clone(), writer_props);

let attribute_contexts =
collect_pg_to_arrow_attribute_contexts(&attributes, &schema.fields);
let geoparquet_metadata = Rc::new(RefCell::new(GeoparquetMetadata::new()));

let attribute_contexts = collect_pg_to_arrow_attribute_contexts(
&attributes,
&schema.fields,
geoparquet_metadata.clone(),
);

ParquetWriterContext {
parquet_writer,
schema,
attribute_contexts,
geoparquet_metadata,
options,
}
}

fn writer_props(tupledesc: &PgTupleDesc, options: CopyToParquetOptions) -> WriterProperties {
fn writer_props(options: CopyToParquetOptions) -> WriterProperties {
let compression = PgParquetCompressionWithLevel {
compression: options.compression,
compression_level: options.compression_level,
};

let mut writer_props_builder = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Page)
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Chunk)
.set_offset_index_disabled(true) // use page index instead of data page statistics
.set_compression(compression.into())
.set_max_row_group_size(options.row_group_size as usize)
.set_created_by("pg_parquet".to_string())
.set_writer_version(options.parquet_version.into())
.set_created_by("pg_parquet".to_string());

let geometry_columns_metadata_value = geoparquet_metadata_json_from_tupledesc(tupledesc);

if geometry_columns_metadata_value.is_some() {
let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);

writer_props_builder =
writer_props_builder.set_key_value_metadata(Some(vec![key_value_metadata]));
}

writer_props_builder.build()
.build()
}

// write_tuples writes the tuples to the parquet file. It flushes the in-progress rows to a new row group
@@ -132,10 +130,32 @@ impl ParquetWriterContext {
}
}

fn write_geoparquet_metadata_if_exists(&mut self) {
let geoparquet_metadata = self.geoparquet_metadata.borrow_mut();

let has_geoparquet_columns = !geoparquet_metadata.columns.is_empty();

if !has_geoparquet_columns {
// No geoparquet columns to write, so we skip writing metadata.
return;
}

let geometry_columns_metadata_value = geoparquet_metadata_to_json(&geoparquet_metadata);

let key_value_metadata = KeyValue::new("geo".into(), geometry_columns_metadata_value);

self.parquet_writer
.append_key_value_metadata(key_value_metadata);
}

// finalize flushes the in-progress rows to a new row group and finally writes metadata to the file.
pub(crate) fn finalize(&mut self) {
PG_BACKEND_TOKIO_RUNTIME
.block_on(self.parquet_writer.finish())
.block_on(async {
self.write_geoparquet_metadata_if_exists();

self.parquet_writer.finish().await
})
.unwrap_or_else(|e| panic!("failed to finish parquet writer: {e}"));
}

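For context, the `geo` key-value metadata appended in `write_geoparquet_metadata_if_exists` follows the GeoParquet footer layout. Below is a hedged sketch of its shape using `serde_json` (field names are from the GeoParquet 1.1 spec; the exact JSON that `geoparquet_metadata_to_json` emits, including the orientation/edges fields written when `postgis_sfcgal` is present, may differ):

```rust
use serde_json::json;

fn main() {
    // Approximate shape of the "geo" footer metadata per GeoParquet 1.1;
    // the column name "geom" and the field values below are illustrative.
    let geo = json!({
        "version": "1.1.0",
        "primary_column": "geom",
        "columns": {
            "geom": {
                "encoding": "WKB",
                "geometry_types": ["Point"],
                // Extra metadata from the README note, written only when
                // the postgis_sfcgal extension is created (values assumed).
                "orientation": "counterclockwise",
                "edges": "spherical"
            }
        }
    });

    println!("{}", serde_json::to_string_pretty(&geo).unwrap());
}
```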
7 changes: 6 additions & 1 deletion src/arrow_parquet/pg_to_arrow.rs
@@ -21,7 +21,7 @@ use crate::{
},
type_compat::{
fallback_to_text::{reset_fallback_to_text_context, FallbackToText},
geometry::{is_postgis_geometry_type, Geometry},
geometry::{Geography, Geometry},
map::{is_map_type, reset_map_type_context, Map},
pg_arrow_type_conversions::{
extract_precision_and_scale_from_numeric_typmod, should_write_numeric_as_text,
@@ -38,6 +38,7 @@ pub(crate) mod date;
pub(crate) mod fallback_to_text;
pub(crate) mod float4;
pub(crate) mod float8;
pub(crate) mod geography;
pub(crate) mod geometry;
pub(crate) mod int2;
pub(crate) mod int4;
@@ -205,6 +206,8 @@ fn to_arrow_primitive_array(
to_arrow_primitive_array!(Map, tuples, attribute_context)
} else if attribute_context.is_geometry() {
to_arrow_primitive_array!(Geometry, tuples, attribute_context)
} else if attribute_context.is_geography() {
to_arrow_primitive_array!(Geography, tuples, attribute_context)
} else {
reset_fallback_to_text_context(
attribute_context.typoid(),
@@ -314,6 +317,8 @@ fn to_arrow_list_array(
to_arrow_list_array!(pgrx::Array<Map>, tuples, element_context)
} else if element_context.is_geometry() {
to_arrow_list_array!(pgrx::Array<Geometry>, tuples, element_context)
} else if element_context.is_geography() {
to_arrow_list_array!(pgrx::Array<Geography>, tuples, element_context)
} else {
reset_fallback_to_text_context(element_typoid, element_typmod);
