Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## 0.6.2
- Only create header strings once on the rust side
- Re-add nanosecond time support
- Fix regression with UUID parsing
- Fix regression with complex type parsing

## 0.6.1
- Fix regression, handle symbol keys in schema definition when constructing writer
Expand All @@ -10,6 +13,8 @@
- Complete rewrite of the underlying rust code
- Refactored to separate the parquet writing / reading code from the Ruby interop
- External API remains the same
- Timezone handling for timestamps is now spec compliant
- Dates are returned as `Date` objects

## 0.5.13
- Get rid of assertion preventing reading maps with complex key types
Expand Down
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions ext/parquet-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
arrow = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" }
arrow-array = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" }
arrow-buffer = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" }
arrow-schema = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" }
arrow-schema = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader", features = ["canonical_extension_types"]}
bytes = "1.5"
indexmap = "2.2"
jiff = "0.2"
Expand All @@ -17,7 +17,7 @@ parquet = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24
rand = "0.9.1"
serde = { version = "1.0", features = ["derive"] }
thiserror = "2.0"
uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
uuid = { version = "1.0", features = ["v4"] }
tempfile = "3.8"
46 changes: 35 additions & 11 deletions ext/parquet-core/src/arrow_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@

use crate::{ParquetError, ParquetValue, Result};
use arrow_array::{builder::*, Array, ArrayRef, ListArray, MapArray, StructArray};
use arrow_schema::extension::Uuid as ArrowUuid;
use arrow_schema::{DataType, Field};
use bytes::Bytes;
use indexmap::IndexMap;
use ordered_float::OrderedFloat;
use std::sync::Arc;

/// Convert a single value from an Arrow array at the given index to a ParquetValue
pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<ParquetValue> {
pub fn arrow_to_parquet_value(
field: &Field,
array: &dyn Array,
index: usize,
) -> Result<ParquetValue> {
use arrow_array::*;

if array.is_null(index) {
Expand Down Expand Up @@ -72,7 +77,6 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
let array = downcast_array::<Float64Array>(array)?;
Ok(ParquetValue::Float64(OrderedFloat(array.value(index))))
}

// String and binary types
DataType::Utf8 => {
let array = downcast_array::<StringArray>(array)?;
Expand All @@ -86,9 +90,15 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
}
DataType::FixedSizeBinary(_) => {
let array = downcast_array::<FixedSizeBinaryArray>(array)?;
Ok(ParquetValue::Bytes(Bytes::copy_from_slice(
array.value(index),
)))
let value = array.value(index);
match field.try_extension_type::<ArrowUuid>() {
Ok(_) => {
let uuid = uuid::Uuid::from_slice(value)
.map_err(|e| ParquetError::Conversion(format!("Invalid UUID: {}", e)))?;
Ok(ParquetValue::Uuid(uuid))
}
Err(_) => Ok(ParquetValue::Bytes(Bytes::copy_from_slice(value))),
}
}

// Date and time types
Expand Down Expand Up @@ -140,6 +150,10 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
let array = downcast_array::<Time64MicrosecondArray>(array)?;
Ok(ParquetValue::TimeMicros(array.value(index)))
}
arrow_schema::TimeUnit::Nanosecond => {
let array = downcast_array::<Time64NanosecondArray>(array)?;
Ok(ParquetValue::TimeNanos(array.value(index)))
}
_ => Err(ParquetError::Conversion(format!(
"Unsupported time64 unit: {:?}",
unit
Expand Down Expand Up @@ -173,13 +187,13 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
}

// Complex types
DataType::List(_) => {
DataType::List(item_field) => {
let array = downcast_array::<ListArray>(array)?;
let list_values = array.value(index);

let mut values = Vec::with_capacity(list_values.len());
for i in 0..list_values.len() {
values.push(arrow_to_parquet_value(&list_values, i)?);
values.push(arrow_to_parquet_value(item_field, &list_values, i)?);
}

Ok(ParquetValue::List(values))
Expand All @@ -192,10 +206,20 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
let keys = map_value.column(0);
let values = map_value.column(1);

let key_field = map_value
.fields()
.iter().find(|f| f.name() == "key")
.ok_or_else(|| ParquetError::Conversion("No value field found".to_string()))?;

let value_field = map_value
.fields()
.iter().find(|f| f.name() == "value")
.ok_or_else(|| ParquetError::Conversion("No value field found".to_string()))?;

let mut map_vec = Vec::with_capacity(keys.len());
for i in 0..keys.len() {
let key = arrow_to_parquet_value(keys, i)?;
let value = arrow_to_parquet_value(values, i)?;
let key = arrow_to_parquet_value(key_field, keys, i)?;
let value = arrow_to_parquet_value(value_field, values, i)?;
map_vec.push((key, value));
}

Expand All @@ -207,7 +231,7 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result<Parquet
let mut map = IndexMap::new();
for (col_idx, field) in array.fields().iter().enumerate() {
let column = array.column(col_idx);
let value = arrow_to_parquet_value(column, index)?;
let value = arrow_to_parquet_value(field, column, index)?;
map.insert(Arc::from(field.name().as_str()), value);
}

Expand Down Expand Up @@ -1108,7 +1132,7 @@ mod tests {
let array = parquet_values_to_arrow_array(values.clone(), &field).unwrap();

for (i, expected) in values.iter().enumerate() {
let actual = arrow_to_parquet_value(array.as_ref(), i).unwrap();
let actual = arrow_to_parquet_value(&field, array.as_ref(), i).unwrap();
assert_eq!(&actual, expected);
}
}
Expand Down
11 changes: 7 additions & 4 deletions ext/parquet-core/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ where
// Extract values from current row
let mut row_values = Vec::with_capacity(batch.num_columns());

for column in batch.columns() {
let value = match arrow_to_parquet_value(column, self.current_row) {
let schema = batch.schema();
for (i, column) in batch.columns().iter().enumerate() {
let field = schema.field(i);
let value = match arrow_to_parquet_value(field, column, self.current_row) {
Ok(v) => v,
Err(e) => return Some(Err(e)),
};
Expand Down Expand Up @@ -228,12 +230,13 @@ where
let mut columns = Vec::with_capacity(batch.num_columns());

for (idx, column) in batch.columns().iter().enumerate() {
let column_name = self.schema.field(idx).name().to_string();
let field = self.schema.field(idx);
let column_name = field.name().to_string();

// Convert entire column to ParquetValues
let mut values = Vec::with_capacity(column.len());
for row_idx in 0..column.len() {
match arrow_to_parquet_value(column, row_idx) {
match arrow_to_parquet_value(field, column, row_idx) {
Ok(value) => values.push(value),
Err(e) => return Some(Err(e)),
}
Expand Down
2 changes: 2 additions & 0 deletions ext/parquet-core/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub enum PrimitiveType {
TimestampNanos(Option<Arc<str>>),
TimeMillis,
TimeMicros,
TimeNanos,

// Fixed-length byte array
FixedLenByteArray(i32),
Expand Down Expand Up @@ -146,6 +147,7 @@ impl PrimitiveType {
PrimitiveType::TimestampNanos(_) => "TimestampNanos",
PrimitiveType::TimeMillis => "TimeMillis",
PrimitiveType::TimeMicros => "TimeMicros",
PrimitiveType::TimeNanos => "TimeNanos",
PrimitiveType::FixedLenByteArray(_) => "FixedLenByteArray",
}
}
Expand Down
7 changes: 7 additions & 0 deletions ext/parquet-core/src/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bytes::Bytes;
use indexmap::IndexMap;
use num::BigInt;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParquetValue {
Expand All @@ -22,6 +23,7 @@ pub enum ParquetValue {
Boolean(bool),
String(Arc<str>),
Bytes(Bytes),
Uuid(Uuid),

// Date/Time types
Date32(i32), // Days since epoch
Expand All @@ -40,6 +42,7 @@ pub enum ParquetValue {
// Time types
TimeMillis(i32), // Time of day in milliseconds since midnight
TimeMicros(i64), // Time of day in microseconds since midnight
TimeNanos(i64), // Time of day in nanoseconds since midnight

// Complex types
List(Vec<ParquetValue>),
Expand Down Expand Up @@ -68,6 +71,7 @@ impl std::hash::Hash for ParquetValue {
ParquetValue::Boolean(b) => b.hash(state),
ParquetValue::String(s) => s.hash(state),
ParquetValue::Bytes(b) => b.hash(state),
ParquetValue::Uuid(u) => u.hash(state),
ParquetValue::Date32(d) => d.hash(state),
ParquetValue::Date64(d) => d.hash(state),
ParquetValue::Decimal128(d, scale) => {
Expand Down Expand Up @@ -96,6 +100,7 @@ impl std::hash::Hash for ParquetValue {
}
ParquetValue::TimeMillis(t) => t.hash(state),
ParquetValue::TimeMicros(t) => t.hash(state),
ParquetValue::TimeNanos(t) => t.hash(state),
ParquetValue::List(l) => l.hash(state),
ParquetValue::Map(m) => m.hash(state),
ParquetValue::Record(r) => {
Expand Down Expand Up @@ -133,6 +138,7 @@ impl ParquetValue {
ParquetValue::Boolean(_) => "Boolean",
ParquetValue::String(_) => "String",
ParquetValue::Bytes(_) => "Bytes",
ParquetValue::Uuid(_) => "Uuid",
ParquetValue::Date32(_) => "Date32",
ParquetValue::Date64(_) => "Date64",
ParquetValue::Decimal128(_, _) => "Decimal128",
Expand All @@ -143,6 +149,7 @@ impl ParquetValue {
ParquetValue::TimestampNanos(_, _) => "TimestampNanos",
ParquetValue::TimeMillis(_) => "TimeMillis",
ParquetValue::TimeMicros(_) => "TimeMicros",
ParquetValue::TimeNanos(_) => "TimeNanos",
ParquetValue::List(_) => "List",
ParquetValue::Map(_) => "Map",
ParquetValue::Record(_) => "Record",
Expand Down
17 changes: 14 additions & 3 deletions ext/parquet-core/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ where
(Date64(_), DataType::Date64) => 8,
(TimeMillis(_), DataType::Time32(_)) => 4,
(TimeMicros(_), DataType::Time64(_)) => 8,
(TimeNanos(_), DataType::Time64(_)) => 8,
(TimestampSecond(_, _), DataType::Timestamp(_, _)) => 8,
(TimestampMillis(_, _), DataType::Timestamp(_, _)) => 8,
(TimestampMicros(_, _), DataType::Timestamp(_, _)) => 8,
Expand Down Expand Up @@ -364,7 +365,9 @@ where
writer.write(&batch)?;

// Check if we need to flush based on memory usage
if writer.in_progress_size() >= self.memory_threshold {
if writer.in_progress_size() >= self.memory_threshold
|| writer.memory_size() >= self.memory_threshold
{
writer.flush()?;
}
} else {
Expand Down Expand Up @@ -496,6 +499,7 @@ fn validate_value_against_field(value: &ParquetValue, field: &Field, path: &str)
(Date64(_), DataType::Date64) => Ok(()),
(TimeMillis(_), DataType::Time32(_)) => Ok(()),
(TimeMicros(_), DataType::Time64(_)) => Ok(()),
(TimeNanos(_), DataType::Time64(_)) => Ok(()),
(TimestampSecond(_, _), DataType::Timestamp(_, _)) => Ok(()),
(TimestampMillis(_, _), DataType::Timestamp(_, _)) => Ok(()),
(TimestampMicros(_, _), DataType::Timestamp(_, _)) => Ok(()),
Expand Down Expand Up @@ -591,10 +595,16 @@ fn schema_node_to_arrow_field(node: &SchemaNode) -> Result<Field> {
name,
primitive_type,
nullable,
..
format,
} => {
let data_type = primitive_type_to_arrow(primitive_type)?;
Ok(Field::new(name, data_type, *nullable))
let field = Field::new(name, data_type, *nullable);
let extended_field = if format.as_deref() == Some("uuid") {
field.with_extension_type(arrow_schema::extension::Uuid)
} else {
field
};
Ok(extended_field)
}
SchemaNode::List {
name,
Expand Down Expand Up @@ -671,6 +681,7 @@ fn primitive_type_to_arrow(ptype: &crate::PrimitiveType) -> Result<DataType> {
Date32 => DataType::Date32,
TimeMillis => DataType::Time32(arrow_schema::TimeUnit::Millisecond),
TimeMicros => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
TimeNanos => DataType::Time64(arrow_schema::TimeUnit::Nanosecond),
TimestampMillis(tz) => DataType::Timestamp(
arrow_schema::TimeUnit::Millisecond,
// PARQUET SPEC: ANY timezone (e.g., "+09:00", "America/New_York") means
Expand Down
8 changes: 4 additions & 4 deletions ext/parquet-core/tests/arrow_conversion_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn test_decimal256_large_values() {

// Verify roundtrip
for i in 0..4 {
let value = arrow_to_parquet_value(array.as_ref(), i).unwrap();
let value = arrow_to_parquet_value(&field, array.as_ref(), i).unwrap();
match (i, value) {
(0, ParquetValue::Decimal256(v, _)) => assert_eq!(v, large_positive.clone()),
(1, ParquetValue::Decimal256(v, _)) => assert_eq!(v, large_negative.clone()),
Expand Down Expand Up @@ -173,7 +173,7 @@ fn test_timestamp_with_timezone() {

// Verify roundtrip preserves timezone
for i in 0..3 {
let value = arrow_to_parquet_value(array.as_ref(), i).unwrap();
let value = arrow_to_parquet_value(&field, array.as_ref(), i).unwrap();
match value {
ParquetValue::TimestampMillis(_, Some(tz)) => {
assert_eq!(tz.as_ref(), "America/New_York");
Expand Down Expand Up @@ -209,7 +209,7 @@ fn test_nested_list_of_lists() {
assert_eq!(array.len(), 1);

// Verify roundtrip
let value = arrow_to_parquet_value(array.as_ref(), 0).unwrap();
let value = arrow_to_parquet_value(&outer_field, array.as_ref(), 0).unwrap();
match value {
ParquetValue::List(items) => assert_eq!(items.len(), 5),
_ => panic!("Expected list"),
Expand Down Expand Up @@ -357,7 +357,7 @@ fn test_unsupported_arrow_types() {
)
.unwrap();

let result = arrow_to_parquet_value(&array, 0);
let result = arrow_to_parquet_value(&Field::new("int", DataType::Int32, false), &array, 0);
assert!(result.is_err());
assert!(result
.unwrap_err()
Expand Down
1 change: 1 addition & 0 deletions ext/parquet-ruby-adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ rb-sys = { version = "0.9", features = ["stable-api-compiled-fallback"] }
tempfile = "^3.15"
thiserror = "2.0"
indexmap = "2.2"
uuid = "*"
Loading
Loading