@@ -173,7 +173,7 @@ pub trait EventFormat: Sized {
 
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
-        if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
+        if !is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
         new_schema =
@@ -190,28 +190,6 @@ pub trait EventFormat: Sized {
         Ok((rb, is_first))
     }
 
-    fn is_schema_matching(
-        new_schema: Arc<Schema>,
-        storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: bool,
-    ) -> bool {
-        if !static_schema_flag {
-            return true;
-        }
-        for field in new_schema.fields() {
-            let Some(storage_field) = storage_schema.get(field.name()) else {
-                return false;
-            };
-            if field.name() != storage_field.name() {
-                return false;
-            }
-            if field.data_type() != storage_field.data_type() {
-                return false;
-            }
-        }
-        true
-    }
-
     #[allow(clippy::too_many_arguments)]
     fn into_event(
         self,
@@ -226,6 +204,28 @@
     ) -> Result<Event, AnyError>;
 }
 
+fn is_schema_matching(
+    new_schema: Arc<Schema>,
+    storage_schema: &HashMap<String, Arc<Field>>,
+    static_schema_flag: bool,
+) -> bool {
+    if !static_schema_flag {
+        return true;
+    }
+    for field in new_schema.fields() {
+        let Some(storage_field) = storage_schema.get(field.name()) else {
+            return false;
+        };
+        if field.name() != storage_field.name() {
+            return false;
+        }
+        if field.data_type() != storage_field.data_type() {
+            return false;
+        }
+    }
+    true
+}
+
 pub fn get_existing_field_names(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
@@ -369,3 +369,140 @@ pub fn override_data_type(
 
     Arc::new(Schema::new(updated_schema))
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashMap, sync::Arc};
+
+    use arrow_schema::{DataType, Field, Schema};
+
+    use super::*;
+
+    // Helper function to create a test field
+    fn create_field(name: &str, data_type: DataType) -> Arc<Field> {
+        Arc::new(Field::new(name.to_string(), data_type, true))
+    }
+
+    // Helper function to create a test schema
+    fn create_schema(fields: Vec<Arc<Field>>) -> Arc<Schema> {
+        Arc::new(Schema::new(fields))
+    }
+
+    // Helper function to create a storage schema HashMap
+    fn create_storage_schema(fields: Vec<Arc<Field>>) -> HashMap<String, Arc<Field>> {
+        let mut storage_schema = HashMap::new();
+        for field in fields {
+            storage_schema.insert(field.name().to_string(), field.clone());
+        }
+        storage_schema
+    }
+
+    #[test]
+    fn test_static_schema_flag_false() {
+        // When static_schema_flag is false, should always return true regardless of schemas
+        let field1 = create_field("id", DataType::Int32);
+        let field2 = create_field("name", DataType::Utf8);
+
+        let schema = create_schema(vec![field1.clone(), field2.clone()]);
+        let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2
+
+        // Even though schemas don't match, function should return true because static_schema_flag is false
+        assert!(is_schema_matching(schema, &storage_schema, false));
+    }
+
+    #[test]
+    fn test_identical_schemas() {
+        // When schemas are identical, should return true
+        let field1 = create_field("id", DataType::Int32);
+        let field2 = create_field("name", DataType::Utf8);
+
+        let schema = create_schema(vec![field1.clone(), field2.clone()]);
+        let storage_schema = create_storage_schema(vec![field1.clone(), field2.clone()]);
+
+        assert!(is_schema_matching(schema, &storage_schema, true));
+    }
+
+    #[test]
+    fn test_missing_field_in_storage() {
+        // When storage schema is missing a field from new schema, should return false
+        let field1 = create_field("id", DataType::Int32);
+        let field2 = create_field("name", DataType::Utf8);
+
+        let schema = create_schema(vec![field1.clone(), field2.clone()]);
+        let storage_schema = create_storage_schema(vec![field1.clone()]); // Missing field2
+
+        assert!(!is_schema_matching(schema, &storage_schema, true));
+    }
+
+    #[test]
+    fn test_different_data_type() {
+        // When field has different data type, should return false
+        let field1 = create_field("id", DataType::Int32);
+        // Same name but different type
+        let field1_different_type = create_field("id", DataType::Int64);
+        let field2 = create_field("name", DataType::Utf8);
+
+        let schema = create_schema(vec![field1.clone(), field2.clone()]);
+        let storage_schema = create_storage_schema(vec![field1_different_type, field2.clone()]);
+
+        assert!(!is_schema_matching(schema, &storage_schema, true));
+    }
+
+    #[test]
+    fn test_extra_fields_in_storage() {
+        // When storage schema has extra fields not in new schema, should still return true
+        // This is because we only check if fields in new_schema exist in storage_schema
+        let field1 = create_field("id", DataType::Int32);
+        let field2 = create_field("name", DataType::Utf8);
+        let extra_field = create_field("extra", DataType::Boolean);
+
+        let schema = create_schema(vec![field1.clone(), field2.clone()]);
+        let storage_schema =
+            create_storage_schema(vec![field1.clone(), field2.clone(), extra_field]);
+
+        assert!(is_schema_matching(schema, &storage_schema, true));
+    }
+
+    #[test]
+    fn test_empty_new_schema() {
+        // When new schema is empty, should return true
+        let field1 = create_field("id", DataType::Int32);
+
+        let empty_schema = create_schema(vec![]);
+        let storage_schema = create_storage_schema(vec![field1.clone()]);
+
+        assert!(is_schema_matching(
+            empty_schema,
+            &storage_schema,
+            true
+        ));
+    }
+
+    #[test]
+    fn test_empty_storage_schema() {
+        // When storage schema is empty but new schema has fields, should return false
+        let field1 = create_field("id", DataType::Int32);
+
+        let schema = create_schema(vec![field1.clone()]);
+        let empty_storage_schema: HashMap<String, Arc<Field>> = HashMap::new();
+
+        assert!(!is_schema_matching(
+            schema,
+            &empty_storage_schema,
+            true
+        ));
+    }
+
+    #[test]
+    fn test_both_empty_schemas() {
+        // When both schemas are empty, should return true
+        let empty_schema = create_schema(vec![]);
+        let empty_storage_schema: HashMap<String, Arc<Field>> = HashMap::new();
+
+        assert!(is_schema_matching(
+            empty_schema,
+            &empty_storage_schema,
+            true
+        ));
+    }
+}
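For context, a minimal usage sketch (not part of the diff) of the relocated `is_schema_matching`: because it is now a plain module-level function rather than a trait method, it can be called and unit tested without a `Self` type. The field name, type, and the helper function name below are illustrative only, and since the function is not `pub` the caller is assumed to live in the same module.

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema};

// Hypothetical caller in the same module as is_schema_matching.
fn example_check() -> bool {
    let field = Arc::new(Field::new("id", DataType::Int32, true));
    let new_schema = Arc::new(Schema::new(vec![field.clone()]));

    let mut storage_schema: HashMap<String, Arc<Field>> = HashMap::new();
    storage_schema.insert(field.name().to_string(), field);

    // With static_schema_flag = true, every field in new_schema must exist in
    // storage_schema with the same name and data type.
    is_schema_matching(new_schema, &storage_schema, true)
}
```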