Skip to content

[thrift-remodel] Complete decoding of FileMetaData and RowGroupMetaData #8111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Aug 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions parquet/benches/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use parquet::file::metadata::ParquetMetaDataReader;
use rand::Rng;
use thrift::protocol::TCompactOutputProtocol;

Expand Down Expand Up @@ -198,19 +199,43 @@ fn criterion_benchmark(c: &mut Criterion) {
});

let meta_data = get_footer_bytes(data.clone());
c.bench_function("decode file metadata", |b| {
c.bench_function("decode parquet metadata", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&meta_data).unwrap();
})
});

c.bench_function("decode thrift file metadata", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&meta_data);
})
});

let buf = black_box(encoded_meta()).into();
c.bench_function("decode file metadata (wide)", |b| {
c.bench_function("decode parquet metadata new", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_file_metadata(&meta_data).unwrap();
})
});

let buf: Bytes = black_box(encoded_meta()).into();
c.bench_function("decode parquet metadata (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_metadata(&buf).unwrap();
})
});

c.bench_function("decode thrift file metadata (wide)", |b| {
b.iter(|| {
parquet::thrift::bench_file_metadata(&buf);
})
});

c.bench_function("decode parquet metadata new (wide)", |b| {
b.iter(|| {
ParquetMetaDataReader::decode_file_metadata(&buf).unwrap();
})
});

// rewrite file with page statistics. then read page headers.
#[cfg(feature = "arrow")]
let (file_bytes, metadata) = rewrite_file(data.clone());
Expand Down
77 changes: 13 additions & 64 deletions parquet/src/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{fmt, str};

pub use crate::compression::{BrotliLevel, GzipLevel, ZstdLevel};
use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
use crate::{thrift_enum, thrift_private_struct, thrift_union_all_empty};
use crate::{thrift_enum, thrift_struct, thrift_union_all_empty};

use crate::errors::{ParquetError, Result};

Expand Down Expand Up @@ -210,14 +210,14 @@ union TimeUnit {

// private structs for decoding logical type

thrift_private_struct!(
thrift_struct!(
struct DecimalType {
1: required i32 scale
2: required i32 precision
}
);

thrift_private_struct!(
thrift_struct!(
struct TimestampType {
1: required bool is_adjusted_to_u_t_c
2: required TimeUnit unit
Expand All @@ -227,84 +227,33 @@ struct TimestampType {
// they are identical
use TimestampType as TimeType;

thrift_private_struct!(
thrift_struct!(
struct IntType {
1: required i8 bit_width
2: required bool is_signed
}
);

thrift_private_struct!(
thrift_struct!(
struct VariantType {
// The version of the variant specification that the variant was
// written with.
1: optional i8 specification_version
}
);

// TODO need macro for structs that need lifetime annotation
thrift_struct!(
struct GeometryType<'a> {
crs: Option<&'a str>,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeometryType<'a> {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut crs: Option<&str> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_string()?;
crs = Some(val);
}
_ => {
prot.skip(field_ident.field_type)?;
}
};
}
Ok(Self { crs })
}
1: optional string<'a> crs;
}
);

thrift_struct!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty sweet way of generating structs BTW

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jhorstmann! I agree! 😄

struct GeographyType<'a> {
crs: Option<&'a str>,
algorithm: Option<EdgeInterpolationAlgorithm>,
}

impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for GeographyType<'a> {
type Error = ParquetError;
fn try_from(prot: &mut ThriftCompactInputProtocol<'a>) -> Result<Self> {
let mut crs: Option<&str> = None;
let mut algorithm: Option<EdgeInterpolationAlgorithm> = None;
prot.read_struct_begin()?;
loop {
let field_ident = prot.read_field_begin()?;
if field_ident.field_type == FieldType::Stop {
break;
}
match field_ident.id {
1 => {
let val = prot.read_string()?;
crs = Some(val);
}
2 => {
let val = EdgeInterpolationAlgorithm::try_from(&mut *prot)?;
algorithm = Some(val);
}

_ => {
prot.skip(field_ident.field_type)?;
}
};
}
Ok(Self { crs, algorithm })
}
1: optional string<'a> crs;
2: optional EdgeInterpolationAlgorithm algorithm;
}
);

/// Logical types used by version 2.4.0+ of the Parquet format.
///
Expand Down Expand Up @@ -971,7 +920,7 @@ impl<'a> TryFrom<&mut ThriftCompactInputProtocol<'a>> for ColumnOrder {
}
let ret = match field_ident.id {
1 => {
// TODO: the sort order needs to be set correctly after parsing.
// NOTE: the sort order needs to be set correctly after parsing.
prot.skip_empty_struct()?;
Self::TYPE_DEFINED_ORDER(SortOrder::SIGNED)
}
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/encryption/decrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,13 @@ impl CryptoContext {
column_ordinal: usize,
) -> Result<Self> {
let (data_decryptor, metadata_decryptor) = match column_crypto_metadata {
ColumnCryptoMetaData::EncryptionWithFooterKey => {
ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => {
// TODO: In GCM-CTR mode will this need to be a non-GCM decryptor?
let data_decryptor = file_decryptor.get_footer_decryptor()?;
let metadata_decryptor = file_decryptor.get_footer_decryptor()?;
(data_decryptor, metadata_decryptor)
}
ColumnCryptoMetaData::EncryptionWithColumnKey(column_key_encryption) => {
ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(column_key_encryption) => {
let key_metadata = &column_key_encryption.key_metadata;
let full_column_name;
let column_name = if column_key_encryption.path_in_schema.len() == 1 {
Expand Down
4 changes: 2 additions & 2 deletions parquet/src/encryption/encrypt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,14 @@ pub(crate) fn get_column_crypto_metadata(
) -> Option<ColumnCryptoMetaData> {
if properties.column_keys.is_empty() {
// Uniform encryption
Some(ColumnCryptoMetaData::EncryptionWithFooterKey)
Some(ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY)
} else {
properties
.column_keys
.get(&column.path().string())
.map(|encryption_key| {
// Column is encrypted with a column specific key
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey {
path_in_schema: column.path().parts().to_vec(),
key_metadata: encryption_key.key_metadata.clone(),
})
Expand Down
46 changes: 26 additions & 20 deletions parquet/src/file/column_crypto_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,47 @@

//! Column chunk encryption metadata

use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::format::{
ColumnCryptoMetaData as TColumnCryptoMetaData,
EncryptionWithColumnKey as TEncryptionWithColumnKey,
EncryptionWithFooterKey as TEncryptionWithFooterKey,
};
use crate::parquet_thrift::{FieldType, ThriftCompactInputProtocol};
use crate::{thrift_struct, thrift_union};

/// ColumnCryptoMetadata for a column chunk
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ColumnCryptoMetaData {
/// The column is encrypted with the footer key
EncryptionWithFooterKey,
/// The column is encrypted with a column-specific key
EncryptionWithColumnKey(EncryptionWithColumnKey),
}
// define this and ColumnCryptoMetadata here so they're only defined when
// the encryption feature is enabled

thrift_struct!(
/// Encryption metadata for a column chunk encrypted with a column-specific key
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EncryptionWithColumnKey {
/// Path to the column in the Parquet schema
pub path_in_schema: Vec<String>,
/// Metadata required to retrieve the column encryption key
pub key_metadata: Option<Vec<u8>>,
/// Path to the column in the Parquet schema
1: required list<string> path_in_schema

/// Path to the column in the Parquet schema
2: optional binary key_metadata
}
);

thrift_union!(
/// ColumnCryptoMetadata for a column chunk
union ColumnCryptoMetaData {
1: ENCRYPTION_WITH_FOOTER_KEY
2: (EncryptionWithColumnKey) ENCRYPTION_WITH_COLUMN_KEY
}
);

/// Converts Thrift definition into `ColumnCryptoMetadata`.
pub fn try_from_thrift(
thrift_column_crypto_metadata: &TColumnCryptoMetaData,
) -> Result<ColumnCryptoMetaData> {
let crypto_metadata = match thrift_column_crypto_metadata {
TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(_) => {
ColumnCryptoMetaData::EncryptionWithFooterKey
ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY
}
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(encryption_with_column_key) => {
ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey {
path_in_schema: encryption_with_column_key.path_in_schema.clone(),
key_metadata: encryption_with_column_key.key_metadata.clone(),
})
Expand All @@ -63,10 +69,10 @@ pub fn try_from_thrift(
/// Converts `ColumnCryptoMetadata` into Thrift definition.
pub fn to_thrift(column_crypto_metadata: &ColumnCryptoMetaData) -> TColumnCryptoMetaData {
match column_crypto_metadata {
ColumnCryptoMetaData::EncryptionWithFooterKey => {
ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY => {
TColumnCryptoMetaData::ENCRYPTIONWITHFOOTERKEY(TEncryptionWithFooterKey {})
}
ColumnCryptoMetaData::EncryptionWithColumnKey(encryption_with_column_key) => {
ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(encryption_with_column_key) => {
TColumnCryptoMetaData::ENCRYPTIONWITHCOLUMNKEY(TEncryptionWithColumnKey {
path_in_schema: encryption_with_column_key.path_in_schema.clone(),
key_metadata: encryption_with_column_key.key_metadata.clone(),
Expand All @@ -81,14 +87,14 @@ mod tests {

#[test]
fn test_encryption_with_footer_key_from_thrift() {
let metadata = ColumnCryptoMetaData::EncryptionWithFooterKey;
let metadata = ColumnCryptoMetaData::ENCRYPTION_WITH_FOOTER_KEY;

assert_eq!(try_from_thrift(&to_thrift(&metadata)).unwrap(), metadata);
}

#[test]
fn test_encryption_with_column_key_from_thrift() {
let metadata = ColumnCryptoMetaData::EncryptionWithColumnKey(EncryptionWithColumnKey {
let metadata = ColumnCryptoMetaData::ENCRYPTION_WITH_COLUMN_KEY(EncryptionWithColumnKey {
path_in_schema: vec!["abc".to_owned(), "def".to_owned()],
key_metadata: Some(vec![0, 1, 2, 3, 4, 5]),
});
Expand Down
Loading
Loading