Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion crates/iceberg/src/transaction/expire_snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,26 @@ impl TransactionAction for ExpireSnapshotsAction {
.into_iter()
.map(|ref_name| TableUpdate::RemoveSnapshotRef { ref_name })
.collect();

// Drop the statistics metadata tied to each expired snapshot, mirroring Java
// `RemoveSnapshots.removeSnapshots`. This only rewrites metadata; the puffin files those
// entries point at are deleted by the higher-level file-cleanup maintenance operation.
for snapshot_id in &plan.ids_to_remove {
if metadata.statistics_for_snapshot(*snapshot_id).is_some() {
updates.push(TableUpdate::RemoveStatistics {
snapshot_id: *snapshot_id,
});
}
if metadata
.partition_statistics_for_snapshot(*snapshot_id)
.is_some()
{
updates.push(TableUpdate::RemovePartitionStatistics {
snapshot_id: *snapshot_id,
});
}
}

if !plan.ids_to_remove.is_empty() {
updates.push(TableUpdate::RemoveSnapshots {
snapshot_ids: plan.ids_to_remove,
Expand Down Expand Up @@ -349,7 +369,8 @@ mod tests {
use chrono::Utc;

use crate::spec::{
MAIN_BRANCH, Operation, Snapshot, SnapshotReference, SnapshotRetention, Summary,
MAIN_BRANCH, Operation, PartitionStatisticsFile, Snapshot, SnapshotReference,
SnapshotRetention, StatisticsFile, Summary,
};
use crate::table::Table;
use crate::transaction::Transaction;
Expand Down Expand Up @@ -472,6 +493,69 @@ mod tests {
base.with_metadata(Arc::new(builder.build().unwrap().metadata))
}

/// Variant of [`table_with`] that also registers statistics and partition-statistics files.
///
/// Starting from the metadata of `make_v2_minimal_table()`, the inputs are applied to the
/// metadata builder in this order: snapshots, refs, statistics, partition statistics. Any
/// builder error while adding a snapshot, setting a ref, or building panics via `unwrap`.
fn table_with_stats(
    snapshots: Vec<Snapshot>,
    refs: Vec<(&str, SnapshotReference)>,
    statistics: Vec<StatisticsFile>,
    partition_statistics: Vec<PartitionStatisticsFile>,
) -> Table {
    let base = make_v2_minimal_table();

    // Thread the builder through each input collection in turn.
    let builder = base.metadata().clone().into_builder(None);
    let builder = snapshots
        .into_iter()
        .fold(builder, |b, snap| b.add_snapshot(snap).unwrap());
    let builder = refs
        .into_iter()
        .fold(builder, |b, (ref_name, reference)| {
            b.set_ref(ref_name, reference).unwrap()
        });
    let builder = statistics
        .into_iter()
        .fold(builder, |b, file| b.set_statistics(file));
    let builder = partition_statistics
        .into_iter()
        .fold(builder, |b, file| b.set_partition_statistics(file));

    let metadata = builder.build().unwrap().metadata;
    base.with_metadata(Arc::new(metadata))
}

/// Builds a placeholder [`StatisticsFile`] for `snapshot_id`.
///
/// The path is `/stats-<snapshot_id>.puffin`, both size fields are 1, and there is no key
/// metadata or blob metadata.
fn stats_file(snapshot_id: i64) -> StatisticsFile {
    let statistics_path = format!("/stats-{snapshot_id}.puffin");
    StatisticsFile {
        snapshot_id,
        statistics_path,
        file_size_in_bytes: 1,
        file_footer_size_in_bytes: 1,
        key_metadata: None,
        blob_metadata: Vec::new(),
    }
}

/// Builds a placeholder [`PartitionStatisticsFile`] for `snapshot_id`.
///
/// The path is `/partition-stats-<snapshot_id>.puffin` and the file size is 1 byte.
fn partition_stats_file(snapshot_id: i64) -> PartitionStatisticsFile {
    let statistics_path = format!("/partition-stats-{snapshot_id}.puffin");
    PartitionStatisticsFile {
        snapshot_id,
        statistics_path,
        file_size_in_bytes: 1,
    }
}

/// Returns the snapshot id of every `TableUpdate::RemoveStatistics` entry in `updates`, in the
/// order the entries appear. All other update kinds are ignored.
fn removed_statistics(updates: &[TableUpdate]) -> Vec<i64> {
    let mut snapshot_ids = Vec::new();
    for update in updates {
        if let TableUpdate::RemoveStatistics { snapshot_id } = update {
            snapshot_ids.push(*snapshot_id);
        }
    }
    snapshot_ids
}

/// Returns the snapshot id of every `TableUpdate::RemovePartitionStatistics` entry in
/// `updates`, in the order the entries appear. All other update kinds are ignored.
fn removed_partition_statistics(updates: &[TableUpdate]) -> Vec<i64> {
    let mut snapshot_ids = Vec::new();
    for update in updates {
        if let TableUpdate::RemovePartitionStatistics { snapshot_id } = update {
            snapshot_ids.push(*snapshot_id);
        }
    }
    snapshot_ids
}

#[tokio::test]
async fn test_expire_explicit_snapshot_id() {
assert_eq!(
Expand Down Expand Up @@ -995,4 +1079,72 @@ mod tests {
|u| matches!(u, TableUpdate::RemoveSnapshots { snapshot_ids } if snapshot_ids == &[1])
));
}

#[tokio::test]
async fn test_expiring_snapshot_drops_its_statistics() {
    // History on main is 1 -> 2; both snapshots have statistics and partition statistics.
    let history = vec![
        snapshot(1, None, 35, TS + 1),
        snapshot(2, Some(1), 36, TS + 2),
    ];
    let main_ref = vec![(MAIN_BRANCH, branch(2, None))];
    let table = table_with_stats(
        history,
        main_ref,
        vec![stats_file(1), stats_file(2)],
        vec![partition_stats_file(1), partition_stats_file(2)],
    );

    // retain_last(1) keeps only the branch head (2), so snapshot 1 is the one that expires.
    let expire = action().retain_last(1).expire_older_than_ms(i64::MAX);
    let updates = updates_of(&table, expire).await;

    // Only the expired snapshot's entries are dropped; the retained head keeps its statistics.
    let dropped_stats = removed_statistics(&updates);
    let dropped_partition_stats = removed_partition_statistics(&updates);
    assert_eq!(dropped_stats, vec![1]);
    assert_eq!(dropped_partition_stats, vec![1]);
}

#[tokio::test]
async fn test_expiring_snapshot_without_statistics_emits_no_removal() {
    // Same 1 -> 2 history on main, but no statistics are attached to either snapshot.
    let history = vec![
        snapshot(1, None, 35, TS + 1),
        snapshot(2, Some(1), 36, TS + 2),
    ];
    let main_ref = vec![(MAIN_BRANCH, branch(2, None))];
    let table = table_with(history, main_ref);

    let expire = action().retain_last(1).expire_older_than_ms(i64::MAX);
    let updates = updates_of(&table, expire).await;

    // With nothing to drop, neither kind of statistics removal may be emitted.
    let dropped_stats = removed_statistics(&updates);
    let dropped_partition_stats = removed_partition_statistics(&updates);
    assert!(dropped_stats.is_empty());
    assert!(dropped_partition_stats.is_empty());
}

#[tokio::test]
async fn test_only_present_statistics_variant_is_removed() {
    // Snapshot 1 carries a statistics file but no partition-statistics file.
    let history = vec![
        snapshot(1, None, 35, TS + 1),
        snapshot(2, Some(1), 36, TS + 2),
    ];
    let main_ref = vec![(MAIN_BRANCH, branch(2, None))];
    let table = table_with_stats(history, main_ref, vec![stats_file(1)], vec![]);

    let expire = action().retain_last(1).expire_older_than_ms(i64::MAX);
    let updates = updates_of(&table, expire).await;

    // A removal is emitted only for the statistics kind that exists for snapshot 1.
    let dropped_stats = removed_statistics(&updates);
    let dropped_partition_stats = removed_partition_statistics(&updates);
    assert_eq!(dropped_stats, vec![1]);
    assert!(dropped_partition_stats.is_empty());
}
}
Loading