diff --git a/CHANGELOG.md b/CHANGELOG.md index 12bd1d9..1871115 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.6.2 - Only create header strings once on the rust side +- Re-add nanosecond time support +- Fix regression with UUID parsing +- Fix regression with complex type parsing ## 0.6.1 - Fix regression, handle symbol keys in schema definition when constructing writer @@ -10,6 +13,8 @@ - Complete rewrite of the underlying rust code - Refactored to separate the parquet writing / reading code separate from the Ruby interop - External API remains the same +- Timezone handling for timestamps is now spec compliant +- Dates are returned as `Date` objects ## 0.5.13 - Get rid of assertion preventing reading maps with complex key types diff --git a/Cargo.lock b/Cargo.lock index 0df58ec..3afa2ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,10 @@ dependencies = [ name = "arrow-schema" version = "55.2.0" source = "git+https://github.com/njaremko/arrow-rs?branch=nathan_06-24-remove_primitive_map_key_assertion_on_record_reader#54858bf019ff3faeb8f5b562da8c01012162aef0" +dependencies = [ + "serde", + "serde_json", +] [[package]] name = "arrow-select" @@ -1092,6 +1096,7 @@ dependencies = [ "rb-sys-env 0.2.2", "tempfile", "thiserror", + "uuid", ] [[package]] diff --git a/ext/parquet-core/Cargo.toml b/ext/parquet-core/Cargo.toml index 60a5341..1e902fb 100644 --- a/ext/parquet-core/Cargo.toml +++ b/ext/parquet-core/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" arrow = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" } arrow-array = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" } arrow-buffer = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" } -arrow-schema = { git = "https://github.com/njaremko/arrow-rs", branch = 
"nathan_06-24-remove_primitive_map_key_assertion_on_record_reader" } +arrow-schema = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24-remove_primitive_map_key_assertion_on_record_reader", features = ["canonical_extension_types"]} bytes = "1.5" indexmap = "2.2" jiff = "0.2" @@ -17,7 +17,7 @@ parquet = { git = "https://github.com/njaremko/arrow-rs", branch = "nathan_06-24 rand = "0.9.1" serde = { version = "1.0", features = ["derive"] } thiserror = "2.0" +uuid = { version = "1.0", features = ["v4"] } [dev-dependencies] -uuid = { version = "1.0", features = ["v4"] } tempfile = "3.8" diff --git a/ext/parquet-core/src/arrow_conversion.rs b/ext/parquet-core/src/arrow_conversion.rs index 5a2a6da..72aa45c 100644 --- a/ext/parquet-core/src/arrow_conversion.rs +++ b/ext/parquet-core/src/arrow_conversion.rs @@ -7,6 +7,7 @@ use crate::{ParquetError, ParquetValue, Result}; use arrow_array::{builder::*, Array, ArrayRef, ListArray, MapArray, StructArray}; +use arrow_schema::extension::Uuid as ArrowUuid; use arrow_schema::{DataType, Field}; use bytes::Bytes; use indexmap::IndexMap; @@ -14,7 +15,11 @@ use ordered_float::OrderedFloat; use std::sync::Arc; /// Convert a single value from an Arrow array at the given index to a ParquetValue -pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result { +pub fn arrow_to_parquet_value( + field: &Field, + array: &dyn Array, + index: usize, +) -> Result { use arrow_array::*; if array.is_null(index) { @@ -72,7 +77,6 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result(array)?; Ok(ParquetValue::Float64(OrderedFloat(array.value(index)))) } - // String and binary types DataType::Utf8 => { let array = downcast_array::(array)?; @@ -86,9 +90,15 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result { let array = downcast_array::(array)?; - Ok(ParquetValue::Bytes(Bytes::copy_from_slice( - array.value(index), - ))) + let value = array.value(index); + match 
field.try_extension_type::() { + Ok(_) => { + let uuid = uuid::Uuid::from_slice(value) + .map_err(|e| ParquetError::Conversion(format!("Invalid UUID: {}", e)))?; + Ok(ParquetValue::Uuid(uuid)) + } + Err(_) => Ok(ParquetValue::Bytes(Bytes::copy_from_slice(value))), + } } // Date and time types @@ -140,6 +150,10 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result(array)?; Ok(ParquetValue::TimeMicros(array.value(index))) } + arrow_schema::TimeUnit::Nanosecond => { + let array = downcast_array::(array)?; + Ok(ParquetValue::TimeNanos(array.value(index))) + } _ => Err(ParquetError::Conversion(format!( "Unsupported time64 unit: {:?}", unit @@ -173,13 +187,13 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result { + DataType::List(item_field) => { let array = downcast_array::(array)?; let list_values = array.value(index); let mut values = Vec::with_capacity(list_values.len()); for i in 0..list_values.len() { - values.push(arrow_to_parquet_value(&list_values, i)?); + values.push(arrow_to_parquet_value(item_field, &list_values, i)?); } Ok(ParquetValue::List(values)) @@ -192,10 +206,20 @@ pub fn arrow_to_parquet_value(array: &dyn Array, index: usize) -> Result Result v, Err(e) => return Some(Err(e)), }; @@ -228,12 +230,13 @@ where let mut columns = Vec::with_capacity(batch.num_columns()); for (idx, column) in batch.columns().iter().enumerate() { - let column_name = self.schema.field(idx).name().to_string(); + let field = self.schema.field(idx); + let column_name = field.name().to_string(); // Convert entire column to ParquetValues let mut values = Vec::with_capacity(column.len()); for row_idx in 0..column.len() { - match arrow_to_parquet_value(column, row_idx) { + match arrow_to_parquet_value(field, column, row_idx) { Ok(value) => values.push(value), Err(e) => return Some(Err(e)), } diff --git a/ext/parquet-core/src/schema.rs b/ext/parquet-core/src/schema.rs index 31d399b..0c507fe 100644 --- a/ext/parquet-core/src/schema.rs +++ 
b/ext/parquet-core/src/schema.rs @@ -72,6 +72,7 @@ pub enum PrimitiveType { TimestampNanos(Option>), TimeMillis, TimeMicros, + TimeNanos, // Fixed-length byte array FixedLenByteArray(i32), @@ -146,6 +147,7 @@ impl PrimitiveType { PrimitiveType::TimestampNanos(_) => "TimestampNanos", PrimitiveType::TimeMillis => "TimeMillis", PrimitiveType::TimeMicros => "TimeMicros", + PrimitiveType::TimeNanos => "TimeNanos", PrimitiveType::FixedLenByteArray(_) => "FixedLenByteArray", } } diff --git a/ext/parquet-core/src/value.rs b/ext/parquet-core/src/value.rs index 282284e..de0aef7 100644 --- a/ext/parquet-core/src/value.rs +++ b/ext/parquet-core/src/value.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use indexmap::IndexMap; use num::BigInt; use std::sync::Arc; +use uuid::Uuid; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ParquetValue { @@ -22,6 +23,7 @@ pub enum ParquetValue { Boolean(bool), String(Arc), Bytes(Bytes), + Uuid(Uuid), // Date/Time types Date32(i32), // Days since epoch @@ -40,6 +42,7 @@ pub enum ParquetValue { // Time types TimeMillis(i32), // Time of day in milliseconds since midnight TimeMicros(i64), // Time of day in microseconds since midnight + TimeNanos(i64), // Time of day in nanoseconds since midnight // Complex types List(Vec), @@ -68,6 +71,7 @@ impl std::hash::Hash for ParquetValue { ParquetValue::Boolean(b) => b.hash(state), ParquetValue::String(s) => s.hash(state), ParquetValue::Bytes(b) => b.hash(state), + ParquetValue::Uuid(u) => u.hash(state), ParquetValue::Date32(d) => d.hash(state), ParquetValue::Date64(d) => d.hash(state), ParquetValue::Decimal128(d, scale) => { @@ -96,6 +100,7 @@ impl std::hash::Hash for ParquetValue { } ParquetValue::TimeMillis(t) => t.hash(state), ParquetValue::TimeMicros(t) => t.hash(state), + ParquetValue::TimeNanos(t) => t.hash(state), ParquetValue::List(l) => l.hash(state), ParquetValue::Map(m) => m.hash(state), ParquetValue::Record(r) => { @@ -133,6 +138,7 @@ impl ParquetValue { ParquetValue::Boolean(_) => "Boolean", 
ParquetValue::String(_) => "String", ParquetValue::Bytes(_) => "Bytes", + ParquetValue::Uuid(_) => "Uuid", ParquetValue::Date32(_) => "Date32", ParquetValue::Date64(_) => "Date64", ParquetValue::Decimal128(_, _) => "Decimal128", @@ -143,6 +149,7 @@ impl ParquetValue { ParquetValue::TimestampNanos(_, _) => "TimestampNanos", ParquetValue::TimeMillis(_) => "TimeMillis", ParquetValue::TimeMicros(_) => "TimeMicros", + ParquetValue::TimeNanos(_) => "TimeNanos", ParquetValue::List(_) => "List", ParquetValue::Map(_) => "Map", ParquetValue::Record(_) => "Record", diff --git a/ext/parquet-core/src/writer.rs b/ext/parquet-core/src/writer.rs index 87cc1b7..7466286 100644 --- a/ext/parquet-core/src/writer.rs +++ b/ext/parquet-core/src/writer.rs @@ -235,6 +235,7 @@ where (Date64(_), DataType::Date64) => 8, (TimeMillis(_), DataType::Time32(_)) => 4, (TimeMicros(_), DataType::Time64(_)) => 8, + (TimeNanos(_), DataType::Time64(_)) => 8, (TimestampSecond(_, _), DataType::Timestamp(_, _)) => 8, (TimestampMillis(_, _), DataType::Timestamp(_, _)) => 8, (TimestampMicros(_, _), DataType::Timestamp(_, _)) => 8, @@ -364,7 +365,9 @@ where writer.write(&batch)?; // Check if we need to flush based on memory usage - if writer.in_progress_size() >= self.memory_threshold { + if writer.in_progress_size() >= self.memory_threshold + || writer.memory_size() >= self.memory_threshold + { writer.flush()?; } } else { @@ -496,6 +499,7 @@ fn validate_value_against_field(value: &ParquetValue, field: &Field, path: &str) (Date64(_), DataType::Date64) => Ok(()), (TimeMillis(_), DataType::Time32(_)) => Ok(()), (TimeMicros(_), DataType::Time64(_)) => Ok(()), + (TimeNanos(_), DataType::Time64(_)) => Ok(()), (TimestampSecond(_, _), DataType::Timestamp(_, _)) => Ok(()), (TimestampMillis(_, _), DataType::Timestamp(_, _)) => Ok(()), (TimestampMicros(_, _), DataType::Timestamp(_, _)) => Ok(()), @@ -591,10 +595,16 @@ fn schema_node_to_arrow_field(node: &SchemaNode) -> Result { name, primitive_type, nullable, - .. 
+ format, } => { let data_type = primitive_type_to_arrow(primitive_type)?; - Ok(Field::new(name, data_type, *nullable)) + let field = Field::new(name, data_type, *nullable); + let extended_field = if format.as_deref() == Some("uuid") { + field.with_extension_type(arrow_schema::extension::Uuid) + } else { + field + }; + Ok(extended_field) } SchemaNode::List { name, @@ -671,6 +681,7 @@ fn primitive_type_to_arrow(ptype: &crate::PrimitiveType) -> Result { Date32 => DataType::Date32, TimeMillis => DataType::Time32(arrow_schema::TimeUnit::Millisecond), TimeMicros => DataType::Time64(arrow_schema::TimeUnit::Microsecond), + TimeNanos => DataType::Time64(arrow_schema::TimeUnit::Nanosecond), TimestampMillis(tz) => DataType::Timestamp( arrow_schema::TimeUnit::Millisecond, // PARQUET SPEC: ANY timezone (e.g., "+09:00", "America/New_York") means diff --git a/ext/parquet-core/tests/arrow_conversion_tests.rs b/ext/parquet-core/tests/arrow_conversion_tests.rs index 8568339..f5eed53 100644 --- a/ext/parquet-core/tests/arrow_conversion_tests.rs +++ b/ext/parquet-core/tests/arrow_conversion_tests.rs @@ -99,7 +99,7 @@ fn test_decimal256_large_values() { // Verify roundtrip for i in 0..4 { - let value = arrow_to_parquet_value(array.as_ref(), i).unwrap(); + let value = arrow_to_parquet_value(&field, array.as_ref(), i).unwrap(); match (i, value) { (0, ParquetValue::Decimal256(v, _)) => assert_eq!(v, large_positive.clone()), (1, ParquetValue::Decimal256(v, _)) => assert_eq!(v, large_negative.clone()), @@ -173,7 +173,7 @@ fn test_timestamp_with_timezone() { // Verify roundtrip preserves timezone for i in 0..3 { - let value = arrow_to_parquet_value(array.as_ref(), i).unwrap(); + let value = arrow_to_parquet_value(&field, array.as_ref(), i).unwrap(); match value { ParquetValue::TimestampMillis(_, Some(tz)) => { assert_eq!(tz.as_ref(), "America/New_York"); @@ -209,7 +209,7 @@ fn test_nested_list_of_lists() { assert_eq!(array.len(), 1); // Verify roundtrip - let value = 
arrow_to_parquet_value(array.as_ref(), 0).unwrap(); + let value = arrow_to_parquet_value(&outer_field, array.as_ref(), 0).unwrap(); match value { ParquetValue::List(items) => assert_eq!(items.len(), 5), _ => panic!("Expected list"), @@ -357,7 +357,7 @@ fn test_unsupported_arrow_types() { ) .unwrap(); - let result = arrow_to_parquet_value(&array, 0); + let result = arrow_to_parquet_value(&Field::new("int", DataType::Int32, false), &array, 0); assert!(result.is_err()); assert!(result .unwrap_err() diff --git a/ext/parquet-ruby-adapter/Cargo.toml b/ext/parquet-ruby-adapter/Cargo.toml index b5934b8..88bbf7f 100644 --- a/ext/parquet-ruby-adapter/Cargo.toml +++ b/ext/parquet-ruby-adapter/Cargo.toml @@ -20,3 +20,4 @@ rb-sys = { version = "0.9", features = ["stable-api-compiled-fallback"] } tempfile = "^3.15" thiserror = "2.0" indexmap = "2.2" +uuid = "1.0" diff --git a/ext/parquet-ruby-adapter/src/converter.rs b/ext/parquet-ruby-adapter/src/converter.rs index a0b7641..9e210b2 100644 --- a/ext/parquet-ruby-adapter/src/converter.rs +++ b/ext/parquet-ruby-adapter/src/converter.rs @@ -41,27 +41,6 @@ impl RubyValueConverter { .map(|cache| cache.stats()) } - /// Convert a Ruby value to ParquetValue with type hint - /// This is the primary conversion method that handles all Ruby types - pub fn to_parquet_with_type_hint( - &mut self, - value: Value, - type_hint: Option<&parquet_core::PrimitiveType>, - ) -> Result { - // Handle nil values - if value.is_nil() { - return Ok(ParquetValue::Null); - } - - // If we have a type hint, use it to guide conversion - if let Some(hint) = type_hint { - return self.convert_with_type_hint(value, hint); - } - - // Otherwise, infer type from Ruby value - self.infer_and_convert(value) - } - /// Convert a Ruby value to ParquetValue with schema hint /// This handles both primitive and complex types pub fn to_parquet_with_schema_hint( @@ -115,7 +94,7 @@ impl RubyValueConverter { use parquet_core::PrimitiveType::*; // Special handling for UUID format - if
let (Binary, Some("uuid")) = (type_hint, format) { + if let (FixedLenByteArray(16), Some("uuid")) = (type_hint, format) { return self.convert_to_uuid_binary(value); } @@ -156,6 +135,7 @@ impl RubyValueConverter { Date64 => self.convert_to_date64(value, None), TimeMillis => self.convert_to_time_millis(value), TimeMicros => self.convert_to_time_micros(value), + TimeNanos => self.convert_to_time_nanos(value), TimestampSecond(schema_tz) => { self.convert_to_timestamp_second_with_tz(value, schema_tz.as_deref()) } @@ -484,32 +464,19 @@ impl RubyValueConverter { // Convert value to string let uuid_str: String = value - .funcall("to_s", ()) - .and_then(TryConvert::try_convert) + .to_r_string() + .map_err(|e: MagnusError| { + ParquetError::Conversion(format!("Failed to convert to UUID string: {}", e)) + })? + .to_string() .map_err(|e: MagnusError| { ParquetError::Conversion(format!("Failed to convert to UUID string: {}", e)) })?; - // Remove hyphens and validate length - let clean_uuid = uuid_str.replace('-', ""); - if clean_uuid.len() != 32 { - return Err(ParquetError::Conversion(format!( - "Invalid UUID format: expected 32 hex characters (ignoring hyphens), got {}", - clean_uuid.len() - ))); - } - - // Parse hex string to bytes - let mut bytes = Vec::with_capacity(16); - for i in 0..16 { - let hex_byte = &clean_uuid[i * 2..i * 2 + 2]; - let byte = u8::from_str_radix(hex_byte, 16).map_err(|_| { - ParquetError::Conversion(format!("Invalid hex character in UUID: {}", hex_byte)) - })?; - bytes.push(byte); - } - - Ok(ParquetValue::Bytes(bytes.into())) + let parsed = uuid::Uuid::parse_str(&uuid_str) + .map_err(|e| ParquetError::Conversion(format!("Failed to parse UUID: {}", e)))?; + let bytes = Bytes::copy_from_slice(parsed.as_bytes()); + Ok(ParquetValue::Bytes(bytes)) } fn convert_to_date32(&self, value: Value, date_format: Option<&str>) -> Result { @@ -692,6 +659,38 @@ impl RubyValueConverter { ))) } + fn convert_to_time_nanos(&self, value: Value) -> Result { + if 
value.is_nil() { + return Ok(ParquetValue::Null); + } + + // Convert to nanoseconds since midnight + let ruby = Ruby::get() + .map_err(|_| ParquetError::Conversion("Failed to get Ruby runtime".to_string()))?; + if value.is_kind_of(ruby.class_time()) { + let hour: i64 = value + .funcall("hour", ()) + .map_err(|e| ParquetError::Conversion(e.to_string()))?; + let min: i64 = value + .funcall("min", ()) + .map_err(|e| ParquetError::Conversion(e.to_string()))?; + let sec: i64 = value + .funcall("sec", ()) + .map_err(|e| ParquetError::Conversion(e.to_string()))?; + let nsec: i64 = value + .funcall("nsec", ()) + .map_err(|e| ParquetError::Conversion(e.to_string()))?; + + let nanos = (hour * 3600 + min * 60 + sec) * 1_000_000_000 + nsec; + return Ok(ParquetValue::TimeNanos(nanos)); + } + + Err(ParquetError::Conversion(format!( + "Cannot convert {} to time_nanos", + value.class() + ))) + } + // Timestamp conversion methods that respect schema timezone fn convert_to_timestamp_second_with_tz( &self, @@ -1399,21 +1398,8 @@ pub fn parquet_to_ruby(value: ParquetValue) -> Result { ParquetValue::Float32(OrderedFloat(f)) => Ok((f as f64).into_value_with(&ruby)), ParquetValue::Float64(OrderedFloat(f)) => Ok(f.into_value_with(&ruby)), ParquetValue::String(s) => Ok(s.into_value_with(&ruby)), - ParquetValue::Bytes(b) => { - // Check if this is a UUID (16 bytes) - if b.len() == 16 { - // Format as UUID string - let uuid_str = format!( - "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}", - b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], - b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15] - ); - Ok(uuid_str.into_value_with(&ruby)) - } else { - // Regular bytes - convert to string - Ok(ruby.str_from_slice(&b).as_value()) - } - } + ParquetValue::Uuid(u) => Ok(u.to_string().into_value_with(&ruby)), + ParquetValue::Bytes(b) => Ok(ruby.enc_str_new(&b, ruby.ascii8bit_encoding()).as_value()), ParquetValue::Date32(days) => { // Convert days
since epoch to Date object let _ = ruby.require("date"); @@ -1528,6 +1514,14 @@ pub fn parquet_to_ruby(value: ParquetValue) -> Result { .map_err(|e| ParquetError::Conversion(e.to_string()))?; apply_timezone(time, &tz) } + ParquetValue::TimeNanos(nanos) => { + let time_class = ruby.class_time(); + let secs = nanos / 1_000_000_000; + let nsec = nanos % 1_000_000_000; + time_class + .funcall("at", (secs, nsec, Symbol::new("nanosecond"))) + .map_err(|e| ParquetError::Conversion(e.to_string())) + } ParquetValue::TimestampNanos(nanos, tz) => { let time_class = ruby.class_time(); let secs = nanos / 1_000_000_000; diff --git a/ext/parquet-ruby-adapter/src/schema.rs b/ext/parquet-ruby-adapter/src/schema.rs index 79346e2..dc6c2f0 100644 --- a/ext/parquet-ruby-adapter/src/schema.rs +++ b/ext/parquet-ruby-adapter/src/schema.rs @@ -1,8 +1,9 @@ use magnus::value::ReprValue; use magnus::{Error as MagnusError, RArray, RHash, Ruby, Symbol, TryConvert, Value}; -use parquet_core::{ParquetError, PrimitiveType, Result, Schema, SchemaNode}; +use parquet_core::{ParquetError, PrimitiveType, Schema, SchemaNode}; use crate::utils::parse_string_or_symbol; +use crate::RubyAdapterError; /// Ruby schema builder that converts Ruby hash/array representations to Parquet schemas pub struct RubySchemaBuilder; @@ -13,18 +14,18 @@ impl RubySchemaBuilder { } /// Parse a Ruby schema definition (hash) into a SchemaNode - fn parse_schema_node(&self, name: String, schema_def: Value) -> Result { + fn parse_schema_node( + &self, + name: String, + schema_def: Value, + ) -> Result { // If it's a Hash, parse it as a complex type if let Ok(hash) = ::try_convert(schema_def) { return self.parse_hash_schema_node(name, hash); } // Otherwise, try to parse as a simple type symbol - if let Ok(type_sym) = ::try_convert(schema_def) { - let type_str = type_sym.name().map_err(|e: MagnusError| { - ParquetError::Conversion(format!("Failed to get symbol name: {}", e)) - })?; - + if let Ok(type_str) = 
schema_def.to_r_string()?.to_string() { // Check if it's a complex type with angle brackets if type_str.contains('<') { return self.parse_complex_type_string(name, type_str.to_string(), true); @@ -40,22 +41,24 @@ impl RubySchemaBuilder { }); } - Err(ParquetError::Schema(format!( + Err(RubyAdapterError::InvalidInput(format!( "Expected Hash or Symbol for schema definition, got {}", schema_def.class() ))) } /// Parse a Ruby hash schema node - fn parse_hash_schema_node(&self, name: String, hash: RHash) -> Result { + fn parse_hash_schema_node( + &self, + name: String, + hash: RHash, + ) -> Result { // Get the type field - let type_sym: Symbol = hash - .fetch::<_, Symbol>(Symbol::new("type")) + let type_sym: Value = hash + .fetch::<_, Value>(Symbol::new("type")) .map_err(|e| ParquetError::Schema(format!("Schema missing 'type' field: {}", e)))?; - let type_str = type_sym.name().map_err(|e: MagnusError| { - ParquetError::Conversion(format!("Failed to get type name: {}", e)) - })?; + let type_str = type_sym.to_r_string()?.to_string()?; // Get nullable field (default to true) let nullable = hash @@ -142,6 +145,15 @@ impl RubySchemaBuilder { // Primitive types primitive_type => { + if format.as_deref() == Some("uuid") { + return Ok(SchemaNode::Primitive { + name, + primitive_type: PrimitiveType::FixedLenByteArray(16), + nullable, + format, + }); + } + // Get precision and scale for decimal types let precision = hash .fetch::<_, Value>(Symbol::new("precision")) @@ -196,7 +208,7 @@ impl RubySchemaBuilder { name: String, type_str: String, nullable: bool, - ) -> Result { + ) -> Result { if type_str.starts_with("list<") && type_str.ends_with('>') { let inner_type = &type_str[5..type_str.len() - 1]; let item_name = format!("{}_item", name); @@ -229,7 +241,7 @@ impl RubySchemaBuilder { let inner = &type_str[4..type_str.len() - 1]; let parts: Vec<&str> = inner.split(',').map(|s| s.trim()).collect(); if parts.len() != 2 { - return Err(ParquetError::Schema(format!( + return 
Err(RubyAdapterError::InvalidInput(format!( "Invalid map type: {}", type_str ))); } @@ -255,7 +267,7 @@ impl RubySchemaBuilder { }), }) } else { - Err(ParquetError::Schema(format!( + Err(RubyAdapterError::InvalidInput(format!( "Unknown complex type: {}", type_str ))) @@ -263,7 +275,7 @@ impl RubySchemaBuilder { } /// Parse a field definition from a Ruby hash - fn parse_field_definition(&self, field_hash: RHash) -> Result { + fn parse_field_definition(&self, field_hash: RHash) -> Result { let name: String = field_hash .fetch(Symbol::new("name")) .map_err(|e| ParquetError::Schema(format!("Field missing 'name': {}", e)))?; @@ -274,7 +286,7 @@ impl RubySchemaBuilder { self.parse_schema_node(name, field_hash.as_value()) } else { // This might be a simplified definition - look for known field patterns - Err(ParquetError::Schema(format!( + Err(RubyAdapterError::InvalidInput(format!( "Field '{}' missing 'type' definition", name ))) @@ -288,7 +300,7 @@ impl RubySchemaBuilder { precision: Option, scale: Option, timezone: Option, - ) -> Result { + ) -> Result { // Check if it's a decimal type with parentheses notation like "decimal(5,2)" if type_str.starts_with("decimal(") && type_str.ends_with(')') { let params = &type_str[8..type_str.len() - 1]; // Extract "5,2" from "decimal(5,2)" @@ -324,6 +336,14 @@ impl RubySchemaBuilder { } } + if type_str.starts_with("fixed_len_byte_array(") && type_str.ends_with(')') { + let params = &type_str[21..type_str.len() - 1]; + let len = params.parse::().map_err(|_| { + ParquetError::Schema(format!("Invalid fixed_len_byte_array length: {}", params)) + })?; + return Ok(PrimitiveType::FixedLenByteArray(len)); + } + match type_str.as_str() { "boolean" | "bool" => Ok(PrimitiveType::Boolean), "int8" => Ok(PrimitiveType::Int8), @@ -356,8 +376,9 @@ impl RubySchemaBuilder { // PARQUET SPEC: timezone presence means UTC storage (isAdjustedToUTC = true) Ok(PrimitiveType::TimestampNanos(timezone.map(Into::into))) } - "time32" | "time_millis" =>
Ok(PrimitiveType::TimeMillis), - "time64" | "time_micros" => Ok(PrimitiveType::TimeMicros), + "time_millis" => Ok(PrimitiveType::TimeMillis), + "time_micros" => Ok(PrimitiveType::TimeMicros), + "time_nanos" => Ok(PrimitiveType::TimeNanos), "decimal" => { // Use provided precision/scale or defaults let p = precision.unwrap_or(38); @@ -380,7 +401,7 @@ impl RubySchemaBuilder { let s = scale.unwrap_or(0); Ok(PrimitiveType::Decimal256(p, s)) } - _ => Err(ParquetError::Schema(format!( + _ => Err(RubyAdapterError::InvalidInput(format!( "Unknown primitive type: {}", type_str ))), @@ -396,7 +417,7 @@ impl Default for RubySchemaBuilder { /// Wrapper functions for Ruby FFI since SchemaBuilderTrait requires Send + Sync /// and Ruby Value is not Send/Sync -pub fn ruby_schema_to_parquet(schema_def: Value) -> Result { +pub fn ruby_schema_to_parquet(schema_def: Value) -> Result { let builder = RubySchemaBuilder::new(); // The Ruby schema should be a hash with a root struct @@ -430,7 +451,7 @@ pub fn ruby_schema_to_parquet(schema_def: Value) -> Result { let mut unique_names = std::collections::HashSet::new(); for name in &field_names { if !unique_names.insert(name) { - return Err(ParquetError::Schema(format!( + return Err(RubyAdapterError::InvalidInput(format!( "Duplicate field names in root level schema: {:?}", field_names ))); @@ -443,7 +464,7 @@ pub fn ruby_schema_to_parquet(schema_def: Value) -> Result { fields: field_nodes, } } else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "Schema must have 'type' or 'fields' key".to_string(), )); }; @@ -452,18 +473,18 @@ pub fn ruby_schema_to_parquet(schema_def: Value) -> Result { parquet_core::SchemaBuilder::new() .with_root(root_node) .build() - .map_err(|e| ParquetError::Schema(e.to_string())) + .map_err(|e| RubyAdapterError::InvalidInput(e.to_string())) } /// Convert a Parquet schema back to Ruby representation -pub fn parquet_schema_to_ruby(schema: &Schema) -> Result { +pub fn 
parquet_schema_to_ruby(schema: &Schema) -> Result { let ruby = Ruby::get() .map_err(|e| ParquetError::Conversion(format!("Failed to get Ruby runtime: {}", e)))?; schema_node_to_ruby(&schema.root, &ruby) } -fn schema_node_to_ruby(node: &SchemaNode, _ruby: &Ruby) -> Result { +fn schema_node_to_ruby(node: &SchemaNode, _ruby: &Ruby) -> Result { let hash = RHash::new(); match node { @@ -552,6 +573,7 @@ fn schema_node_to_ruby(node: &SchemaNode, _ruby: &Ruby) -> Result { PrimitiveType::TimestampNanos(_) => Symbol::new("timestamp_nanos"), PrimitiveType::TimeMillis => Symbol::new("time_millis"), PrimitiveType::TimeMicros => Symbol::new("time_micros"), + PrimitiveType::TimeNanos => Symbol::new("time_nanos"), PrimitiveType::Decimal128(_, _) => Symbol::new("decimal128"), PrimitiveType::Decimal256(_, _) => Symbol::new("decimal256"), PrimitiveType::FixedLenByteArray(_) => Symbol::new("fixed_len_byte_array"), @@ -597,7 +619,7 @@ fn schema_node_to_ruby(node: &SchemaNode, _ruby: &Ruby) -> Result { /// Convert old schema format to new format /// Old: [{ "column_name" => "type" }, ...] /// New: [{ name: "column_name", type: :type }, ...] 
-pub fn convert_legacy_schema(ruby: &Ruby, schema: RArray) -> Result { +pub fn convert_legacy_schema(ruby: &Ruby, schema: RArray) -> Result { let new_schema = RArray::new(); for item in schema.into_iter() { @@ -630,7 +652,7 @@ pub fn convert_legacy_schema(ruby: &Ruby, schema: RArray) -> Result { ); if let Err(e) = process_result { - return Err(ParquetError::Schema(format!( + return Err(RubyAdapterError::InvalidInput(format!( "Failed to process field: {}", e ))); @@ -645,7 +667,7 @@ pub fn convert_legacy_schema(ruby: &Ruby, schema: RArray) -> Result { } /// Check if schema is in new DSL format (hash with type: :struct) -pub fn is_dsl_schema(ruby: &Ruby, schema_value: Value) -> Result { +pub fn is_dsl_schema(ruby: &Ruby, schema_value: Value) -> Result { if !schema_value.is_kind_of(ruby.class_hash()) { return Ok(false); } @@ -678,7 +700,7 @@ pub fn process_schema_value( ruby: &Ruby, schema_value: Value, data_array: Option<&RArray>, -) -> Result { +) -> Result { // Check if it's the new DSL format if is_dsl_schema(ruby, schema_value)? { // For DSL format, pass it directly to ruby_schema_to_parquet @@ -716,7 +738,7 @@ pub fn process_schema_value( convert_legacy_schema(ruby, array)? } } else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "schema array must contain hashes".to_string(), )); } @@ -733,13 +755,13 @@ pub fn process_schema_value( ParquetError::Schema(format!("Failed to convert fields to array: {}", e)) })? 
} else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "schema hash must have 'fields' key or be in DSL format with 'type' key" .to_string(), )); } } else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "schema must be nil, an array, or a hash".to_string(), )); }; @@ -748,7 +770,7 @@ pub fn process_schema_value( if schema_array.is_empty() { if let Some(data) = data_array { if data.is_empty() { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "Cannot infer schema from empty data".to_string(), )); } @@ -767,7 +789,7 @@ pub fn process_schema_value( })?; first_array.len() } else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "First data item must be an array".to_string(), )); }; @@ -793,7 +815,7 @@ pub fn process_schema_value( schema_array = new_schema; } else { - return Err(ParquetError::Schema( + return Err(RubyAdapterError::InvalidInput( "Schema is required when data is not provided for inference".to_string(), )); } diff --git a/ext/parquet-ruby-adapter/src/utils.rs b/ext/parquet-ruby-adapter/src/utils.rs index 079ed59..d623451 100644 --- a/ext/parquet-ruby-adapter/src/utils.rs +++ b/ext/parquet-ruby-adapter/src/utils.rs @@ -26,6 +26,7 @@ pub fn estimate_parquet_value_size(value: &ParquetValue) -> usize { ParquetValue::Float64(_) => 8, ParquetValue::String(s) => s.len() + 24, // String overhead ParquetValue::Bytes(b) => b.len() + 24, // Vec overhead + ParquetValue::Uuid(_) => 16, ParquetValue::Date32(_) => 4, ParquetValue::Date64(_) => 8, ParquetValue::Decimal128(_, _) => 16 + 1, // value + scale @@ -36,6 +37,7 @@ pub fn estimate_parquet_value_size(value: &ParquetValue) -> usize { ParquetValue::TimestampNanos(_, tz) => 8 + tz.as_ref().map_or(0, |s| s.len() + 24), ParquetValue::TimeMillis(_) => 4, ParquetValue::TimeMicros(_) => 8, + ParquetValue::TimeNanos(_) => 8, ParquetValue::List(items) => { 24 + 
items.iter().map(estimate_parquet_value_size).sum::() } diff --git a/test/uuid_test.rb b/test/uuid_test.rb index e088720..1af0a2c 100644 --- a/test/uuid_test.rb +++ b/test/uuid_test.rb @@ -19,7 +19,7 @@ def test_uuid_binary_conversion uuids << uuid data << [uuid, "test_#{uuid[0..7]}"] end - + # Schema with UUID format schema = { fields: [ @@ -27,14 +27,14 @@ def test_uuid_binary_conversion {name: 'name', type: :string} ] } - + # Write data Parquet.write_rows(data, schema: schema, write_to: @test_file) - + # Read data back rows = [] Parquet.each_row(@test_file) { |row| rows << row } - + # Verify UUIDs are correctly preserved assert_equal 10, rows.length rows.each_with_index do |row, i| @@ -42,24 +42,24 @@ def test_uuid_binary_conversion assert_equal "test_#{uuids[i][0..7]}", row['name'] end end - + def test_invalid_uuid_format # Test with invalid UUID data = [["not-a-valid-uuid", "test"]] - + schema = { fields: [ {name: 'id', type: :binary, format: 'uuid'}, {name: 'name', type: :string} ] } - + # Should raise an error for invalid UUID assert_raises do Parquet.write_rows(data, schema: schema, write_to: @test_file) end end - + def test_uuid_with_different_formats # Test various UUID formats uuid_formats = [ @@ -67,23 +67,23 @@ def test_uuid_with_different_formats "550e8400e29b41d4a716446655440000", # Without hyphens "550E8400-E29B-41D4-A716-446655440000", # Uppercase ] - + data = uuid_formats.map { |uuid| [uuid, "test"] } - + schema = { fields: [ - {name: 'id', type: :binary, format: 'uuid'}, + {name: 'id', type: "fixed_len_byte_array(16)", format: 'uuid'}, {name: 'name', type: :string} ] } - + # Write data Parquet.write_rows(data, schema: schema, write_to: @test_file) - + # Read data back rows = [] Parquet.each_row(@test_file) { |row| rows << row } - + # All should be normalized to standard format expected_uuid = "550e8400-e29b-41d4-a716-446655440000" assert_equal 3, rows.length @@ -91,7 +91,7 @@ def test_uuid_with_different_formats assert_equal expected_uuid, 
row['id'] end end - + def test_null_uuid_values # Test with null UUIDs data = [ @@ -99,42 +99,42 @@ def test_null_uuid_values ["550e8400-e29b-41d4-a716-446655440000", "valid uuid"], [nil, "another null"] ] - + schema = { fields: [ - {name: 'id', type: :binary, format: 'uuid', nullable: true}, + {name: 'id', type: "fixed_len_byte_array(16)", format: 'uuid', nullable: true}, {name: 'description', type: :string} ] } - + # Write data Parquet.write_rows(data, schema: schema, write_to: @test_file) - + # Read data back rows = [] Parquet.each_row(@test_file) { |row| rows << row } - + assert_equal 3, rows.length assert_nil rows[0]['id'] assert_equal "550e8400-e29b-41d4-a716-446655440000", rows[1]['id'] assert_nil rows[2]['id'] end - + def test_uuid_in_complex_types # Test UUID in struct id_uuid = SecureRandom.uuid tracking_uuid = SecureRandom.uuid - + # Data must be arrays, with nested hash for struct data = [ [id_uuid, { tracking_id: tracking_uuid, name: "Test Item" }] ] - + schema = { fields: [ {name: 'id', type: :binary, format: 'uuid'}, { - name: 'metadata', + name: 'metadata', type: :struct, fields: [ {name: 'tracking_id', type: :binary, format: 'uuid'}, @@ -143,20 +143,20 @@ def test_uuid_in_complex_types } ] } - + # Write data Parquet.write_rows(data, schema: schema, write_to: @test_file) - + # Read data back rows = [] Parquet.each_row(@test_file) { |row| rows << row } - + assert_equal 1, rows.length row = rows.first - + # Verify UUIDs are preserved assert_equal id_uuid, row['id'] assert_equal tracking_uuid, row['metadata']['tracking_id'] assert_equal "Test Item", row['metadata']['name'] end -end \ No newline at end of file +end