Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2038,8 +2038,11 @@ pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: im
pub async fn iceberg::spec::ManifestListWriter::close(self) -> iceberg::Result<()>
pub fn iceberg::spec::ManifestListWriter::next_row_id(&self) -> core::option::Option<u64>
pub fn iceberg::spec::ManifestListWriter::v1(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>) -> Self
pub async fn iceberg::spec::ManifestListWriter::v1_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>) -> iceberg::Result<Self>
pub fn iceberg::spec::ManifestListWriter::v2(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64) -> Self
pub async fn iceberg::spec::ManifestListWriter::v2_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64) -> iceberg::Result<Self>
pub fn iceberg::spec::ManifestListWriter::v3(writer: alloc::boxed::Box<dyn iceberg::io::FileWrite>, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64, first_row_id: core::option::Option<u64>) -> Self
pub async fn iceberg::spec::ManifestListWriter::v3_from_encrypted(encrypted_output: iceberg::encryption::EncryptedOutputFile, snapshot_id: i64, parent_snapshot_id: core::option::Option<i64>, sequence_number: i64, first_row_id: core::option::Option<u64>) -> iceberg::Result<Self>
impl core::fmt::Debug for iceberg::spec::ManifestListWriter
pub fn iceberg::spec::ManifestListWriter::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
pub struct iceberg::spec::ManifestMetadata
Expand Down
267 changes: 267 additions & 0 deletions crates/iceberg/src/spec/manifest_list/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::_const_schema::{
};
use super::_serde::{ManifestFileV1, ManifestFileV2, ManifestFileV3};
use super::{FormatVersion, ManifestContentType, ManifestFile, UNASSIGNED_SEQUENCE_NUMBER};
use crate::encryption::EncryptedOutputFile;
use crate::error::Result;
use crate::io::FileWrite;
use crate::{Error, ErrorKind};
Expand Down Expand Up @@ -136,6 +137,56 @@ impl ManifestListWriter {
)
}

/// Build a v1 [`ManifestListWriter`] whose output goes through an [`EncryptedOutputFile`].
///
/// `encrypted_output` is taken by value. Callers that still need its key
/// metadata afterwards — for example to pass it to
/// `EncryptionManager::encrypt_manifest_list_key_metadata` once [`Self::close`]
/// has completed — must clone `encrypted_output.key_metadata()` before calling
/// this constructor.
///
/// # Errors
///
/// Propagates any error produced while obtaining the writer from
/// `encrypted_output`.
pub async fn v1_from_encrypted(
    encrypted_output: EncryptedOutputFile,
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
) -> Result<Self> {
    Ok(Self::v1(
        encrypted_output.writer().await?,
        snapshot_id,
        parent_snapshot_id,
    ))
}

/// Build a v2 [`ManifestListWriter`] whose output goes through an [`EncryptedOutputFile`].
///
/// The key-metadata contract is the one described on [`Self::v1_from_encrypted`].
///
/// # Errors
///
/// Propagates any error produced while obtaining the writer from
/// `encrypted_output`.
pub async fn v2_from_encrypted(
    encrypted_output: EncryptedOutputFile,
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
    sequence_number: i64,
) -> Result<Self> {
    let file_writer = encrypted_output.writer().await?;
    let list_writer = Self::v2(file_writer, snapshot_id, parent_snapshot_id, sequence_number);
    Ok(list_writer)
}

/// Build a v3 [`ManifestListWriter`] whose output goes through an [`EncryptedOutputFile`].
///
/// The key-metadata contract is the one described on [`Self::v1_from_encrypted`].
///
/// # Errors
///
/// Propagates any error produced while obtaining the writer from
/// `encrypted_output`.
pub async fn v3_from_encrypted(
    encrypted_output: EncryptedOutputFile,
    snapshot_id: i64,
    parent_snapshot_id: Option<i64>,
    sequence_number: i64,
    first_row_id: Option<u64>,
) -> Result<Self> {
    let file_writer = encrypted_output.writer().await?;
    let list_writer = Self::v3(
        file_writer,
        snapshot_id,
        parent_snapshot_id,
        sequence_number,
        first_row_id,
    );
    Ok(list_writer)
}

fn new(
format_version: FormatVersion,
writer: Box<dyn FileWrite>,
Expand Down Expand Up @@ -315,10 +366,13 @@ fn require_row_counts_in_manifest(manifest: &ManifestFile) -> Result<(u64, u64)>
mod test {
use std::fs;
use std::path::Path;
use std::sync::Arc;

use tempfile::TempDir;

use super::ManifestListWriter;
use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient};
use crate::encryption::{EncryptedInputFile, EncryptionManager};
use crate::io::{FileIO, FileWrite};
use crate::spec::{
Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList,
Expand Down Expand Up @@ -607,6 +661,219 @@ mod test {
temp_dir.close().unwrap();
}

#[tokio::test]
async fn test_manifest_list_writer_v1_encrypted_round_trip() {
    let snapshot_id = 1_646_658_105_718_557_341i64;

    // One data manifest with a single partition summary whose bounds are both 1.
    let partition_summary = FieldSummary {
        contains_null: false,
        contains_nan: Some(false),
        lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
        upper_bound: Some(Datum::long(1).to_bytes().unwrap()),
    };
    let manifest = ManifestFile {
        manifest_path: "memory:///encrypted/v1_m0.avro".to_string(),
        manifest_length: 5806,
        partition_spec_id: 1,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: snapshot_id,
        added_files_count: Some(3),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Some(vec![partition_summary]),
        key_metadata: None,
        first_row_id: None,
    };
    let expected = ManifestList {
        entries: vec![manifest],
    };

    let (encryption_manager, io) = fresh_encryption_manager_and_io();
    let location = "memory:///manifest_list_v1_encrypted.avro";

    // The constructor below consumes `output`, so keep a copy of its key
    // metadata for the read side.
    let output = encryption_manager.encrypt(io.new_output(location).unwrap());
    let file_key_metadata = output.key_metadata().clone();

    let mut list_writer = ManifestListWriter::v1_from_encrypted(output, snapshot_id, Some(0))
        .await
        .unwrap();
    list_writer
        .add_manifests(expected.entries.iter().cloned())
        .unwrap();
    list_writer.close().await.unwrap();

    // Read the file back through the decrypting wrapper and re-parse it.
    let encrypted_input =
        EncryptedInputFile::new(io.new_input(location).unwrap(), file_key_metadata);
    let decrypted = encrypted_input.read().await.unwrap();
    let parsed =
        ManifestList::parse_with_version(&decrypted, crate::spec::FormatVersion::V1).unwrap();
    assert_eq!(parsed, expected);
}

#[tokio::test]
async fn test_manifest_list_writer_v2_encrypted_round_trip() {
    let snapshot_id = 377_075_049_360_453_639i64;
    let seq_num = 1i64;

    // The manifest as handed to the writer: sequence numbers still unassigned.
    let unassigned = ManifestFile {
        manifest_path: "memory:///encrypted/v2_m0.avro".to_string(),
        manifest_length: 6926,
        partition_spec_id: 1,
        content: ManifestContentType::Data,
        sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
        min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
        added_snapshot_id: snapshot_id,
        added_files_count: Some(1),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(3),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Some(vec![FieldSummary {
            contains_null: false,
            contains_nan: Some(false),
            lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
            upper_bound: Some(Datum::long(1).to_bytes().unwrap()),
        }]),
        key_metadata: None,
        first_row_id: None,
    };

    // v2/v3 fill in unassigned sequence numbers on add, so the entry read back
    // is expected to carry the writer's sequence number instead.
    let expected = ManifestList {
        entries: vec![ManifestFile {
            sequence_number: seq_num,
            min_sequence_number: seq_num,
            ..unassigned.clone()
        }],
    };

    let (encryption_manager, io) = fresh_encryption_manager_and_io();
    let location = "memory:///manifest_list_v2_encrypted.avro";

    // The constructor below consumes `output`, so keep a copy of its key
    // metadata for the read side.
    let output = encryption_manager.encrypt(io.new_output(location).unwrap());
    let file_key_metadata = output.key_metadata().clone();

    let mut list_writer =
        ManifestListWriter::v2_from_encrypted(output, snapshot_id, Some(0), seq_num)
            .await
            .unwrap();
    list_writer
        .add_manifests(std::iter::once(unassigned))
        .unwrap();
    list_writer.close().await.unwrap();

    // Read the file back through the decrypting wrapper and re-parse it.
    let encrypted_input =
        EncryptedInputFile::new(io.new_input(location).unwrap(), file_key_metadata);
    let decrypted = encrypted_input.read().await.unwrap();
    let parsed =
        ManifestList::parse_with_version(&decrypted, crate::spec::FormatVersion::V2).unwrap();
    assert_eq!(parsed, expected);
}

#[tokio::test]
async fn test_manifest_list_writer_v3_encrypted_round_trip() {
    let snapshot_id = 9_000_000_000_000_001i64;
    let seq_num = 7i64;

    // The manifest as handed to the writer: sequence numbers still unassigned.
    let unassigned = ManifestFile {
        manifest_path: "memory:///encrypted/v3_m0.avro".to_string(),
        manifest_length: 1234,
        partition_spec_id: 0,
        content: ManifestContentType::Data,
        sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
        min_sequence_number: UNASSIGNED_SEQUENCE_NUMBER,
        added_snapshot_id: snapshot_id,
        added_files_count: Some(2),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(10),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Some(vec![FieldSummary {
            contains_null: false,
            contains_nan: Some(false),
            lower_bound: Some(Datum::long(1).to_bytes().unwrap()),
            upper_bound: Some(Datum::long(1).to_bytes().unwrap()),
        }]),
        key_metadata: None,
        first_row_id: None,
    };

    // v2/v3 fill in unassigned sequence numbers on add, so the entry read back
    // is expected to carry the writer's sequence number instead.
    let expected = ManifestList {
        entries: vec![ManifestFile {
            sequence_number: seq_num,
            min_sequence_number: seq_num,
            ..unassigned.clone()
        }],
    };

    let (encryption_manager, io) = fresh_encryption_manager_and_io();
    let location = "memory:///manifest_list_v3_encrypted.avro";

    // `v3_from_encrypted` consumes the encrypted output, so the key metadata
    // has to be cloned up front; it is needed again on the read side.
    let output = encryption_manager.encrypt(io.new_output(location).unwrap());
    let file_key_metadata = output.key_metadata().clone();

    let mut list_writer =
        ManifestListWriter::v3_from_encrypted(output, snapshot_id, Some(0), seq_num, None)
            .await
            .unwrap();
    list_writer
        .add_manifests(std::iter::once(unassigned))
        .unwrap();
    list_writer.close().await.unwrap();

    // Guard against the cipher being bypassed: the stored bytes, read without
    // decryption, must not parse as a manifest list.
    let stored_bytes = io.new_input(location).unwrap().read().await.unwrap();
    let raw_parse = ManifestList::parse_with_version(&stored_bytes, crate::spec::FormatVersion::V3);
    assert!(
        raw_parse.is_err(),
        "raw bytes should be ciphertext, not parseable as Avro"
    );

    // Wrap the key metadata via the manager and unwrap it again; the result
    // must equal what we started with.
    let wrapped_key_id = encryption_manager
        .encrypt_manifest_list_key_metadata(&file_key_metadata)
        .await
        .unwrap();
    let unwrapped_key_metadata = encryption_manager
        .decrypt_manifest_list_key_metadata(&wrapped_key_id)
        .await
        .unwrap();
    assert_eq!(unwrapped_key_metadata, file_key_metadata);

    // Full round trip: decrypt the file with the unwrapped metadata and compare
    // the parsed entries against the expectation.
    let encrypted_input =
        EncryptedInputFile::new(io.new_input(location).unwrap(), unwrapped_key_metadata);
    let decrypted = encrypted_input.read().await.unwrap();
    let parsed =
        ManifestList::parse_with_version(&decrypted, crate::spec::FormatVersion::V3).unwrap();
    assert_eq!(parsed, expected);
}

/// Builds the fixtures shared by the encrypted-write tests: an
/// [`EncryptionManager`] backed by a new [`MemoryKeyManagementClient`], paired
/// with an in-memory [`FileIO`]. One master key, `"master-1"`, is registered
/// with the KMS client, and the same id is given to the manager builder as the
/// table key id.
fn fresh_encryption_manager_and_io() -> (EncryptionManager, FileIO) {
    const MASTER_KEY_ID: &str = "master-1";

    let memory_kms = MemoryKeyManagementClient::new();
    memory_kms.add_master_key(MASTER_KEY_ID).unwrap();
    let kms_client: Arc<dyn KeyManagementClient> = Arc::new(memory_kms);

    let manager = EncryptionManager::builder()
        .kms_client(kms_client)
        .table_key_id(MASTER_KEY_ID)
        .build();
    (manager, FileIO::new_with_memory())
}

async fn file_writer(path: &Path, io: FileIO) -> Box<dyn FileWrite> {
io.new_output(path.to_str().unwrap())
.unwrap()
Expand Down
Loading