Skip to content

Commit b251e05

Browse files
authored
update error handling when parsing recordbatch to rows (#783)
1 parent 0190d85 commit b251e05

File tree

4 files changed

+31
-56
lines changed

4 files changed

+31
-56
lines changed

server/src/handlers/http/ingest.rs

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -631,49 +631,6 @@ mod tests {
631631
);
632632
}
633633

634-
#[test]
635-
fn arr_obj_ignore_all_null_field() {
636-
let json = json!([
637-
{
638-
"a": 1,
639-
"b": "hello",
640-
"c": null
641-
},
642-
{
643-
"a": 1,
644-
"b": "hello",
645-
"c": null
646-
},
647-
{
648-
"a": 1,
649-
"b": "hello",
650-
"c": null
651-
},
652-
]);
653-
654-
let req = TestRequest::default().to_http_request();
655-
656-
let (_, rb, _) = into_event_batch(
657-
req,
658-
Bytes::from(serde_json::to_vec(&json).unwrap()),
659-
HashMap::default(),
660-
None,
661-
None,
662-
)
663-
.unwrap();
664-
665-
assert_eq!(rb.num_rows(), 3);
666-
assert_eq!(rb.num_columns(), 6);
667-
assert_eq!(
668-
rb.column_by_name("a").unwrap().as_int64_arr(),
669-
&Int64Array::from(vec![Some(1), Some(1), Some(1)])
670-
);
671-
assert_eq!(
672-
rb.column_by_name("b").unwrap().as_utf8_arr(),
673-
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
674-
);
675-
}
676-
677634
#[test]
678635
fn arr_schema_mismatch() {
679636
let json = json!([

server/src/handlers/http/query.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
105105
fill_null: query_request.send_null,
106106
with_fields: query_request.fields,
107107
}
108-
.to_http();
108+
.to_http()?;
109109

110110
let time = time.elapsed().as_secs_f64();
111111

@@ -293,12 +293,17 @@ pub enum QueryError {
293293
EventError(#[from] EventError),
294294
#[error("Error: {0}")]
295295
MalformedQuery(String),
296+
#[error(
297+
r#"Error: Failed to Parse Record Batch into Json
298+
Description: {0}"#
299+
)]
300+
JsonParse(String),
296301
}
297302

298303
impl actix_web::ResponseError for QueryError {
299304
fn status_code(&self) -> http::StatusCode {
300305
match self {
301-
QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
306+
QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
302307
_ => StatusCode::BAD_REQUEST,
303308
}
304309
}

server/src/response.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
*
1717
*/
1818

19+
use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
1920
use actix_web::{web, Responder};
2021
use datafusion::arrow::record_batch::RecordBatch;
2122
use itertools::Itertools;
2223
use serde_json::{json, Value};
2324

24-
use crate::utils::arrow::record_batches_to_json;
25-
2625
pub struct QueryResponse {
2726
pub records: Vec<RecordBatch>,
2827
pub fields: Vec<String>,
@@ -31,10 +30,11 @@ pub struct QueryResponse {
3130
}
3231

3332
impl QueryResponse {
34-
pub fn to_http(&self) -> impl Responder {
33+
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
3534
log::info!("{}", "Returning query results");
3635
let records: Vec<&RecordBatch> = self.records.iter().collect();
37-
let mut json_records = record_batches_to_json(&records);
36+
let mut json_records = record_batches_to_json(&records)
37+
.map_err(|err| QueryError::JsonParse(err.to_string()))?;
3838
if self.fill_null {
3939
for map in &mut json_records {
4040
for field in &self.fields {
@@ -55,6 +55,6 @@ impl QueryResponse {
5555
Value::Array(values)
5656
};
5757

58-
web::Json(response)
58+
Ok(web::Json(response))
5959
}
6060
}

server/src/utils/arrow.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub mod batch_adapter;
2727
pub mod merged_reader;
2828
pub mod reverse_reader;
2929

30+
use anyhow::Result;
3031
pub use batch_adapter::adapt_batch;
3132
pub use merged_reader::MergedRecordReader;
3233
use serde_json::{Map, Value};
@@ -63,19 +64,23 @@ pub fn replace_columns(
6364
/// * `records` - The record batches to convert.
6465
///
6566
/// # Returns
67+
/// * Result<Vec<Map<String, Value>>>
6668
///
6769
/// A vector of JSON objects representing the record batches.
68-
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec<Map<String, Value>> {
70+
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
6971
let buf = vec![];
7072
let mut writer = arrow_json::ArrayWriter::new(buf);
71-
writer.write_batches(records).unwrap();
72-
writer.finish().unwrap();
73+
writer.write_batches(records)?;
74+
writer.finish()?;
7375

7476
let buf = writer.into_inner();
7577

76-
let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(buf.as_slice()).unwrap();
78+
let json_rows: Vec<Map<String, Value>> = match serde_json::from_reader(buf.as_slice()) {
79+
Ok(json) => json,
80+
Err(_) => vec![],
81+
};
7782

78-
json_rows
83+
Ok(json_rows)
7984
}
8085

8186
/// Retrieves a field from a slice of fields by name.
@@ -105,7 +110,7 @@ mod tests {
105110
use arrow_array::{Array, Int32Array, RecordBatch};
106111
use arrow_schema::{DataType, Field, Schema};
107112

108-
use super::replace_columns;
113+
use super::{record_batches_to_json, replace_columns};
109114

110115
#[test]
111116
fn check_replace() {
@@ -135,4 +140,12 @@ mod tests {
135140
assert_eq!(new_rb.num_columns(), 3);
136141
assert_eq!(new_rb.num_rows(), 3)
137142
}
143+
144+
#[test]
145+
fn check_empty_json_to_record_batches() {
146+
let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
147+
let rb = vec![&r];
148+
let batches = record_batches_to_json(&rb).unwrap();
149+
assert_eq!(batches, vec![]);
150+
}
138151
}

0 commit comments

Comments (0)