Skip to content

Commit a2575b7

Browse files
Fix sequence number generation (#15)
1 parent 69f6c58 commit a2575b7

File tree

2 files changed

+75
-7
lines changed

2 files changed

+75
-7
lines changed

src/iceberg_destination.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ pub async fn record_batches_to_iceberg(
203203
.collect();
204204

205205
let snapshot_id = fastrand::i64(..);
206-
let sequence_number = 1;
206+
let sequence_number = previous_metadata.last_sequence_number() + 1;
207207

208208
let manifest_file_path = format!("{}/metadata/manifest-{}.avro", target_url, Uuid::new_v4());
209209
let manifest_file_output = file_io.new_output(manifest_file_path)?;

tests/basic_integration.rs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1+
use std::vec;
2+
13
use arrow::array::{
24
Array, BinaryArray, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array,
35
Int16Array, Int32Array, Int64Array, Int8Array, StringArray, TimestampMicrosecondArray,
46
};
57
use arrow::datatypes::DataType;
68
use clap::Parser;
79
use futures::{StreamExt, TryStreamExt};
10+
use iceberg::spec::TableMetadata;
811
use lakehouse_loader::delta_destination::object_store_keys_from_env;
912
use lakehouse_loader::error::DataLoadingError;
1013
use lakehouse_loader::pg_arrow_source::PgArrowSource;
@@ -57,6 +60,10 @@ async fn test_pg_to_delta_e2e() {
5760
assert!(paths[2].to_string().ends_with("-c000.snappy.parquet"));
5861
}
5962

63+
const DATA_FILEPATH_PATTERN: &str = r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$";
64+
const MANIFEST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";
65+
const MANIFEST_LIST_FILEPATH_PATTERN: &str = r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$";
66+
6067
#[tokio::test]
6168
async fn test_pg_to_iceberg() {
6269
let target_url = "s3://lhl-test-bucket/iceberg";
@@ -78,23 +85,40 @@ async fn test_pg_to_iceberg() {
7885
let (store, path) =
7986
object_store::parse_url_opts(&Url::parse(target_url).unwrap(), config).unwrap();
8087

88+
// THEN iceberg data and metadata files are written
8189
let mut paths = store
8290
.list(Some(&path))
8391
.map_ok(|m| m.location)
8492
.boxed()
8593
.try_collect::<Vec<Path>>()
8694
.await
8795
.unwrap();
88-
8996
paths.sort();
90-
91-
// THEN iceberg data and metadata files are written
9297
assert_eq!(paths.len(), 5);
93-
assert!(Regex::new(r"^iceberg/data/part-00000-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.parquet$").unwrap().is_match(paths[0].as_ref()));
94-
assert!(Regex::new(r"^iceberg/metadata/manifest-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[1].as_ref()));
95-
assert!(Regex::new(r"^iceberg/metadata/manifest-list-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}.avro$").unwrap().is_match(paths[2].as_ref()));
98+
assert!(Regex::new(DATA_FILEPATH_PATTERN)
99+
.unwrap()
100+
.is_match(paths[0].as_ref()));
101+
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
102+
.unwrap()
103+
.is_match(paths[1].as_ref()));
104+
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
105+
.unwrap()
106+
.is_match(paths[2].as_ref()));
96107
assert_eq!(&paths[3].to_string(), "iceberg/metadata/v0.metadata.json");
97108
assert_eq!(&paths[4].to_string(), "iceberg/metadata/version-hint.text");
109+
// THEN iceberg metadata can be parsed
110+
let metadata_bytes = store.get(&paths[3]).await.unwrap().bytes().await.unwrap();
111+
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
112+
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
113+
// THEN metadata contains a single snapshot with sequence number 1
114+
assert_eq!(metadata.last_sequence_number(), 1);
115+
assert_eq!(
116+
metadata
117+
.snapshots()
118+
.map(|s| s.sequence_number())
119+
.collect::<Vec<_>>(),
120+
vec![1]
121+
);
98122

99123
// WHEN we try to write to an existing table without passing the overwrite flag
100124
// THEN the command errors out
@@ -149,6 +173,50 @@ async fn test_pg_to_iceberg() {
149173
"--overwrite",
150174
];
151175
assert!(do_main(Cli::parse_from(args.clone())).await.is_ok());
176+
177+
// THEN iceberg data and metadata files are written
178+
let mut paths = store
179+
.list(Some(&path))
180+
.map_ok(|m| m.location)
181+
.boxed()
182+
.try_collect::<Vec<Path>>()
183+
.await
184+
.unwrap();
185+
paths.sort();
186+
assert_eq!(paths.len(), 9);
187+
assert!(Regex::new(DATA_FILEPATH_PATTERN)
188+
.unwrap()
189+
.is_match(paths[0].as_ref()));
190+
assert!(Regex::new(DATA_FILEPATH_PATTERN)
191+
.unwrap()
192+
.is_match(paths[1].as_ref()));
193+
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
194+
.unwrap()
195+
.is_match(paths[2].as_ref()));
196+
assert!(Regex::new(MANIFEST_FILEPATH_PATTERN)
197+
.unwrap()
198+
.is_match(paths[3].as_ref()));
199+
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
200+
.unwrap()
201+
.is_match(paths[4].as_ref()));
202+
assert!(Regex::new(MANIFEST_LIST_FILEPATH_PATTERN)
203+
.unwrap()
204+
.is_match(paths[5].as_ref()));
205+
assert_eq!(&paths[6].to_string(), "iceberg/metadata/v0.metadata.json");
206+
assert_eq!(&paths[7].to_string(), "iceberg/metadata/v1.metadata.json");
207+
assert_eq!(&paths[8].to_string(), "iceberg/metadata/version-hint.text");
208+
// THEN iceberg metadata can be parsed
209+
let metadata_bytes = store.get(&paths[7]).await.unwrap().bytes().await.unwrap();
210+
let metadata_str = core::str::from_utf8(&metadata_bytes).unwrap();
211+
let metadata = serde_json::from_str::<TableMetadata>(metadata_str).unwrap();
212+
// THEN metadata contains two snapshots with sequence numbers 1 and 2
213+
assert_eq!(metadata.last_sequence_number(), 2);
214+
let mut snapshot_ids = metadata
215+
.snapshots()
216+
.map(|s| s.sequence_number())
217+
.collect::<Vec<_>>();
218+
snapshot_ids.sort();
219+
assert_eq!(snapshot_ids, vec![1, 2]);
152220
}
153221

154222
#[tokio::test]

0 commit comments

Comments
 (0)