Skip to content

Commit 6bd54be

Browse files
trueleonitisht
andauthored
Add custom flattening that ignores array (#344)
Custom flattening approach to flatten json arrays containing objects. Currently a json array like ``` { "name": "John", "age": 30, "cars": [{ "brand": "Ford" }, { "brand": "BMW", "model": "x1" }, { "brand": "Audi", "model": "q1" } ] } ``` would be flattened to `cars.0_brand`,`cars.0_model`, `cars.1_brand`, `cars.1_model`, `cars.2_brand`, `cars.2_model`. This poses an issue for very long arrays. Since a single event with a long array will add several columns to the schema. But most of the times these columns would be empty. This PR changes the approach to flatten to two fields like `cars_brand`, `cars_model`. Each column will have an array of elements in that field. Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 8aaf7c1 commit 6bd54be

File tree

7 files changed

+373
-56
lines changed

7 files changed

+373
-56
lines changed

Cargo.lock

Lines changed: 0 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ relative-path = { version = "1.7", features = ["serde"] }
5151
reqwest = { version = "0.11", default_features=false, features=["rustls", "json", "hyper-rustls", "tokio-rustls"]}
5252
rustls = "0.20"
5353
rustls-pemfile = "1.0"
54-
rust-flatten-json = "0.2"
5554
semver = "1.0"
5655
serde = { version = "1.0", features = ["rc"] }
5756
serde_json = "1.0"

server/src/alerts/mod.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ pub mod rule;
2626
pub mod target;
2727

2828
use crate::metrics::ALERTS_STATES;
29-
use crate::storage;
3029
use crate::utils::uid;
3130
use crate::CONFIG;
31+
use crate::{storage, utils};
3232

3333
pub use self::rule::Rule;
3434
use self::target::Target;
@@ -97,16 +97,9 @@ impl Alert {
9797
let deployment_mode = storage::StorageMetadata::global().mode.to_string();
9898
let additional_labels =
9999
serde_json::to_value(rule).expect("rule is perfectly deserializable");
100-
let mut flatten_additional_labels = serde_json::json!({});
101-
flatten_json::flatten(
102-
&additional_labels,
103-
&mut flatten_additional_labels,
104-
Some("rule".to_string()),
105-
false,
106-
Some("_"),
107-
)
108-
.expect("can be flattened");
109-
100+
let flatten_additional_labels =
101+
utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_")
102+
.expect("can be flattened");
110103
Context::new(
111104
stream_name,
112105
AlertInfo::new(

server/src/event.rs

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ impl Event {
100100
let stream_name = &self.stream_name;
101101
let schema_key = &self.schema_key;
102102

103+
let old = metadata::STREAM_INFO.merged_schema(stream_name)?;
104+
if Schema::try_merge(vec![old, schema.clone()]).is_err() {
105+
return Err(EventError::SchemaMismatch);
106+
};
107+
103108
commit_schema(stream_name, schema_key, Arc::new(schema))?;
104109
self.process_event(event)
105110
}
@@ -156,27 +161,55 @@ pub fn get_schema_key(body: &Value) -> String {
156161
fn fields_mismatch(schema: &Schema, body: &Value) -> bool {
157162
for (name, val) in body.as_object().expect("body is of object variant") {
158163
let Ok(field) = schema.field_with_name(name) else { return true };
159-
160-
// datatype check only some basic cases
161-
let valid_datatype = match field.data_type() {
162-
DataType::Boolean => val.is_boolean(),
163-
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => val.is_i64(),
164-
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => {
165-
val.is_u64()
166-
}
167-
DataType::Float16 | DataType::Float32 | DataType::Float64 => val.is_f64(),
168-
DataType::Utf8 => val.is_string(),
169-
_ => false,
170-
};
171-
172-
if !valid_datatype {
164+
if !valid_type(field.data_type(), val) {
173165
return true;
174166
}
175167
}
176-
177168
false
178169
}
179170

171+
fn valid_type(data_type: &DataType, value: &Value) -> bool {
172+
match data_type {
173+
DataType::Boolean => value.is_boolean(),
174+
DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => value.is_i64(),
175+
DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => value.is_u64(),
176+
DataType::Float16 | DataType::Float32 | DataType::Float64 => value.is_f64(),
177+
DataType::Utf8 => value.is_string(),
178+
DataType::List(field) => {
179+
let data_type = field.data_type();
180+
if let Value::Array(arr) = value {
181+
for elem in arr {
182+
if !valid_type(data_type, elem) {
183+
return false;
184+
}
185+
}
186+
}
187+
true
188+
}
189+
DataType::Struct(fields) => {
190+
if let Value::Object(val) = value {
191+
for (key, value) in val {
192+
let field = (0..fields.len())
193+
.find(|idx| fields[*idx].name() == key)
194+
.map(|idx| &fields[idx]);
195+
196+
if let Some(field) = field {
197+
if !valid_type(field.data_type(), value) {
198+
return false;
199+
}
200+
} else {
201+
return false;
202+
}
203+
}
204+
true
205+
} else {
206+
false
207+
}
208+
}
209+
_ => unreachable!(),
210+
}
211+
}
212+
180213
fn commit_schema(
181214
stream_name: &str,
182215
schema_key: &str,

server/src/handlers/http/ingest.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ async fn push_logs(
7979
Value::Array(array) => {
8080
for mut body in array {
8181
merge(&mut body, tags_n_metadata.clone().into_iter());
82-
let body = flatten_json_body(&body).unwrap();
82+
let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
8383
let schema_key = event::get_schema_key(&body);
8484

8585
let event = event::Event {
@@ -93,7 +93,7 @@ async fn push_logs(
9393
}
9494
mut body @ Value::Object(_) => {
9595
merge(&mut body, tags_n_metadata.into_iter());
96-
let body = flatten_json_body(&body).unwrap();
96+
let body = flatten_json_body(body).map_err(|_| PostError::FlattenError)?;
9797
let schema_key = event::get_schema_key(&body);
9898
let event = event::Event {
9999
body,
@@ -117,6 +117,8 @@ pub enum PostError {
117117
Event(#[from] EventError),
118118
#[error("Invalid Request")]
119119
Invalid,
120+
#[error("failed to flatten the json object")]
121+
FlattenError,
120122
#[error("Failed to create stream due to {0}")]
121123
CreateStream(Box<dyn std::error::Error + Send + Sync>),
122124
}
@@ -128,6 +130,7 @@ impl actix_web::ResponseError for PostError {
128130
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
129131
PostError::Invalid => StatusCode::BAD_REQUEST,
130132
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
133+
PostError::FlattenError => StatusCode::BAD_REQUEST,
131134
}
132135
}
133136

server/src/utils/json.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
*/
1818

1919
use serde_json;
20-
use serde_json::json;
2120
use serde_json::Value;
2221

23-
pub fn flatten_json_body(body: &serde_json::Value) -> Result<Value, serde_json::Error> {
24-
let mut flat_value: Value = json!({});
25-
flatten_json::flatten(body, &mut flat_value, None, false, Some("_")).unwrap();
26-
Ok(flat_value)
22+
pub mod flatten;
23+
24+
pub fn flatten_json_body(body: Value) -> Result<Value, ()> {
25+
flatten::flatten(body, "_")
2726
}
2827

2928
pub fn merge(value: &mut Value, fields: impl Iterator<Item = (String, Value)>) {

0 commit comments

Comments
 (0)