diff --git a/crates/iceberg/src/transaction/expire_snapshots.rs b/crates/iceberg/src/transaction/expire_snapshots.rs index e9fbc9f151..2e097d6ad9 100644 --- a/crates/iceberg/src/transaction/expire_snapshots.rs +++ b/crates/iceberg/src/transaction/expire_snapshots.rs @@ -321,6 +321,26 @@ impl TransactionAction for ExpireSnapshotsAction { .into_iter() .map(|ref_name| TableUpdate::RemoveSnapshotRef { ref_name }) .collect(); + + // Drop the statistics metadata tied to each expired snapshot, mirroring Java + // `RemoveSnapshots.removeSnapshots`. This only rewrites metadata; the puffin files those + // entries point at are deleted by the higher-level file-cleanup maintenance operation. + for snapshot_id in &plan.ids_to_remove { + if metadata.statistics_for_snapshot(*snapshot_id).is_some() { + updates.push(TableUpdate::RemoveStatistics { + snapshot_id: *snapshot_id, + }); + } + if metadata + .partition_statistics_for_snapshot(*snapshot_id) + .is_some() + { + updates.push(TableUpdate::RemovePartitionStatistics { + snapshot_id: *snapshot_id, + }); + } + } + if !plan.ids_to_remove.is_empty() { updates.push(TableUpdate::RemoveSnapshots { snapshot_ids: plan.ids_to_remove, @@ -349,7 +369,8 @@ mod tests { use chrono::Utc; use crate::spec::{ - MAIN_BRANCH, Operation, Snapshot, SnapshotReference, SnapshotRetention, Summary, + MAIN_BRANCH, Operation, PartitionStatisticsFile, Snapshot, SnapshotReference, + SnapshotRetention, StatisticsFile, Summary, }; use crate::table::Table; use crate::transaction::Transaction; @@ -472,6 +493,69 @@ mod tests { base.with_metadata(Arc::new(builder.build().unwrap().metadata)) } + /// Like [`table_with`], but also attaches statistics and partition-statistics files. + fn table_with_stats( + snapshots: Vec, + refs: Vec<(&str, SnapshotReference)>, + statistics: Vec, + partition_statistics: Vec, + ) -> Table { + let base = make_v2_minimal_table(); + let mut builder = base.metadata().clone().into_builder(None); + for snapshot in snapshots { + builder = builder.add_snapshot(snapshot).unwrap(); + } + for (name, reference) in refs { + builder = builder.set_ref(name, reference).unwrap(); + } + for stats in statistics { + builder = builder.set_statistics(stats); + } + for stats in partition_statistics { + builder = builder.set_partition_statistics(stats); + } + base.with_metadata(Arc::new(builder.build().unwrap().metadata)) + } + + fn stats_file(snapshot_id: i64) -> StatisticsFile { + StatisticsFile { + snapshot_id, + statistics_path: format!("/stats-{snapshot_id}.puffin"), + file_size_in_bytes: 1, + file_footer_size_in_bytes: 1, + key_metadata: None, + blob_metadata: vec![], + } + } + + fn partition_stats_file(snapshot_id: i64) -> PartitionStatisticsFile { + PartitionStatisticsFile { + snapshot_id, + statistics_path: format!("/partition-stats-{snapshot_id}.puffin"), + file_size_in_bytes: 1, + } + } + + fn removed_statistics(updates: &[TableUpdate]) -> Vec { + updates + .iter() + .filter_map(|update| match update { + TableUpdate::RemoveStatistics { snapshot_id } => Some(*snapshot_id), + _ => None, + }) + .collect() + } + + fn removed_partition_statistics(updates: &[TableUpdate]) -> Vec { + updates + .iter() + .filter_map(|update| match update { + TableUpdate::RemovePartitionStatistics { snapshot_id } => Some(*snapshot_id), + _ => None, + }) + .collect() + } + #[tokio::test] async fn test_expire_explicit_snapshot_id() { assert_eq!( @@ -995,4 +1079,72 @@ mod tests { |u| matches!(u, TableUpdate::RemoveSnapshots { snapshot_ids } if snapshot_ids == &[1]) )); } + + #[tokio::test] + async fn test_expiring_snapshot_drops_its_statistics() { + // main: 1 -> 2, both carrying statistics. retain_last(1) keeps only the head (2). + let table = table_with_stats( + vec![ + snapshot(1, None, 35, TS + 1), + snapshot(2, Some(1), 36, TS + 2), + ], + vec![(MAIN_BRANCH, branch(2, None))], + vec![stats_file(1), stats_file(2)], + vec![partition_stats_file(1), partition_stats_file(2)], + ); + + let updates = updates_of( + &table, + action().retain_last(1).expire_older_than_ms(i64::MAX), + ) + .await; + + // Snapshot 1 expires, so its stats entries are dropped; the retained head 2 keeps its stats. + assert_eq!(removed_statistics(&updates), vec![1]); + assert_eq!(removed_partition_statistics(&updates), vec![1]); + } + + #[tokio::test] + async fn test_expiring_snapshot_without_statistics_emits_no_removal() { + // Same expiry, but no statistics attached to any snapshot. + let table = table_with( + vec![ + snapshot(1, None, 35, TS + 1), + snapshot(2, Some(1), 36, TS + 2), + ], + vec![(MAIN_BRANCH, branch(2, None))], + ); + + let updates = updates_of( + &table, + action().retain_last(1).expire_older_than_ms(i64::MAX), + ) + .await; + + assert!(removed_statistics(&updates).is_empty()); + assert!(removed_partition_statistics(&updates).is_empty()); + } + + #[tokio::test] + async fn test_only_present_statistics_variant_is_removed() { + // Snapshot 1 has statistics but no partition statistics. + let table = table_with_stats( + vec![ + snapshot(1, None, 35, TS + 1), + snapshot(2, Some(1), 36, TS + 2), + ], + vec![(MAIN_BRANCH, branch(2, None))], + vec![stats_file(1)], + vec![], + ); + + let updates = updates_of( + &table, + action().retain_last(1).expire_older_than_ms(i64::MAX), + ) + .await; + + assert_eq!(removed_statistics(&updates), vec![1]); + assert!(removed_partition_statistics(&updates).is_empty()); + } }