diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index 653649e6cf..c489e4ae0d 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -2038,8 +2038,11 @@ pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: im pub async fn iceberg::spec::ManifestListWriter::close(self) -> iceberg::Result<()> pub fn iceberg::spec::ManifestListWriter::next_row_id(&self) -> core::option::Option pub fn iceberg::spec::ManifestListWriter::v1(writer: alloc::boxed::Box, snapshot_id: i64, parent_snapshot_id: core::option::Option) -> Self +pub async fn iceberg::spec::ManifestListWriter::v1_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option) -> iceberg::Result pub fn iceberg::spec::ManifestListWriter::v2(writer: alloc::boxed::Box, snapshot_id: i64, parent_snapshot_id: core::option::Option, sequence_number: i64) -> Self +pub async fn iceberg::spec::ManifestListWriter::v2_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option, sequence_number: i64) -> iceberg::Result pub fn iceberg::spec::ManifestListWriter::v3(writer: alloc::boxed::Box, snapshot_id: i64, parent_snapshot_id: core::option::Option, sequence_number: i64, first_row_id: core::option::Option) -> Self +pub async fn iceberg::spec::ManifestListWriter::v3_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option, sequence_number: i64, first_row_id: core::option::Option) -> iceberg::Result impl core::fmt::Debug for iceberg::spec::ManifestListWriter pub fn iceberg::spec::ManifestListWriter::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result pub struct iceberg::spec::ManifestMetadata diff --git a/crates/iceberg/src/spec/manifest_list/writer.rs b/crates/iceberg/src/spec/manifest_list/writer.rs index f739870171..f132e50e9e 100644 --- a/crates/iceberg/src/spec/manifest_list/writer.rs +++ b/crates/iceberg/src/spec/manifest_list/writer.rs @@ -25,6 +25,7 @@ use super::_const_schema::{ }; use super::_serde::{ManifestFileV1, ManifestFileV2, ManifestFileV3}; use super::{FormatVersion, ManifestContentType, ManifestFile, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::encryption::EncryptedOutputFile; use crate::error::Result; use crate::io::FileWrite; use crate::{Error, ErrorKind}; @@ -136,6 +137,56 @@ impl ManifestListWriter { ) } + /// Construct a v1 [`ManifestListWriter`] that writes to an [`EncryptedOutputFile`]. + /// + /// Clone `encrypted_output.key_metadata()` before calling — the wrapper is + /// consumed, and the metadata is needed to call + /// `EncryptionManager::encrypt_manifest_list_key_metadata` after [`Self::close`]. + pub async fn v1_from_encrypted( + encrypted_output: EncryptedOutputFile, + snapshot_id: i64, + parent_snapshot_id: Option, + ) -> Result { + let writer = encrypted_output.writer().await?; + Ok(Self::v1(writer, snapshot_id, parent_snapshot_id)) + } + + /// Construct a v2 [`ManifestListWriter`] that writes to an [`EncryptedOutputFile`]. + /// See [`Self::v1_from_encrypted`] for the key-metadata contract. + pub async fn v2_from_encrypted( + encrypted_output: EncryptedOutputFile, + snapshot_id: i64, + parent_snapshot_id: Option, + sequence_number: i64, + ) -> Result { + let writer = encrypted_output.writer().await?; + Ok(Self::v2( + writer, + snapshot_id, + parent_snapshot_id, + sequence_number, + )) + } + + /// Construct a v3 [`ManifestListWriter`] that writes to an [`EncryptedOutputFile`]. + /// See [`Self::v1_from_encrypted`] for the key-metadata contract. + pub async fn v3_from_encrypted( + encrypted_output: EncryptedOutputFile, + snapshot_id: i64, + parent_snapshot_id: Option, + sequence_number: i64, + first_row_id: Option, + ) -> Result { + let writer = encrypted_output.writer().await?; + Ok(Self::v3( + writer, + snapshot_id, + parent_snapshot_id, + sequence_number, + first_row_id, + )) + } + fn new( format_version: FormatVersion, writer: Box, @@ -315,10 +366,13 @@ fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)> mod test { use std::fs; use std::path::Path; + use std::sync::Arc; use tempfile::TempDir; use super::ManifestListWriter; + use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; + use crate::encryption::{EncryptedInputFile, EncryptionManager}; use crate::io::{FileIO, FileWrite}; use crate::spec::{ Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, @@ -607,6 +661,219 @@ mod test { temp_dir.close().unwrap(); } + #[tokio::test] + async fn test_manifest_list_writer_v1_encrypted_round_trip() { + let (mgr, file_io) = fresh_encryption_manager_and_io(); + let path = "memory:///manifest_list_v1_encrypted.avro"; + + let encrypted_output = mgr.encrypt(file_io.new_output(path).unwrap()); + let key_metadata = encrypted_output.key_metadata().clone(); + + let snapshot_id = 1_646_658_105_718_557_341i64; + let expected = ManifestList { + entries: vec![ManifestFile { + manifest_path: "memory:///encrypted/v1_m0.avro".to_string(), + manifest_length: 5806, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: 0, + min_sequence_number: 0, + added_snapshot_id: snapshot_id, + added_files_count: Some(3), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some(vec![FieldSummary { + contains_null: false, + contains_nan: Some(false), + lower_bound: Some(Datum::long(1).to_bytes().unwrap()), + upper_bound: Some(Datum::long(1).to_bytes().unwrap()), + }]), + key_metadata: None, + first_row_id: None, + }], + }; + + let mut writer = + ManifestListWriter::v1_from_encrypted(encrypted_output, snapshot_id, Some(0)) + .await + .unwrap(); + writer + .add_manifests(expected.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + + let plaintext = EncryptedInputFile::new(file_io.new_input(path).unwrap(), key_metadata) + .read() + .await + .unwrap(); + let manifest_list = + ManifestList::parse_with_version(&plaintext, crate::spec::FormatVersion::V1).unwrap(); + assert_eq!(manifest_list, expected); + } + + #[tokio::test] + async fn test_manifest_list_writer_v2_encrypted_round_trip() { + let (mgr, file_io) = fresh_encryption_manager_and_io(); + let path = "memory:///manifest_list_v2_encrypted.avro"; + + let encrypted_output = mgr.encrypt(file_io.new_output(path).unwrap()); + let key_metadata = encrypted_output.key_metadata().clone(); + + let snapshot_id = 377_075_049_360_453_639i64; + let seq_num = 1i64; + let mut expected = ManifestList { + entries: vec![ManifestFile { + manifest_path: "memory:///encrypted/v2_m0.avro".to_string(), + manifest_length: 6926, + partition_spec_id: 1, + content: ManifestContentType::Data, + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + added_snapshot_id: snapshot_id, + added_files_count: Some(1), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(3), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some(vec![FieldSummary { + contains_null: false, + contains_nan: Some(false), + lower_bound: Some(Datum::long(1).to_bytes().unwrap()), + upper_bound: Some(Datum::long(1).to_bytes().unwrap()), + }]), + key_metadata: None, + first_row_id: None, + }], + }; + + let mut writer = + ManifestListWriter::v2_from_encrypted(encrypted_output, snapshot_id, Some(0), seq_num) + .await + .unwrap(); + writer + .add_manifests(expected.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + + let plaintext = EncryptedInputFile::new(file_io.new_input(path).unwrap(), key_metadata) + .read() + .await + .unwrap(); + let manifest_list = + ManifestList::parse_with_version(&plaintext, crate::spec::FormatVersion::V2).unwrap(); + + // v2/v3 fill in unassigned sequence numbers on add, so apply the same + // adjustment to expected before comparing. + expected.entries[0].sequence_number = seq_num; + expected.entries[0].min_sequence_number = seq_num; + assert_eq!(manifest_list, expected); + } + + #[tokio::test] + async fn test_manifest_list_writer_v3_encrypted_round_trip() { + let (mgr, file_io) = fresh_encryption_manager_and_io(); + let path = "memory:///manifest_list_v3_encrypted.avro"; + + // Mint a fresh DEK for this file. We have to clone the key_metadata now — + // v3_from_encrypted consumes the wrapper and we'll need the metadata back + // when reading. + let encrypted_output = mgr.encrypt(file_io.new_output(path).unwrap()); + let key_metadata = encrypted_output.key_metadata().clone(); + + let snapshot_id = 9_000_000_000_000_001i64; + let seq_num = 7i64; + let mut expected = ManifestList { + entries: vec![ManifestFile { + manifest_path: "memory:///encrypted/v3_m0.avro".to_string(), + manifest_length: 1234, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER, + added_snapshot_id: snapshot_id, + added_files_count: Some(2), + existing_files_count: Some(0), + deleted_files_count: Some(0), + added_rows_count: Some(10), + existing_rows_count: Some(0), + deleted_rows_count: Some(0), + partitions: Some(vec![FieldSummary { + contains_null: false, + contains_nan: Some(false), + lower_bound: Some(Datum::long(1).to_bytes().unwrap()), + upper_bound: Some(Datum::long(1).to_bytes().unwrap()), + }]), + key_metadata: None, + first_row_id: None, + }], + }; + + let mut writer = ManifestListWriter::v3_from_encrypted( + encrypted_output, + snapshot_id, + Some(0), + seq_num, + None, + ) + .await + .unwrap(); + writer + .add_manifests(expected.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + + // Sanity check: the bytes on disk should be ciphertext, not Avro. If a future + // refactor accidentally bypassed the cipher this would catch it. + let raw_bytes = file_io.new_input(path).unwrap().read().await.unwrap(); + assert!( + ManifestList::parse_with_version(&raw_bytes, crate::spec::FormatVersion::V3).is_err(), + "raw bytes should be ciphertext, not parseable as Avro" + ); + + // Exercise the KEK wrap/unwrap path that callers will use to recover the + // key metadata from a snapshot's encryption_key_id. + let wrapped_key_id = mgr + .encrypt_manifest_list_key_metadata(&key_metadata) + .await + .unwrap(); + let recovered_key_metadata = mgr + .decrypt_manifest_list_key_metadata(&wrapped_key_id) + .await + .unwrap(); + assert_eq!(recovered_key_metadata, key_metadata); + + // And now the actual round-trip: decrypt the file via the recovered metadata + // and check the manifest list entries survived. + let input = file_io.new_input(path).unwrap(); + let plaintext = EncryptedInputFile::new(input, recovered_key_metadata) + .read() + .await + .unwrap(); + let manifest_list = + ManifestList::parse_with_version(&plaintext, crate::spec::FormatVersion::V3).unwrap(); + + expected.entries[0].sequence_number = seq_num; + expected.entries[0].min_sequence_number = seq_num; + assert_eq!(manifest_list, expected); + } + + /// Spins up a fresh in-process KMS + manager + in-memory FileIO for an + /// encrypted-write test. The KMS holds one master key under `"master-1"`, + /// and the manager is told to use that id as the table's active key. + fn fresh_encryption_manager_and_io() -> (EncryptionManager, FileIO) { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(); + (mgr, FileIO::new_with_memory()) + } + async fn file_writer(path: &Path, io: FileIO) -> Box { io.new_output(path.to_str().unwrap()) .unwrap()