Skip to content

Commit 4c537e4

Browse files
authored
Add schema mismatch check (#247)
1 parent e42642c commit 4c537e4

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

server/src/event.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl Event {
179179
let schema_ref = Arc::new(schema);
180180
// validate schema before processing the event
181181
let Ok(mut event) = self.get_record(schema_ref.clone()) else {
182-
return Err(EventError::SchemaMismatch(self.stream_name.clone()));
182+
return Err(EventError::SchemaMismatch);
183183
};
184184

185185
if event
@@ -312,12 +312,39 @@ impl Event {
312312

313313
/// Decodes this event's body into a `RecordBatch` using `schema`.
///
/// # Errors
/// - `EventError::SchemaMismatch` when `fields_mismatch` reports that the
///   body does not fit `schema` (check added by this commit).
/// - Whatever error `Decoder::next_batch` yields, propagated through `?`.
/// - `EventError::MissingRecord` when the decoder returns no batch.
fn get_record(&self, schema: Arc<Schema>) -> Result<RecordBatch, EventError> {
314314
// Single-item iterator over a clone of the body, consumed by the decoder below.
// NOTE(review): the clone is made before the mismatch check, so it is paid
// even when the function returns early with `SchemaMismatch`.
let mut iter = std::iter::once(Ok(self.body.clone()));
315+
// Reject the body up front instead of relying on the decoder to fail.
if fields_mismatch(&schema, &self.body) {
316+
return Err(EventError::SchemaMismatch);
317+
}
315318
let record = Decoder::new(schema, DecoderOptions::new()).next_batch(&mut iter)?;
316319

317320
// `next_batch` produced an optional batch; an absent batch is reported as an error.
record.ok_or(EventError::MissingRecord)
318321
}
319322
}
320323

324+
/// Returns `true` when `body` does not fit `schema`, `false` otherwise.
///
/// A mismatch is reported as soon as any key of `body`:
/// - has no field of the same name in `schema`, or
/// - maps to a value whose JSON kind does not pass the check for the
///   field's data type (boolean, signed/unsigned integer, float, UTF-8
///   string), or
/// - belongs to a field whose data type is not one of those handled below
///   (the catch-all arm treats every other data type as a mismatch).
///
/// Only keys present in `body` are inspected; schema fields that are absent
/// from `body` are not checked here.
///
/// # Panics
/// Panics with "body is of object variant" if `body.as_object()` is `None`.
///
/// NOTE(review): assuming `Value` is `serde_json::Value`, a JSON `null`
/// fails every `is_*` check below and is therefore always a mismatch —
/// confirm that nullable columns are not expected to accept `null` here.
fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
325+
for (name, val) in body.as_object().expect("body is of object variant") {
326+
// A key that the schema does not know about is a mismatch.
let Ok(field) = schema.field_with_name(name) else { return true };
327+
328+
// datatype check only some basic cases
329+
let valid_datatype = match field.data_type() {
330+
DataType::Boolean => val.is_boolean(),
331+
// All signed widths share one check; the value's range is not compared
// against the narrower widths here.
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(),
332+
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
333+
val.is_u64()
334+
}
335+
// NOTE(review): if this is serde_json's `is_f64`, integer-valued numbers
// such as `1` do not pass it, so they would be rejected for float
// columns — confirm this is intended.
DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(),
336+
DataType::Utf8 => val.is_string(),
337+
// Every other data type is treated as a mismatch.
_ => false,
338+
};
339+
340+
if !valid_datatype {
341+
return true;
342+
}
343+
}
344+
345+
false
346+
}
347+
321348
fn replace(
322349
schema: Arc<Schema>,
323350
batch: RecordBatch,
@@ -390,8 +417,8 @@ pub mod error {
390417
Metadata(#[from] MetadataError),
391418
#[error("Stream Writer Failed: {0}")]
392419
Arrow(#[from] ArrowError),
393-
#[error("Schema Mismatch: {0}")]
394-
SchemaMismatch(String),
420+
#[error("Schema Mismatch")]
421+
SchemaMismatch,
395422
#[error("Schema Mismatch: {0}")]
396423
ObjectStorage(#[from] ObjectStorageError),
397424
}

server/src/utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use serde_json::{json, Value};
2121

2222
/// Flattens `body` into a fresh JSON value via `flatten_json::flatten`,
/// using "_" as the separator argument, and returns the result.
///
/// This span is a diff: the `24-` line is the removed call and the `24+`
/// line is its replacement; the only difference is the fourth argument
/// (`true` before, `false` after). The meaning of that flag is defined by
/// the `flatten_json` crate and is not visible here.
///
/// # Panics
/// The result of `flatten_json::flatten` is unwrapped, so a flatten failure
/// panics. NOTE(review): as written, the function only ever returns `Ok`,
/// even though its signature allows `Err(serde_json::Error)`.
pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
2323
// Start from an empty JSON object that `flatten` fills in.
let mut flat_value: Value = json!({});
24-
flatten_json::flatten(body, &mut flat_value, None, true, Some("_")).unwrap();
24+
flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
2525
Ok(flat_value)
2626
}
2727

0 commit comments

Comments
 (0)