Skip to content

Convert RunEndEncoded to Parquet #8069

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
| FixedSizeList(_, _)
| Struct(_)
| Map(_, _)
| Dictionary(_, _),
| Dictionary(_, _)
| RunEndEncoded(_, _),
) => true,
// Dictionary/List conditions should be put in front of others
(Dictionary(_, from_value_type), Dictionary(_, to_value_type)) => {
Expand Down Expand Up @@ -167,6 +168,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
can_cast_types(from_key.data_type(), to_key.data_type()) && can_cast_types(from_value.data_type(), to_value.data_type()),
_ => false
},
// TODO: RunEndEncoded here?
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled in #7713

// cast one decimal type to another decimal type
(Decimal128(_, _), Decimal128(_, _)) => true,
(Decimal256(_, _), Decimal256(_, _)) => true,
Expand Down Expand Up @@ -781,6 +783,7 @@ pub fn cast_with_options(
"Casting from type {from_type:?} to dictionary type {to_type:?} not supported",
))),
},
// TODO: RunEndEncoded here?
(List(_), List(to)) => cast_list_values::<i32>(array, to, cast_options),
(LargeList(_), LargeList(to)) => cast_list_values::<i64>(array, to, cast_options),
(List(_), LargeList(list_to)) => cast_list::<i32, i64>(array, list_to, cast_options),
Expand Down
2 changes: 1 addition & 1 deletion arrow-schema/src/datatype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ pub enum DataType {
/// that contain many repeated values using less memory, but with
/// a higher CPU overhead for some operations.
///
/// This type mostly used to represent low cardinality string
/// This type is mostly used to represent low cardinality string
/// arrays or a limited set of primitive types as integers.
Dictionary(Box<DataType>, Box<DataType>),
/// Exact 32-bit width decimal value with precision and scale
Expand Down
38 changes: 37 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::{
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
LargeBinaryArray, LargeStringArray, RunArray, StringArray, StringViewArray,
};
use arrow_schema::DataType;

Expand Down Expand Up @@ -59,6 +59,28 @@ macro_rules! downcast_dict_op {
};
}

/// Downcasts `$array` to a [`RunArray`] whose run-ends are `$key` and whose
/// values downcast to `$val`, then applies `$op` to the typed array.
///
/// Mimics `downcast_dict_impl` above; both `.unwrap()`s are invariant
/// checks — callers only reach this after matching on the array's
/// `DataType`, so the downcasts cannot fail.
macro_rules! downcast_ree_impl {
    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
        $op($array
            .as_any()
            .downcast_ref::<RunArray<arrow_array::types::$key>>()
            .unwrap()
            .downcast::<$val>()
            .unwrap()$(, $arg)*)
    }};
}

/// Dispatches on the run-end field's data type and invokes
/// `downcast_ree_impl!` with the matching Arrow run-end key type.
///
/// Arrow only permits Int16/Int32/Int64 run-ends for RunEndEncoded
/// arrays, hence the `unreachable!()` fallback — TODO confirm no other
/// run-end widths can reach this point via an unvalidated schema.
macro_rules! downcast_ree_op {
    ($run_end_field:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
        match $run_end_field.data_type() {
            DataType::Int16 => downcast_ree_impl!($array, Int16Type, $val, $op$(, $arg)*),
            DataType::Int32 => downcast_ree_impl!($array, Int32Type, $val, $op$(, $arg)*),
            DataType::Int64 => downcast_ree_impl!($array, Int64Type, $val, $op$(, $arg)*),
            // Non-integer run-end types are rejected before this macro runs.
            _ => unreachable!(),
        }
    };
}

macro_rules! downcast_op {
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
match $data_type {
Expand Down Expand Up @@ -90,6 +112,20 @@ macro_rules! downcast_op {
}
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
},
DataType::RunEndEncoded(run_end, value) => match value.data_type() {
DataType::Utf8 => downcast_ree_op!(run_end, StringArray, $array, $op$(, $arg)*),
DataType::LargeUtf8 => {
downcast_ree_op!(run_end, LargeStringArray, $array, $op$(, $arg)*)
}
DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
DataType::LargeBinary => {
downcast_ree_op!(run_end, LargeBinaryArray, $array, $op$(, $arg)*)
}
DataType::FixedSizeBinary(_) => {
downcast_ree_op!(run_end, FixedSizeBinaryArray, $array, $op$(, $arg)*)
}
d => unreachable!("cannot downcast {} run end encoded value to byte array", d),
},
d => unreachable!("cannot downcast {} to byte array", d),
}
};
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ impl LevelInfoBuilder {
_ => unreachable!(),
})
}
DataType::RunEndEncoded(_, v) if is_leaf(v.data_type()) => {
let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
Ok(Self::Primitive(levels))
}
Comment on lines +225 to +228
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly same as Dictionary above:

DataType::Dictionary(_, v) if is_leaf(v.as_ref()) => {
let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
Ok(Self::Primitive(levels))
}

d => Err(nyi_err!("Datatype {} is not yet supported", d)),
}
}
Expand Down
83 changes: 70 additions & 13 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1033,15 +1033,15 @@ impl ArrowColumnWriterFactory {

match data_type {
_ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
out.push(col(leaves.next().unwrap())?)
}
ArrowDataType::LargeBinary
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::BinaryView
| ArrowDataType::Utf8View => {
out.push(bytes(leaves.next().unwrap())?)
}
| ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
ArrowDataType::List(f)
| ArrowDataType::LargeList(f)
| ArrowDataType::FixedSizeList(f, _) => {
Expand All @@ -1058,21 +1058,29 @@ impl ArrowColumnWriterFactory {
self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
}
_ => unreachable!("invalid map type"),
}
},
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
out.push(bytes(leaves.next().unwrap())?)
}
ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Binary
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
out.push(bytes(leaves.next().unwrap())?)
}
ArrowDataType::FixedSizeBinary(_) => {
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
_ => out.push(col(leaves.next().unwrap())?),
},
ArrowDataType::RunEndEncoded(_run_ends, value_type) => match value_type.data_type() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've basically copied what Dictionary does. Not sure if correct!

ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Binary
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
out.push(bytes(leaves.next().unwrap())?)
}
_ => {
out.push(col(leaves.next().unwrap())?)
}
}
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
_ => out.push(col(leaves.next().unwrap())?),
},
_ => return Err(ParquetError::NYI(
format!(
"Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
Expand Down Expand Up @@ -1166,6 +1174,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
write_primitive(typed, array.values(), levels)
}
},
ArrowDataType::RunEndEncoded(_run_ends, _value_type) => todo!(),
_ => {
let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
let array = array.as_primitive::<Int32Type>();
Expand Down Expand Up @@ -1248,6 +1257,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
write_primitive(typed, array.values(), levels)
}
},
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
_ => {
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
let array = array.as_primitive::<Int64Type>();
Expand Down Expand Up @@ -1324,6 +1334,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
let array = column.as_primitive::<Float16Type>();
get_float_16_array_slice(array, indices)
}
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
_ => {
return Err(ParquetError::NYI(
"Attempting to write an Arrow type that is not yet implemented".to_string(),
Expand Down Expand Up @@ -4293,4 +4304,50 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}

#[test]
fn arrow_writer_run_end_encoded() {
    // Build a run-end encoded string array with two runs:
    // 1000 x "alpha" followed by 1000 x "beta", Int16 run-ends.
    let mut builder = StringRunBuilder::<Int16Type>::new();
    builder.extend(
        vec![Some("alpha"); 1000]
            .into_iter()
            .chain(vec![Some("beta"); 1000]),
    );
    let run_array: RunArray<Int16Type> = builder.finish();
    let schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        run_array.data_type().clone(),
        run_array.is_nullable(),
    )]));

    // Write the batch to an in-memory Parquet buffer.
    let mut parquet_bytes: Vec<u8> = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // REE is handled as an encoding, not a storage type: the schema read
    // back from Parquet is plain Utf8, not RunEndEncoded or Dictionary.
    let expected_schema = Arc::new(Schema::new(vec![Field::new(
        "ree",
        arrow_schema::DataType::Utf8,
        false,
    )]));

    // Read back and verify the schema and the total row count round-trip.
    let bytes = Bytes::from(parquet_bytes);
    let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
    assert_eq!(reader.schema(), &expected_schema);
    let batches: Vec<_> = reader
        .build()
        .unwrap()
        .collect::<ArrowResult<Vec<_>>>()
        .unwrap();
    // Default reader batch size (1024) splits 2000 rows into two batches.
    assert_eq!(batches.len(), 2);
    let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
    assert_eq!(total_rows, 2000);
}
}
35 changes: 29 additions & 6 deletions parquet/src/arrow/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ pub fn parquet_to_arrow_field_levels(
match complex::convert_schema(schema, mask, hint)? {
Some(field) => match &field.arrow_type {
DataType::Struct(fields) => Ok(FieldLevels {
fields: fields.clone(),
fields: fields.to_owned(),
levels: Some(field),
}),
_ => unreachable!(),
Expand Down Expand Up @@ -303,7 +303,7 @@ impl<'a> ArrowSchemaConverter<'a> {
///
/// Setting this option to `true` will result in Parquet files that can be
/// read by more readers, but may lose precision for Arrow types such as
/// [`DataType::Date64`] which have no direct [corresponding Parquet type].
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
///
/// By default, this converter does not coerce to native Parquet types. Enabling type
/// coercion allows for meaningful representations that do not require
Expand Down Expand Up @@ -771,12 +771,17 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
DataType::Dictionary(_, ref value) => {
// Dictionary encoding not handled at the schema level
let dict_field = field.clone().with_data_type(value.as_ref().clone());
let dict_field = field.to_owned().with_data_type(value.as_ref().clone());
arrow_to_parquet_type(&dict_field, coerce_types)
}
DataType::RunEndEncoded(_run_end_type, value_type) => {
// We want to write REE data as dictionary encoded data,
// which is not handled at the schema level.
let dict_field = field
.to_owned()
.with_data_type(value_type.data_type().to_owned());
arrow_to_parquet_type(&dict_field, coerce_types)
}
DataType::RunEndEncoded(_, _) => Err(arrow_err!(
"Converting RunEndEncodedType to parquet not supported",
)),
}
}

Expand Down Expand Up @@ -2272,4 +2277,22 @@ mod tests {

Ok(())
}

#[test]
fn test_run_end_encoded_conversion() {
    use crate::basic::Type;
    // A RunEndEncoded field: Int16 run-ends, nullable Boolean values,
    // wrapped in a non-nullable outer field.
    let ree_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int16, false)),
        Arc::new(Field::new("values", DataType::Boolean, true)),
    );
    let ree_field = Field::new("run_end_encoded_16", ree_type, false);

    let parquet_type = arrow_to_parquet_type(&ree_field, false).unwrap();
    // The REE wrapper is stripped during conversion, so the Parquet
    // physical type reflects the value type (Boolean here).
    assert_eq!(parquet_type.get_physical_type(), Type::BOOLEAN);
    // Repetition follows the outer field's nullability, not the values'.
    assert_eq!(
        parquet_type.get_basic_info().repetition(),
        Repetition::REQUIRED
    );
    assert_eq!(parquet_type.name(), "run_end_encoded_16");
}
}