From 2aaa22af86d925b57cacc1960f0edec6adc446a7 Mon Sep 17 00:00:00 2001 From: Aarushi Gupta Date: Tue, 23 Jun 2026 15:46:50 +0100 Subject: [PATCH 1/5] wip --- .../src/writer/file_writer/parquet_writer.rs | 88 +++++++++++++++++-- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..1743102fe3 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -36,6 +36,7 @@ use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; +use crate::encryption::EncryptedOutputFile; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, @@ -46,6 +47,47 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// Output destination for a [`ParquetWriter`]. +/// +/// Wraps either a plain or an AES-GCM-encrypted output +enum OutputTarget { + Plain(OutputFile), + Encrypted(EncryptedOutputFile), +} + +impl OutputTarget { + async fn writer(&self) -> Result> { + match self { + Self::Plain(o) => o.writer().await, + Self::Encrypted(e) => e.writer().await, + } + } + + async fn delete(&self) -> Result<()> { + match self { + Self::Plain(o) => o.delete().await, + Self::Encrypted(e) => e.delete().await, + } + } + + fn location(&self) -> &str { + match self { + Self::Plain(o) => o.location(), + Self::Encrypted(e) => e.location(), + } + } + + /// Returns the encoded `StandardKeyMetadata` bytes for the encrypted + /// variant — these are what gets stored on `DataFile.key_metadata` so + /// readers can reconstruct the key. Plain variant returns `None`. 
+ fn encoded_key_metadata(&self) -> Result>> { + match self { + Self::Plain(_) => Ok(None), + Self::Encrypted(e) => Ok(Some(e.key_metadata().encode()?.into_vec())), + } + } +} + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { @@ -79,14 +121,35 @@ impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; async fn build(&self, output_file: OutputFile) -> Result { - Ok(ParquetWriter { + Ok(self.build_with_target(OutputTarget::Plain(output_file))) + } +} + +impl ParquetWriterBuilder { + /// Like [`Self::build`], but writes an AES-GCM-encrypted parquet file. + /// + /// The returned writer wraps `encrypted_output` so each parquet block is + /// encrypted as it streams to disk. On close, the resulting + /// [`DataFile::key_metadata`] is populated with the encoded + /// `StandardKeyMetadata` from the wrapper so readers can decrypt the file. + /// + /// [`DataFile::key_metadata`]: crate::spec::DataFile::key_metadata + pub async fn build_from_encrypted( + &self, + encrypted_output: EncryptedOutputFile, + ) -> Result { + Ok(self.build_with_target(OutputTarget::Encrypted(encrypted_output))) + } + + fn build_with_target(&self, output_target: OutputTarget) -> ParquetWriter { + ParquetWriter { schema: self.schema.clone(), inner_writer: None, writer_properties: self.props.clone(), current_row_num: 0, - output_file, + output_target, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), - }) + } } } @@ -211,7 +274,7 @@ impl SchemaVisitor for IndexByParquetPathName { /// `ParquetWriter`` is used to write arrow data into parquet file on storage. 
pub struct ParquetWriter { schema: SchemaRef, - output_file: OutputFile, + output_target: OutputTarget, inner_writer: Option>, writer_properties: WriterProperties, current_row_num: usize, @@ -337,6 +400,9 @@ impl ParquetWriter { file_path, // TODO: Implement nan_value_counts here HashMap::new(), + // This helper consumes already-written parquet files; encryption + // metadata isn't surfaced here, so always plain for now. + None, )?; builder.partition_spec_id(table_metadata.default_partition_spec_id()); let data_file = builder.build().unwrap(); @@ -353,6 +419,7 @@ impl ParquetWriter { written_size: usize, file_path: String, nan_value_counts: HashMap, + key_metadata: Option>, ) -> Result { let index_by_parquet_path = { let mut visitor = IndexByParquetPathName::new(); @@ -412,6 +479,7 @@ impl ParquetWriter { // - We can ignore implementing distinct_counts due to this: https://lists.apache.org/thread/j52tsojv0x4bopxyzsp7m7bqt23n5fnd .lower_bounds(lower_bounds) .upper_bounds(upper_bounds) + .key_metadata(key_metadata) .split_offsets(Some( metadata .row_groups() @@ -490,7 +558,7 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let inner_writer = self.output_file.writer().await?; + let inner_writer = self.output_target.writer().await?; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( async_writer, @@ -529,7 +597,7 @@ impl FileWriter for ParquetWriter { let written_size = writer.bytes_written(); if self.current_row_num == 0 { - self.output_file.delete().await.map_err(|err| { + self.output_target.delete().await.map_err(|err| { Error::new( ErrorKind::Unexpected, "Failed to delete empty parquet file.", @@ -540,12 +608,14 @@ impl FileWriter for ParquetWriter { } else { let parquet_metadata = Arc::new(metadata); + let key_metadata = self.output_target.encoded_key_metadata()?; Ok(vec![Self::parquet_to_data_file_builder( self.schema, 
parquet_metadata, written_size, - self.output_file.location().to_string(), + self.output_target.location().to_string(), self.nan_value_count_visitor.nan_value_counts, + key_metadata, )?]) } } @@ -553,7 +623,7 @@ impl FileWriter for ParquetWriter { impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { - self.output_file.location().to_string() + self.output_target.location().to_string() } fn current_row_num(&self) -> usize { @@ -627,6 +697,8 @@ mod tests { use super::*; use crate::arrow::schema_to_arrow_schema; + use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; + use crate::encryption::{EncryptedInputFile, EncryptionManager}; use crate::io::FileIO; use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale}; use crate::spec::{PrimitiveLiteral, Struct, *}; From b3ea8337620fdd945c37c8ccd59b268b6a563d29 Mon Sep 17 00:00:00 2001 From: Aarushi Gupta Date: Wed, 24 Jun 2026 00:08:11 +0100 Subject: [PATCH 2/5] feat: support encrypted parquet data files --- crates/iceberg/src/writer/file_writer/mod.rs | 19 ++- .../src/writer/file_writer/parquet_writer.rs | 119 +++++++++++++++--- 2 files changed, 121 insertions(+), 17 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 101919f5b3..ff94e00a66 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -21,8 +21,9 @@ use arrow_array::RecordBatch; use futures::Future; use super::CurrentFileStatus; -use crate::Result; +use crate::encryption::EncryptedOutputFile; use crate::spec::DataFileBuilder; +use crate::{Error, ErrorKind, Result}; mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; @@ -41,6 +42,22 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { type R: FileWriter; /// Build file writer. 
fn build(&self, output_file: OutputFile) -> impl Future> + Send; + + /// Like [`Self::build`], but writes an AES-GCM-encrypted file. The default + /// implementation rejects the call — formats that support encryption + /// override this to wire the encrypted output through their writer. + fn build_encrypted( + &self, + encrypted_output: EncryptedOutputFile, + ) -> impl Future> + Send { + async move { + let _ = encrypted_output; + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Encryption is not supported by this file writer builder", + )) + } + } } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 1743102fe3..b9f91221fd 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -47,9 +47,9 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; -/// Output destination for a [`ParquetWriter`]. -/// -/// Wraps either a plain or an AES-GCM-encrypted output +/// Output destination for a [`ParquetWriter`]: either a plain [`OutputFile`] +/// or an [`EncryptedOutputFile`] that transparently AES-GCM-encrypts each block +/// as it streams to storage. enum OutputTarget { Plain(OutputFile), Encrypted(EncryptedOutputFile), @@ -123,24 +123,20 @@ impl FileWriterBuilder for ParquetWriterBuilder { async fn build(&self, output_file: OutputFile) -> Result { Ok(self.build_with_target(OutputTarget::Plain(output_file))) } -} -impl ParquetWriterBuilder { - /// Like [`Self::build`], but writes an AES-GCM-encrypted parquet file. + /// Builds a `ParquetWriter` that writes an AES-GCM-encrypted parquet file. /// - /// The returned writer wraps `encrypted_output` so each parquet block is - /// encrypted as it streams to disk. 
On close, the resulting - /// [`DataFile::key_metadata`] is populated with the encoded + /// On close, the resulting [`DataFile::key_metadata`] is populated with the encoded /// `StandardKeyMetadata` from the wrapper so readers can decrypt the file. - /// - /// [`DataFile::key_metadata`]: crate::spec::DataFile::key_metadata - pub async fn build_from_encrypted( + async fn build_encrypted( &self, encrypted_output: EncryptedOutputFile, - ) -> Result { + ) -> Result { Ok(self.build_with_target(OutputTarget::Encrypted(encrypted_output))) } +} +impl ParquetWriterBuilder { fn build_with_target(&self, output_target: OutputTarget) -> ParquetWriter { ParquetWriter { schema: self.schema.clone(), @@ -370,7 +366,7 @@ impl MinMaxColAggregator { } impl ParquetWriter { - /// Converts parquet files to data files + /// Converts already-written parquet files into [`DataFile`]s. #[allow(dead_code)] pub(crate) async fn parquet_files_to_data_files( file_io: &FileIO, @@ -400,8 +396,6 @@ impl ParquetWriter { file_path, // TODO: Implement nan_value_counts here HashMap::new(), - // This helper consumes already-written parquet files; encryption - // metadata isn't surfaced here, so always plain for now. 
None, )?; builder.partition_spec_id(table_metadata.default_partition_spec_id()); @@ -933,6 +927,99 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_parquet_writer_encrypted_round_trip() -> Result<()> { + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + + use crate::encryption::StandardKeyMetadata; + + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = DefaultFileNameGenerator::new( + "encrypted".to_string(), + None, + DataFileFormat::Parquet, + ); + + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let manager = EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(); + + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( + "col", + DataType::Int64, + true, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )]))])); + let col = Arc::new(Int64Array::from_iter_values(0..256)) as ArrayRef; + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap(); + + let plain_output = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let encrypted_output = manager.encrypt(plain_output); + let expected_key_metadata = encrypted_output.key_metadata().clone(); + let file_path = encrypted_output.location().to_string(); + + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(arrow_schema.as_ref().try_into().unwrap()), + ) + .build_encrypted(encrypted_output) + .await?; + pw.write(&to_write).await?; + let mut res = pw.close().await?; + assert_eq!(res.len(), 1); + + let data_file = res + .pop() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + // 
DataFile must carry the encoded key metadata so readers can decrypt. + let encoded_km = data_file + .key_metadata() + .expect("encrypted data file must carry key_metadata"); + let decoded_km = StandardKeyMetadata::decode(encoded_km)?; + assert_eq!(decoded_km, expected_key_metadata); + + // Sanity: bytes on disk must NOT contain the plaintext parquet magic, since + // the file is wrapped in AGS1 framing. + let raw_bytes = file_io.new_input(&file_path)?.read().await?; + assert_ne!( + &raw_bytes[..4], + b"PAR1", + "encrypted file should not start with plaintext parquet magic" + ); + + // Reading through EncryptedInputFile yields the original parquet stream. + let encrypted_input = + EncryptedInputFile::new(file_io.new_input(&file_path)?, decoded_km); + let plaintext = encrypted_input.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(plaintext) + .unwrap() + .build() + .unwrap(); + let batches: Vec = reader.map(|b| b.unwrap()).collect(); + let round_tripped = concat_batches(&arrow_schema, &batches).unwrap(); + assert_eq!(to_write, round_tripped); + + Ok(()) + } + #[tokio::test] async fn test_parquet_writer_with_complex_schema() -> Result<()> { let temp_dir = TempDir::new().unwrap(); From 56fdb5c6b004e416aecf2dce6e15e0f9406b53d6 Mon Sep 17 00:00:00 2001 From: Aarushi Gupta Date: Wed, 24 Jun 2026 00:43:55 +0100 Subject: [PATCH 3/5] test: simplify encrypted parquet round-trip test --- crates/iceberg/src/writer/file_writer/mod.rs | 6 +- .../src/writer/file_writer/parquet_writer.rs | 88 +++++++------------ 2 files changed, 32 insertions(+), 62 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index ff94e00a66..0f46e635cd 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -43,9 +43,7 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { /// Build file writer. 
fn build(&self, output_file: OutputFile) -> impl Future> + Send; - /// Like [`Self::build`], but writes an AES-GCM-encrypted file. The default - /// implementation rejects the call — formats that support encryption - /// override this to wire the encrypted output through their writer. + /// Formats that support encryption should override this to wire the encrypted output through their writer. fn build_encrypted( &self, encrypted_output: EncryptedOutputFile, @@ -54,7 +52,7 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { let _ = encrypted_output; Err(Error::new( ErrorKind::FeatureUnsupported, - "Encryption is not supported by this file writer builder", + "Encryption is not supported by this file writer", )) } } diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index b9f91221fd..8afcd308de 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -47,9 +47,6 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; -/// Output destination for a [`ParquetWriter`]: either a plain [`OutputFile`] -/// or an [`EncryptedOutputFile`] that transparently AES-GCM-encrypts each block -/// as it streams to storage. enum OutputTarget { Plain(OutputFile), Encrypted(EncryptedOutputFile), @@ -78,8 +75,7 @@ impl OutputTarget { } /// Returns the encoded `StandardKeyMetadata` bytes for the encrypted - /// variant — these are what gets stored on `DataFile.key_metadata` so - /// readers can reconstruct the key. Plain variant returns `None`. 
+ /// variant fn encoded_key_metadata(&self) -> Result>> { match self { Self::Plain(_) => Ok(None), @@ -933,16 +929,8 @@ mod tests { use crate::encryption::StandardKeyMetadata; - let temp_dir = TempDir::new().unwrap(); - let file_io = FileIO::new_with_fs(); - let location_gen = DefaultLocationGenerator::with_data_location( - temp_dir.path().to_str().unwrap().to_string(), - ); - let file_name_gen = DefaultFileNameGenerator::new( - "encrypted".to_string(), - None, - DataFileFormat::Parquet, - ); + let file_io = FileIO::new_with_memory(); + let path = "memory:///encrypted_parquet.parquet"; let kms = MemoryKeyManagementClient::new(); kms.add_master_key("master-1").unwrap(); @@ -951,36 +939,31 @@ mod tests { .table_key_id("master-1") .build(); - let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![Field::new( - "col", - DataType::Int64, - true, + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![ + Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + ])); + let batch = RecordBatch::try_new( + arrow_schema.clone(), + vec![Arc::new(Int64Array::from_iter_values(0..16)) as ArrayRef], ) - .with_metadata(HashMap::from([( - PARQUET_FIELD_ID_META_KEY.to_string(), - "0".to_string(), - )]))])); - let col = Arc::new(Int64Array::from_iter_values(0..256)) as ArrayRef; - let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap(); - - let plain_output = file_io.new_output( - location_gen.generate_location(None, &file_name_gen.generate_file_name()), - )?; - let encrypted_output = manager.encrypt(plain_output); - let expected_key_metadata = encrypted_output.key_metadata().clone(); - let file_path = encrypted_output.location().to_string(); + .unwrap(); + + let encrypted_out = manager.encrypt(file_io.new_output(path)?); + let expected_km = encrypted_out.key_metadata().clone(); let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), 
Arc::new(arrow_schema.as_ref().try_into().unwrap()), ) - .build_encrypted(encrypted_output) + .build_encrypted(encrypted_out) .await?; - pw.write(&to_write).await?; - let mut res = pw.close().await?; - assert_eq!(res.len(), 1); + pw.write(&batch).await?; + let mut builders = pw.close().await?; - let data_file = res + let data_file = builders .pop() .unwrap() .content(DataContentType::Data) @@ -989,33 +972,22 @@ mod tests { .build() .unwrap(); - // DataFile must carry the encoded key metadata so readers can decrypt. - let encoded_km = data_file - .key_metadata() - .expect("encrypted data file must carry key_metadata"); - let decoded_km = StandardKeyMetadata::decode(encoded_km)?; - assert_eq!(decoded_km, expected_key_metadata); - - // Sanity: bytes on disk must NOT contain the plaintext parquet magic, since - // the file is wrapped in AGS1 framing. - let raw_bytes = file_io.new_input(&file_path)?.read().await?; - assert_ne!( - &raw_bytes[..4], - b"PAR1", - "encrypted file should not start with plaintext parquet magic" - ); + let decoded_km = StandardKeyMetadata::decode( + data_file + .key_metadata() + .expect("encrypted data file must carry key_metadata"), + )?; + assert_eq!(decoded_km, expected_km); - // Reading through EncryptedInputFile yields the original parquet stream. 
- let encrypted_input = - EncryptedInputFile::new(file_io.new_input(&file_path)?, decoded_km); - let plaintext = encrypted_input.read().await?; + let plaintext = EncryptedInputFile::new(file_io.new_input(path)?, decoded_km) + .read() + .await?; let reader = ParquetRecordBatchReaderBuilder::try_new(plaintext) .unwrap() .build() .unwrap(); let batches: Vec = reader.map(|b| b.unwrap()).collect(); - let round_tripped = concat_batches(&arrow_schema, &batches).unwrap(); - assert_eq!(to_write, round_tripped); + assert_eq!(batch, concat_batches(&arrow_schema, &batches).unwrap()); Ok(()) } From 2632dba3f7fcff0a7ab4df2b2a8902763a469ae4 Mon Sep 17 00:00:00 2001 From: Aarushi Gupta Date: Mon, 29 Jun 2026 10:30:15 +0100 Subject: [PATCH 4/5] refactor(encryption): pluggable FileEncryptionHandler on write path --- crates/iceberg/src/encryption/handler.rs | 111 +++++++++++++++ crates/iceberg/src/encryption/io.rs | 15 ++- crates/iceberg/src/encryption/manager.rs | 32 +++-- crates/iceberg/src/encryption/mod.rs | 2 + crates/iceberg/src/writer/file_writer/mod.rs | 23 +--- .../src/writer/file_writer/parquet_writer.rs | 127 ++++++++---------- 6 files changed, 201 insertions(+), 109 deletions(-) create mode 100644 crates/iceberg/src/encryption/handler.rs diff --git a/crates/iceberg/src/encryption/handler.rs b/crates/iceberg/src/encryption/handler.rs new file mode 100644 index 0000000000..1aea759df0 --- /dev/null +++ b/crates/iceberg/src/encryption/handler.rs @@ -0,0 +1,111 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable generation of per-file `key_metadata` on the write path. + +use std::fmt::Debug; + +use aes_gcm::aead::OsRng; +use aes_gcm::aead::rand_core::RngCore; +use async_trait::async_trait; + +use super::crypto::{AesKeySize, SecureKey}; +use super::key_metadata::StandardKeyMetadata; +use crate::Result; + +/// AAD prefix length in bytes. +/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. +const AAD_PREFIX_LENGTH: usize = 16; + +/// Produces the per-file `key_metadata` that the writer attaches to each +/// emitted [`DataFile`] and uses to encrypt the file. +/// +/// The spec defines `key_metadata` (field 131) as implementation-specific. The +/// reference *standard* encryption scheme stores a [`StandardKeyMetadata`] +/// containing a fresh plaintext DEK + AAD prefix per file, generated locally +/// without a KMS round-trip (see [`StandardFileEncryptionHandler`]). Other +/// schemes may need to call out to a KMS to wrap a freshly minted DEK, hence +/// the `async` signature. +/// +/// This is the write-side counterpart of `FileKeyResolver` on the read path: +/// readers resolve `key_metadata` bytes back into a [`StandardKeyMetadata`]; +/// writers produce one to embed. +/// +/// [`DataFile`]: crate::spec::DataFile +#[async_trait] +pub trait FileEncryptionHandler: Debug + Send + Sync { + /// Produce key material for the next file to be written. + async fn next_key_metadata(&self) -> Result; +} + +/// Default [`FileEncryptionHandler`] for the standard encryption scheme. 
+/// +/// Generates a fresh random DEK and AAD prefix per file with no KMS +/// round-trip; satisfies the async signature trivially. +#[derive(Debug, Default, Clone)] +pub struct StandardFileEncryptionHandler { + key_size: AesKeySize, +} + +impl StandardFileEncryptionHandler { + /// Creates a new handler with the given DEK size. + pub fn new(key_size: AesKeySize) -> Self { + Self { key_size } + } +} + +#[async_trait] +impl FileEncryptionHandler for StandardFileEncryptionHandler { + async fn next_key_metadata(&self) -> Result { + Ok(generate_standard_key_metadata(self.key_size)) + } +} + +/// Generate a [`StandardKeyMetadata`] with a fresh random DEK and AAD prefix. +pub(crate) fn generate_standard_key_metadata(key_size: AesKeySize) -> StandardKeyMetadata { + let dek = SecureKey::generate(key_size); + let aad_prefix = generate_aad_prefix(); + StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix) +} + +fn generate_aad_prefix() -> Box<[u8]> { + let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; + OsRng.fill_bytes(&mut prefix); + prefix.into_boxed_slice() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_standard_handler_emits_distinct_keys() { + let handler = StandardFileEncryptionHandler::default(); + let a = handler.next_key_metadata().await.unwrap(); + let b = handler.next_key_metadata().await.unwrap(); + assert_ne!( + a.encryption_key().as_bytes(), + b.encryption_key().as_bytes(), + "each file must get a fresh DEK" + ); + assert_ne!( + a.aad_prefix(), + b.aad_prefix(), + "each file must get a fresh AAD prefix" + ); + } +} diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs index c3d81dd850..02ce81a2e7 100644 --- a/crates/iceberg/src/encryption/io.rs +++ b/crates/iceberg/src/encryption/io.rs @@ -130,9 +130,18 @@ impl EncryptedOutputFile { /// Creates a writer that transparently encrypts on each write. 
pub async fn writer(&self) -> Result> { - let raw_writer = self.inner.writer().await?; - let cipher = build_cipher(&self.key_metadata)?; - let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); + Self::wrap_writer(self.inner.writer().await?, &self.key_metadata) + } + + /// Wrap an already-opened raw writer with AES-GCM stream encryption keyed + /// from `key_metadata`. Used by writers that want to lazily open their + /// underlying output and still apply standard encryption. + pub(crate) fn wrap_writer( + raw_writer: Box, + key_metadata: &StandardKeyMetadata, + ) -> Result> { + let cipher = build_cipher(key_metadata)?; + let aad_prefix: Box<[u8]> = key_metadata.aad_prefix().unwrap_or_default().into(); Ok(Box::new(AesGcmFileWrite::new( raw_writer, cipher, aad_prefix, ))) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index d88c4cb8f7..25f5417109 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -28,8 +28,7 @@ use std::fmt; use std::sync::{Arc, RwLock}; use std::time::Duration; -use aes_gcm::aead::OsRng; -use aes_gcm::aead::rand_core::RngCore; +use async_trait::async_trait; use chrono::Utc; use moka::future::Cache; use uuid::Uuid; @@ -37,6 +36,7 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +use super::handler::{FileEncryptionHandler, generate_standard_key_metadata}; use super::io::EncryptedOutputFile; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; @@ -54,10 +54,6 @@ const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730; /// Default cache TTL for unwrapped KEKs. const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600); -/// Default AAD prefix length in bytes. -/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. 
-const AAD_PREFIX_LENGTH: usize = 16; - /// File-level encryption manager using two-layer envelope encryption. /// /// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls. @@ -151,10 +147,7 @@ impl EncryptionManager { /// Returns an [`EncryptedOutputFile`] that transparently encrypts on /// write, along with key metadata for later decryption. pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile { - let dek = SecureKey::generate(self.key_size); - let aad_prefix = Self::generate_aad_prefix(); - let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix); - EncryptedOutputFile::new(raw_output, metadata) + EncryptedOutputFile::new(raw_output, generate_standard_key_metadata(self.key_size)) } /// Wrap a manifest list key metadata with a KEK for storage in table metadata. @@ -397,13 +390,6 @@ impl EncryptionManager { }) } - /// Generate a random AAD prefix for file encryption. - fn generate_aad_prefix() -> Box<[u8]> { - let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; - OsRng.fill_bytes(&mut prefix); - prefix.into_boxed_slice() - } - /// Wrap a DEK with a KEK using local AES-GCM. fn wrap_dek_with_kek( &self, @@ -429,6 +415,18 @@ impl EncryptionManager { } } +#[async_trait] +impl FileEncryptionHandler for EncryptionManager { + /// Generate per-file key metadata for the standard encryption scheme. + /// + /// Returns a fresh plaintext DEK + AAD prefix sized to the manager's + /// configured [`AesKeySize`]. No KMS round-trip — the KMS/KEK envelope + /// work happens one tier up when the manifest-list key metadata is wrapped. + async fn next_key_metadata(&self) -> Result { + Ok(generate_standard_key_metadata(self.key_size)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 12ee76e5e0..161b88cff3 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,6 +21,7 @@ //! 
for encrypting and decrypting data in Iceberg tables. mod crypto; +mod handler; pub(crate) mod io; pub(crate) mod key_metadata; pub mod kms; @@ -28,6 +29,7 @@ mod manager; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +pub use handler::{FileEncryptionHandler, StandardFileEncryptionHandler}; pub use io::{EncryptedInputFile, EncryptedOutputFile}; pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs index 0f46e635cd..e77d4c0af1 100644 --- a/crates/iceberg/src/writer/file_writer/mod.rs +++ b/crates/iceberg/src/writer/file_writer/mod.rs @@ -21,9 +21,8 @@ use arrow_array::RecordBatch; use futures::Future; use super::CurrentFileStatus; -use crate::encryption::EncryptedOutputFile; +use crate::Result; use crate::spec::DataFileBuilder; -use crate::{Error, ErrorKind, Result}; mod parquet_writer; pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder}; @@ -41,21 +40,13 @@ pub trait FileWriterBuilder: Clone + Send + Sync + 'static { /// The associated file writer type. type R: FileWriter; /// Build file writer. + /// + /// Whether the resulting file is encrypted is determined by the builder's + /// own configuration (e.g. a [`FileEncryptionHandler`] configured upfront), + /// not by the caller picking a different `build` method. + /// + /// [`FileEncryptionHandler`]: crate::encryption::FileEncryptionHandler fn build(&self, output_file: OutputFile) -> impl Future> + Send; - - /// Formats that support encryption should override this to wire the encrypted output through their writer. 
- fn build_encrypted( - &self, - encrypted_output: EncryptedOutputFile, - ) -> impl Future> + Send { - async move { - let _ = encrypted_output; - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Encryption is not supported by this file writer", - )) - } - } } /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 8afcd308de..d454bc2f49 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -36,7 +36,7 @@ use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; -use crate::encryption::EncryptedOutputFile; +use crate::encryption::{EncryptedOutputFile, FileEncryptionHandler, StandardKeyMetadata}; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, @@ -47,49 +47,13 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; -enum OutputTarget { - Plain(OutputFile), - Encrypted(EncryptedOutputFile), -} - -impl OutputTarget { - async fn writer(&self) -> Result> { - match self { - Self::Plain(o) => o.writer().await, - Self::Encrypted(e) => e.writer().await, - } - } - - async fn delete(&self) -> Result<()> { - match self { - Self::Plain(o) => o.delete().await, - Self::Encrypted(e) => e.delete().await, - } - } - - fn location(&self) -> &str { - match self { - Self::Plain(o) => o.location(), - Self::Encrypted(e) => e.location(), - } - } - - /// Returns the encoded `StandardKeyMetadata` bytes for the encrypted - /// variant - fn encoded_key_metadata(&self) -> Result>> { - match self { - Self::Plain(_) => Ok(None), - Self::Encrypted(e) 
=> Ok(Some(e.key_metadata().encode()?.into_vec())), - } - } -} - /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { props: WriterProperties, schema: SchemaRef, match_mode: FieldMatchMode, + file_encryption_handler: Option>, } impl ParquetWriterBuilder { @@ -109,39 +73,39 @@ impl ParquetWriterBuilder { props, schema, match_mode, + file_encryption_handler: None, } } + + /// Configure per-file encryption via a [`FileEncryptionHandler`]. + /// + /// When set, every file produced by this builder is wrapped in AES-GCM + /// stream encryption keyed from the handler's `next_key_metadata`, and the + /// resulting [`DataFile::key_metadata`] carries the encoded + /// [`StandardKeyMetadata`] so readers can decrypt the file. + pub fn with_file_encryption_handler( + mut self, + handler: Arc, + ) -> Self { + self.file_encryption_handler = Some(handler); + self + } } impl FileWriterBuilder for ParquetWriterBuilder { type R = ParquetWriter; async fn build(&self, output_file: OutputFile) -> Result { - Ok(self.build_with_target(OutputTarget::Plain(output_file))) - } - - /// Builds a `ParquetWriter` that writes an AES-GCM-encrypted parquet file. - /// - /// On close, the resulting [`DataFile::key_metadata`] is populated with the encoded - /// `StandardKeyMetadata` from the wrapper so readers can decrypt the file. 
- async fn build_encrypted( - &self, - encrypted_output: EncryptedOutputFile, - ) -> Result { - Ok(self.build_with_target(OutputTarget::Encrypted(encrypted_output))) - } -} - -impl ParquetWriterBuilder { - fn build_with_target(&self, output_target: OutputTarget) -> ParquetWriter { - ParquetWriter { + Ok(ParquetWriter { schema: self.schema.clone(), inner_writer: None, writer_properties: self.props.clone(), current_row_num: 0, - output_target, + output_file, + file_encryption_handler: self.file_encryption_handler.clone(), + resolved_key_metadata: None, nan_value_count_visitor: NanValueCountVisitor::new_with_match_mode(self.match_mode), - } + }) } } @@ -266,7 +230,13 @@ impl SchemaVisitor for IndexByParquetPathName { /// `ParquetWriter`` is used to write arrow data into parquet file on storage. pub struct ParquetWriter { schema: SchemaRef, - output_target: OutputTarget, + output_file: OutputFile, + /// When set, the writer wraps its raw output with AES-GCM stream + /// encryption using key material from this handler. + file_encryption_handler: Option>, + /// Set on lazy init when encryption is enabled — captured here so + /// `close()` can emit it on the resulting `DataFile::key_metadata`. 
+ resolved_key_metadata: Option, inner_writer: Option>, writer_properties: WriterProperties, current_row_num: usize, @@ -548,7 +518,15 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let inner_writer = self.output_target.writer().await?; + let raw_writer = self.output_file.writer().await?; + let inner_writer = if let Some(handler) = &self.file_encryption_handler { + let key_metadata = handler.next_key_metadata().await?; + let wrapped = EncryptedOutputFile::wrap_writer(raw_writer, &key_metadata)?; + self.resolved_key_metadata = Some(key_metadata); + wrapped + } else { + raw_writer + }; let async_writer = AsyncFileWriter::new(inner_writer); let writer = AsyncArrowWriter::try_new( async_writer, @@ -587,7 +565,7 @@ impl FileWriter for ParquetWriter { let written_size = writer.bytes_written(); if self.current_row_num == 0 { - self.output_target.delete().await.map_err(|err| { + self.output_file.delete().await.map_err(|err| { Error::new( ErrorKind::Unexpected, "Failed to delete empty parquet file.", @@ -598,12 +576,16 @@ impl FileWriter for ParquetWriter { } else { let parquet_metadata = Arc::new(metadata); - let key_metadata = self.output_target.encoded_key_metadata()?; + let key_metadata = self + .resolved_key_metadata + .as_ref() + .map(|km| km.encode().map(|b| b.into_vec())) + .transpose()?; Ok(vec![Self::parquet_to_data_file_builder( self.schema, parquet_metadata, written_size, - self.output_target.location().to_string(), + self.output_file.location().to_string(), self.nan_value_count_visitor.nan_value_counts, key_metadata, )?]) @@ -613,7 +595,7 @@ impl FileWriter for ParquetWriter { impl CurrentFileStatus for ParquetWriter { fn current_file_path(&self) -> String { - self.output_target.location().to_string() + self.output_file.location().to_string() } fn current_row_num(&self) -> usize { @@ -934,10 +916,12 @@ mod tests { let kms = MemoryKeyManagementClient::new(); 
kms.add_master_key("master-1").unwrap(); - let manager = EncryptionManager::builder() - .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>) - .table_key_id("master-1") - .build(); + let manager = Arc::new( + EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc<dyn KeyManagementClient>) + .table_key_id("master-1") + .build(), + ); let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![ Field::new("col", DataType::Int64, true).with_metadata(HashMap::from([( @@ -951,14 +935,12 @@ mod tests { ) .unwrap(); - let encrypted_out = manager.encrypt(file_io.new_output(path)?); - let expected_km = encrypted_out.key_metadata().clone(); - let mut pw = ParquetWriterBuilder::new( WriterProperties::builder().build(), Arc::new(arrow_schema.as_ref().try_into().unwrap()), ) - .build_encrypted(encrypted_out) + .with_file_encryption_handler(manager as Arc<dyn FileEncryptionHandler>) + .build(file_io.new_output(path)?) .await?; pw.write(&batch).await?; let mut builders = pw.close().await?; @@ -977,7 +959,6 @@ mod tests { .key_metadata() .expect("encrypted data file must carry key_metadata"), )?; - assert_eq!(decoded_km, expected_km); let plaintext = EncryptedInputFile::new(file_io.new_input(path)?, decoded_km) .read() From 01ec24c9bcd92d1b21093610ff34c4778c66114c Mon Sep 17 00:00:00 2001 From: Aarushi Gupta Date: Mon, 29 Jun 2026 11:03:18 +0100 Subject: [PATCH 5/5] fix(encryption): use Parquet Modular Encryption for data files --- crates/iceberg/src/encryption/io.rs | 15 +--- .../src/writer/file_writer/parquet_writer.rs | 74 ++++++++++++++----- 2 files changed, 58 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs index 02ce81a2e7..c3d81dd850 100644 --- a/crates/iceberg/src/encryption/io.rs +++ b/crates/iceberg/src/encryption/io.rs @@ -130,18 +130,9 @@ impl EncryptedOutputFile { /// Creates a writer that transparently encrypts on each write.
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> { - Self::wrap_writer(self.inner.writer().await?, &self.key_metadata) - } - - /// Wrap an already-opened raw writer with AES-GCM stream encryption keyed - /// from `key_metadata`. Used by writers that want to lazily open their - /// underlying output and still apply standard encryption. - pub(crate) fn wrap_writer( - raw_writer: Box<dyn FileWrite>, - key_metadata: &StandardKeyMetadata, - ) -> Result<Box<dyn FileWrite>> { - let cipher = build_cipher(key_metadata)?; - let aad_prefix: Box<[u8]> = key_metadata.aad_prefix().unwrap_or_default().into(); + let raw_writer = self.inner.writer().await?; + let cipher = build_cipher(&self.key_metadata)?; + let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); Ok(Box::new(AesGcmFileWrite::new( raw_writer, cipher, aad_prefix, ))) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index d454bc2f49..5ec8b28dd6 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,6 +27,7 @@ use itertools::Itertools; use parquet::arrow::AsyncArrowWriter; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; +use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::file::metadata::ParquetMetaData; use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics; @@ -36,7 +37,7 @@ use crate::arrow::{ ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, NanValueCountVisitor, get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, }; -use crate::encryption::{EncryptedOutputFile, FileEncryptionHandler, StandardKeyMetadata}; +use crate::encryption::{FileEncryptionHandler, StandardKeyMetadata}; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal,
MapType, @@ -47,6 +48,27 @@ use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; use crate::{Error, ErrorKind, Result}; +/// Build Parquet Modular Encryption properties from per-file key material. +/// +/// PME uses the DEK as the footer key (uniform encryption: same key for footer +/// and all column chunks) and the AAD prefix as the file-level AAD. +fn build_file_encryption_properties( + key_metadata: &StandardKeyMetadata, +) -> Result<Arc<FileEncryptionProperties>> { + let mut builder = + FileEncryptionProperties::builder(key_metadata.encryption_key().as_bytes().to_vec()); + if let Some(aad) = key_metadata.aad_prefix() { + builder = builder.with_aad_prefix(aad.to_vec()); + } + builder.build().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to build parquet file encryption properties", + ) + .with_source(e) + }) +} + /// ParquetWriterBuilder is used to builder a [`ParquetWriter`] #[derive(Clone, Debug)] pub struct ParquetWriterBuilder { @@ -518,20 +540,23 @@ impl FileWriter for ParquetWriter { writer } else { let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?); - let raw_writer = self.output_file.writer().await?; - let inner_writer = if let Some(handler) = &self.file_encryption_handler { + let writer_properties = if let Some(handler) = &self.file_encryption_handler { let key_metadata = handler.next_key_metadata().await?; - let wrapped = EncryptedOutputFile::wrap_writer(raw_writer, &key_metadata)?; + let file_encryption_properties = build_file_encryption_properties(&key_metadata)?; self.resolved_key_metadata = Some(key_metadata); - wrapped + self.writer_properties + .clone() + .into_builder() + .with_file_encryption_properties(file_encryption_properties) + .build() } else { - raw_writer + self.writer_properties.clone() }; - let async_writer = AsyncFileWriter::new(inner_writer); + let async_writer = AsyncFileWriter::new(self.output_file.writer().await?); let writer = AsyncArrowWriter::try_new( async_writer,
arrow_schema.clone(), - Some(self.writer_properties.clone()), + Some(writer_properties), ) .map_err(|err| { Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.") @@ -670,7 +695,7 @@ mod tests { use super::*; use crate::arrow::schema_to_arrow_schema; use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; - use crate::encryption::{EncryptedInputFile, EncryptionManager}; + use crate::encryption::EncryptionManager; use crate::io::FileIO; use crate::spec::decimal_utils::{decimal_mantissa, decimal_new, decimal_scale}; use crate::spec::{PrimitiveLiteral, Struct, *}; @@ -907,9 +932,8 @@ mod tests { #[tokio::test] async fn test_parquet_writer_encrypted_round_trip() -> Result<()> { - use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; - - use crate::encryption::StandardKeyMetadata; + use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; + use parquet::encryption::decrypt::FileDecryptionProperties; let file_io = FileIO::new_with_memory(); let path = "memory:///encrypted_parquet.parquet"; @@ -960,13 +984,25 @@ mod tests { .expect("encrypted data file must carry key_metadata"), )?; - let plaintext = EncryptedInputFile::new(file_io.new_input(path)?, decoded_km) - .read() - .await?; - let reader = ParquetRecordBatchReaderBuilder::try_new(plaintext) - .unwrap() - .build() - .unwrap(); + // Read back via PME: build FileDecryptionProperties from the per-file + // key material and hand them to the parquet reader. The file on disk + // is a valid parquet file — no stream decryption involved. 
+ let mut decryption_builder = FileDecryptionProperties::builder( + decoded_km.encryption_key().as_bytes().to_vec(), + ); + if let Some(aad) = decoded_km.aad_prefix() { + decryption_builder = decryption_builder.with_aad_prefix(aad.to_vec()); + } + let decryption_properties = decryption_builder.build().unwrap(); + + let ciphertext = file_io.new_input(path)?.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new_with_options( + ciphertext, + ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties), + ) + .unwrap() + .build() + .unwrap(); let batches: Vec<RecordBatch> = reader.map(|b| b.unwrap()).collect(); assert_eq!(batch, concat_batches(&arrow_schema, &batches).unwrap());