@@ -171,13 +171,18 @@ pub trait EventFormat: Sized {
171
171
) ) ,
172
172
) ;
173
173
174
- // prepare the record batch and new fields to be added
175
- let mut new_schema = Arc :: new ( Schema :: new ( schema) ) ;
176
- if !is_schema_matching ( new_schema. clone ( ) , storage_schema, static_schema_flag) {
174
+ if !is_schema_matching ( & schema, storage_schema, static_schema_flag) {
177
175
return Err ( anyhow ! ( "Schema mismatch" ) ) ;
178
176
}
179
- new_schema =
180
- update_field_type_in_schema ( new_schema, None , time_partition, None , schema_version) ;
177
+
178
+ // prepare the record batch and new fields to be added
179
+ let new_schema = update_field_type_in_schema (
180
+ Arc :: new ( Schema :: new ( schema) ) ,
181
+ None ,
182
+ time_partition,
183
+ None ,
184
+ schema_version,
185
+ ) ;
181
186
182
187
let mut rb = Self :: decode ( data, new_schema. clone ( ) ) ?;
183
188
rb = replace_columns (
@@ -204,26 +209,22 @@ pub trait EventFormat: Sized {
204
209
) -> Result < Event , AnyError > ;
205
210
}
206
211
212
+ /// Determines if a schema matches the storage schema based on configuration.
213
+ /// Returns `true` if the schemas match according to the rules:
214
+ /// - If `static_schema_flag` is `false`, always returns `true` (flexible schema mode)
215
+ /// - If `static_schema_flag` is `true`, returns `true` only if all fields in `schema`
216
+ /// exist in `storage_schema` with exactly matching properties
207
217
fn is_schema_matching (
208
- new_schema : Arc < Schema > ,
218
+ schema : & [ Arc < Field > ] ,
209
219
storage_schema : & HashMap < String , Arc < Field > > ,
210
220
static_schema_flag : bool ,
211
221
) -> bool {
212
- if !static_schema_flag {
213
- return true ;
214
- }
215
- for field in new_schema. fields ( ) {
216
- let Some ( storage_field) = storage_schema. get ( field. name ( ) ) else {
217
- return false ;
218
- } ;
219
- if field. name ( ) != storage_field. name ( ) {
220
- return false ;
221
- }
222
- if field. data_type ( ) != storage_field. data_type ( ) {
223
- return false ;
224
- }
225
- }
226
- true
222
+ !static_schema_flag
223
+ || !schema. iter ( ) . any ( |field| {
224
+ storage_schema
225
+ . get ( field. name ( ) )
226
+ . is_none_or ( |storage_field| storage_field != field)
227
+ } )
227
228
}
228
229
229
230
pub fn get_existing_field_names (
@@ -374,7 +375,7 @@ pub fn override_data_type(
374
375
mod tests {
375
376
use std:: { collections:: HashMap , sync:: Arc } ;
376
377
377
- use arrow_schema:: { DataType , Field , Schema } ;
378
+ use arrow_schema:: { DataType , Field } ;
378
379
379
380
use super :: * ;
380
381
@@ -383,13 +384,8 @@ mod tests {
383
384
Arc :: new ( Field :: new ( name. to_string ( ) , data_type, true ) )
384
385
}
385
386
386
- // Helper function to create a test schema
387
- fn create_schema ( fields : Vec < Arc < Field > > ) -> Arc < Schema > {
388
- Arc :: new ( Schema :: new ( fields) )
389
- }
390
-
391
387
// Helper function to create a storage schema HashMap
392
- fn create_storage_schema ( fields : Vec < Arc < Field > > ) -> HashMap < String , Arc < Field > > {
388
+ fn create_storage_schema ( fields : & [ Arc < Field > ] ) -> HashMap < String , Arc < Field > > {
393
389
let mut storage_schema = HashMap :: new ( ) ;
394
390
for field in fields {
395
391
storage_schema. insert ( field. name ( ) . to_string ( ) , field. clone ( ) ) ;
@@ -403,11 +399,11 @@ mod tests {
403
399
let field1 = create_field ( "id" , DataType :: Int32 ) ;
404
400
let field2 = create_field ( "name" , DataType :: Utf8 ) ;
405
401
406
- let schema = create_schema ( vec ! [ field1. clone( ) , field2. clone( ) ] ) ;
407
- let storage_schema = create_storage_schema ( vec ! [ field1. clone( ) ] ) ; // Missing field2
402
+ let schema = [ field1. clone ( ) , field2. clone ( ) ] ;
403
+ let storage_schema = create_storage_schema ( & [ field1. clone ( ) ] ) ; // Missing field2
408
404
409
405
// Even though schemas don't match, function should return true because static_schema_flag is false
410
- assert ! ( is_schema_matching( schema, & storage_schema, false ) ) ;
406
+ assert ! ( is_schema_matching( & schema, & storage_schema, false ) ) ;
411
407
}
412
408
413
409
#[ test]
@@ -416,10 +412,10 @@ mod tests {
416
412
let field1 = create_field ( "id" , DataType :: Int32 ) ;
417
413
let field2 = create_field ( "name" , DataType :: Utf8 ) ;
418
414
419
- let schema = create_schema ( vec ! [ field1. clone( ) , field2. clone( ) ] ) ;
420
- let storage_schema = create_storage_schema ( vec ! [ field1 . clone ( ) , field2 . clone ( ) ] ) ;
415
+ let schema = [ field1. clone ( ) , field2. clone ( ) ] ;
416
+ let storage_schema = create_storage_schema ( & schema ) ;
421
417
422
- assert ! ( is_schema_matching( schema, & storage_schema, true ) ) ;
418
+ assert ! ( is_schema_matching( & schema, & storage_schema, true ) ) ;
423
419
}
424
420
425
421
#[ test]
@@ -428,10 +424,10 @@ mod tests {
428
424
let field1 = create_field ( "id" , DataType :: Int32 ) ;
429
425
let field2 = create_field ( "name" , DataType :: Utf8 ) ;
430
426
431
- let schema = create_schema ( vec ! [ field1. clone( ) , field2. clone( ) ] ) ;
432
- let storage_schema = create_storage_schema ( vec ! [ field1. clone( ) ] ) ; // Missing field2
427
+ let schema = [ field1. clone ( ) , field2. clone ( ) ] ;
428
+ let storage_schema = create_storage_schema ( & [ field1. clone ( ) ] ) ; // Missing field2
433
429
434
- assert ! ( !is_schema_matching( schema, & storage_schema, true ) ) ;
430
+ assert ! ( !is_schema_matching( & schema, & storage_schema, true ) ) ;
435
431
}
436
432
437
433
#[ test]
@@ -442,10 +438,10 @@ mod tests {
442
438
let field1_different_type = create_field ( "id" , DataType :: Int64 ) ;
443
439
let field2 = create_field ( "name" , DataType :: Utf8 ) ;
444
440
445
- let schema = create_schema ( vec ! [ field1. clone( ) , field2. clone( ) ] ) ;
446
- let storage_schema = create_storage_schema ( vec ! [ field1_different_type, field2. clone( ) ] ) ;
441
+ let schema = [ field1. clone ( ) , field2. clone ( ) ] ;
442
+ let storage_schema = create_storage_schema ( & [ field1_different_type, field2. clone ( ) ] ) ;
447
443
448
- assert ! ( !is_schema_matching( schema, & storage_schema, true ) ) ;
444
+ assert ! ( !is_schema_matching( & schema, & storage_schema, true ) ) ;
449
445
}
450
446
451
447
#[ test]
@@ -456,53 +452,38 @@ mod tests {
456
452
let field2 = create_field ( "name" , DataType :: Utf8 ) ;
457
453
let extra_field = create_field ( "extra" , DataType :: Boolean ) ;
458
454
459
- let schema = create_schema ( vec ! [ field1. clone( ) , field2. clone( ) ] ) ;
460
- let storage_schema =
461
- create_storage_schema ( vec ! [ field1. clone( ) , field2. clone( ) , extra_field] ) ;
455
+ let schema = [ field1. clone ( ) , field2. clone ( ) ] ;
456
+ let storage_schema = create_storage_schema ( & [ field1. clone ( ) , field2. clone ( ) , extra_field] ) ;
462
457
463
- assert ! ( is_schema_matching( schema, & storage_schema, true ) ) ;
458
+ assert ! ( is_schema_matching( & schema, & storage_schema, true ) ) ;
464
459
}
465
460
466
461
#[ test]
467
462
fn test_empty_new_schema ( ) {
468
463
// When new schema is empty, should return true
469
464
let field1 = create_field ( "id" , DataType :: Int32 ) ;
470
465
471
- let empty_schema = create_schema ( vec ! [ ] ) ;
472
- let storage_schema = create_storage_schema ( vec ! [ field1. clone( ) ] ) ;
466
+ let storage_schema = create_storage_schema ( & [ field1. clone ( ) ] ) ;
473
467
474
- assert ! ( is_schema_matching(
475
- empty_schema,
476
- & storage_schema,
477
- true
478
- ) ) ;
468
+ assert ! ( is_schema_matching( & [ ] , & storage_schema, true ) ) ;
479
469
}
480
470
481
471
#[ test]
482
472
fn test_empty_storage_schema ( ) {
483
473
// When storage schema is empty but new schema has fields, should return false
484
474
let field1 = create_field ( "id" , DataType :: Int32 ) ;
485
475
486
- let schema = create_schema ( vec ! [ field1. clone( ) ] ) ;
487
- let empty_storage_schema: HashMap < String , Arc < Field > > = HashMap :: new ( ) ;
476
+ let schema = [ field1. clone ( ) ] ;
477
+ let empty_storage_schema = HashMap :: new ( ) ;
488
478
489
- assert ! ( !is_schema_matching(
490
- schema,
491
- & empty_storage_schema,
492
- true
493
- ) ) ;
479
+ assert ! ( !is_schema_matching( & schema, & empty_storage_schema, true ) ) ;
494
480
}
495
481
496
482
#[ test]
497
483
fn test_both_empty_schemas ( ) {
498
484
// When both schemas are empty, should return true
499
- let empty_schema = create_schema ( vec ! [ ] ) ;
500
- let empty_storage_schema: HashMap < String , Arc < Field > > = HashMap :: new ( ) ;
501
-
502
- assert ! ( is_schema_matching(
503
- empty_schema,
504
- & empty_storage_schema,
505
- true
506
- ) ) ;
485
+ let empty_storage_schema = HashMap :: new ( ) ;
486
+
487
+ assert ! ( is_schema_matching( & [ ] , & empty_storage_schema, true ) ) ;
507
488
}
508
489
}
0 commit comments