From 0a32cac37205c10654109b384fcbf176d50cbca4 Mon Sep 17 00:00:00 2001
From: tycho garen
Date: Mon, 4 Mar 2024 14:32:44 -0500
Subject: [PATCH 1/2] chore: refactor looping

---
 src/lib.rs | 80 +++++++++++++++++++++++++++++-------------------------
 1 file changed, 43 insertions(+), 37 deletions(-)
 mode change 100644 => 100755 src/lib.rs

diff --git a/src/lib.rs b/src/lib.rs
old mode 100644
new mode 100755
index fec8ae6..ef71d46
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,14 +28,14 @@
 //! }
 //! }
 //! ```
-use std::collections::VecDeque;
-use std::ops::Deref;
-use std::pin::Pin;
-use std::task::{Context, Poll, ready};
 use futures::Stream;
-use serde::de::DeserializeOwned;
 use pin_project::pin_project;
+use serde::de::DeserializeOwned;
 use serde_json::Deserializer;
+use std::collections::VecDeque;
+use std::ops::Deref;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
 use tracing::trace;
 
 // should be 2^n - 1 for VecDeque to work efficiently
@@ -85,9 +85,12 @@ impl<T, S> JsonStream<T, S> {
         Self {
             stream,
             entry_buffer: Vec::new(),
-            byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
+            byte_buffer: VecDeque::with_capacity(std::cmp::min(
+                DEFAULT_BUFFER_CAPACITY,
+                max_capacity,
+            )),
             finished: false,
-            max_buffer_capacity: max_capacity
+            max_buffer_capacity: max_capacity,
         }
     }
 
@@ -115,10 +118,10 @@ impl<T, S> JsonStream<T, S> {
 }
 
 impl<T, B, S> Stream for JsonStream<T, S>
-    where
-        T: DeserializeOwned,
-        B: Deref<Target = [u8]>,
-        S: Stream<Item = Result<B, std::io::Error>> + Unpin
+where
+    T: DeserializeOwned,
+    B: Deref<Target = [u8]>,
+    S: Stream<Item = Result<B, std::io::Error>> + Unpin,
 {
     type Item = Result<T, std::io::Error>;
 
@@ -137,38 +140,41 @@ impl<T, B, S> Stream for JsonStream<T, S>
             }
 
             // try to fetch the next chunk
-            let next_chunk = loop {
+            loop {
                 match ready!(this.stream.as_mut().poll_next(cx)) {
-                    Some(Ok(chunk)) => break chunk,
+                    Some(Ok(chunk)) => {
+                        // if there is no room for this chunk, we should give up
+                        match this.byte_buffer.len().checked_add(chunk.len()) {
+                            Some(new_size) => {
+                                if new_size > DEFAULT_MAX_BUFFER_CAPACITY {
+                                    // no room for this chunk
+                                    self.finish();
+                                    return Poll::Ready(None);
+                                } else {
+                                    // room is available, so let's add the chunk
+                                    this.byte_buffer.extend(&*chunk);
+
+                                    break;
+                                }
+                            }
+                            None => {
+                                // overflow occurred
+                                self.finish();
+                                return Poll::Ready(None);
+                            }
+                        }
+                    }
                     Some(Err(err)) => {
                         self.finish();
                         return Poll::Ready(Some(Err(err)));
-                    },
+                    }
                     None => {
                         self.finish();
                         return Poll::Ready(None);
                     }
                 }
-            };
-
-            // if there is no room for this chunk, we should give up
-            match this.byte_buffer.len().checked_add(next_chunk.len()) {
-                Some(new_size) if new_size > DEFAULT_MAX_BUFFER_CAPACITY => {
-                    // no room for this chunk
-                    self.finish();
-                    return Poll::Ready(None);
-                },
-                None => {
-                    // overflow occurred
-                    self.finish();
-                    return Poll::Ready(None);
-                }
-                _ => {}
             }
 
-            // room is available, so let's add the chunk
-            this.byte_buffer.extend(&*next_chunk);
-
             // because we inserted more data into the VecDeque, we need to reassure the layout of it
             this.byte_buffer.make_contiguous();
             // we know that all of the data will be located in the first slice
@@ -182,14 +188,14 @@ impl<T, B, S> Stream for JsonStream<T, S>
                 Some(Ok(entry)) => {
                     last_read_pos = json_iter.byte_offset();
                     this.entry_buffer.push(entry);
-                },
+                }
                 // if there was an error, log it but move on because this could be a partial entry
                 Some(Err(err)) => {
                     trace!(err = ?err, "failed to parse json entry");
-                    break
-                },
+                    break;
+                }
                 // nothing left then we move on
-                None => break
+                None => break,
             }
         }
 
@@ -201,4 +207,4 @@ impl<T, B, S> Stream for JsonStream<T, S>
             this.byte_buffer.make_contiguous();
         }
     }
-}
\ No newline at end of file
+}
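The refactor above folds the buffer-capacity guard into the chunk-fetch loop, so each incoming chunk is either appended to the byte buffer or the stream is terminated at a single decision point. Below is a small, self-contained sketch of that guard in isolation; `try_buffer_chunk` and its signature are illustrative only and are not part of the crate's API.

```
use std::collections::VecDeque;

// Illustrative only: mirrors the overflow-safe capacity check in `poll_next` above.
// Returns true if the chunk fits under `max_capacity` and was buffered, false if
// buffering it would exceed the cap (or overflow `usize`), in which case the
// stream above gives up and terminates.
fn try_buffer_chunk(buffer: &mut VecDeque<u8>, chunk: &[u8], max_capacity: usize) -> bool {
    match buffer.len().checked_add(chunk.len()) {
        Some(new_size) if new_size <= max_capacity => {
            buffer.extend(chunk);
            true
        }
        // too large, or the addition itself overflowed
        _ => false,
    }
}

fn main() {
    let mut buffer = VecDeque::new();
    assert!(try_buffer_chunk(&mut buffer, br#"{"bar":"foo"}"#, 64));
    assert!(!try_buffer_chunk(&mut buffer, &[b'x'; 128], 64));
}
```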
From bd4990fab95f789740a75a8eea98d5dac1f0160a Mon Sep 17 00:00:00 2001
From: tycho garen
Date: Mon, 4 Mar 2024 16:29:29 -0500
Subject: [PATCH 2/2] bounded stream multiple format tests

---
 Cargo.toml   |   3 +-
 rustfmt.toml |   1 +
 src/lib.rs   | 106 ++++++++++++++++++++++++++++++++++++++++++++------
 3 files changed, 95 insertions(+), 15 deletions(-)
 create mode 100644 rustfmt.toml

diff --git a/Cargo.toml b/Cargo.toml
index 4cddc78..e609c3a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,7 +20,8 @@ futures = "0.3"
 [dev-dependencies]
 tokio = { version = "1.12", features = ["macros", "rt-multi-thread"] }
 criterion = { version = "0.4", features = ["async_tokio"] }
+tokio-test = "0.4.3"
 
 [[bench]]
 name = "json_stream"
-harness = false
\ No newline at end of file
+harness = false
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..3a26366
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1 @@
+edition = "2021"
diff --git a/src/lib.rs b/src/lib.rs
index ef71d46..e141f68 100755
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,6 @@
 #![deny(missing_docs)]
 
-//! A library to parse Newline Delimited JSON values from a byte stream.
+//! A library to parse sequences of JSON values from a byte stream.
 //!
 //! # Example
 //!
@@ -19,7 +19,7 @@
 //! async fn main() {
 //!     // This could be any stream that yields bytes, such as a file stream or a network stream.
 //!     let pinned_bytes_future = Box::pin(async {
-//!         Ok::<_, std::io::Error>(r#"{"bar": "foo"}\n{"bar": "qux"}\n{"bar": "baz"}"#.as_bytes())
+//!         Ok::<_, std::io::Error>(r#"{"bar": "foo"} {"bar": "qux"} {"bar": "baz"}"#.as_bytes())
 //!     });
 //!     let mut json_stream = JsonStream::<Foo, _>::new(once(pinned_bytes_future));
 //!
@@ -44,7 +44,9 @@ const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 256 - 1; // 256KB
 /// will be buffered before the stream is terminated, by default.
 pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 1024 * 1024 * 8 - 1; // 8 MB
 
-/// A [`Stream`] implementation that can be used to parse Newline Delimited JSON values from a byte stream.
+/// A [`Stream`] implementation that can be used to parse any sequence
+/// of JSON values, regardless of separator, from a byte stream.
+///
 /// It does so by buffering bytes internally and parsing them as they are received.
 /// This means that the stream will not yield values until a full JSON value has been received.
 ///
@@ -132,8 +134,10 @@ where
         }
 
         let mut this = self.as_mut().project();
+        let mut inner_exhausted = false;
 
         loop {
+            // outer
             // if we have an entry, we should return it immediately
             if let Some(entry) = this.entry_buffer.pop() {
                 return Poll::Ready(Some(Ok(entry)));
@@ -141,6 +145,7 @@ where
 
             // try to fetch the next chunk
             loop {
+                // inner
                 match ready!(this.stream.as_mut().poll_next(cx)) {
                     Some(Ok(chunk)) => {
                         // if there is no room for this chunk, we should give up
@@ -151,10 +156,12 @@ where
                                     self.finish();
                                     return Poll::Ready(None);
                                 } else {
-                                    // room is available, so let's add the chunk
+                                    // room is available, so let's add
+                                    // the chunk to the buffer, and
+                                    // exit the inner-buffer loop to
+                                    // parse the data from the stream.
                                     this.byte_buffer.extend(&*chunk);
-
-                                    break;
+                                    break; // inner
                                 }
                             }
                             None => {
@@ -165,17 +172,20 @@ where
                         }
                     }
                     Some(Err(err)) => {
+                        // this is an error in reading from the inner
+                        // stream. Propagate this outward.
                         self.finish();
                         return Poll::Ready(Some(Err(err)));
                     }
                     None => {
-                        self.finish();
-                        return Poll::Ready(None);
+                        inner_exhausted = true;
+                        break; // inner
                     }
                 }
             }
 
-            // because we inserted more data into the VecDeque, we need to reassure the layout of it
+            // because we inserted more data into the VecDeque, we
+            // need to reassure the layout of it
             this.byte_buffer.make_contiguous();
             // we know that all of the data will be located in the first slice
             let (buffer, _) = this.byte_buffer.as_slices();
@@ -189,7 +199,6 @@ where
                     last_read_pos = json_iter.byte_offset();
                     this.entry_buffer.push(entry);
                 }
-                // if there was an error, log it but move on because this could be a partial entry
                 Some(Err(err)) => {
                     trace!(err = ?err, "failed to parse json entry");
                     break;
@@ -199,12 +208,81 @@ where
             }
         }
 
-            // remove the read bytes - this is very efficient as it's a ring buffer
+            // there's no more data that the inner stream will produce
+            // and the last attempt to iterate through the buffer did
+            // not produce more objects: this stream should end now.
+            if inner_exhausted && this.entry_buffer.is_empty() {
+                self.finish();
+                return Poll::Ready(None);
+            }
+
+            // remove the read bytes - this is very efficient as it's
+            // a ring buffer
             let _ = this.byte_buffer.drain(..last_read_pos);
-            // realign the buffer to the beginning so we can get contiguous slices
-            // we want to do this with all of the read bytes removed because this operation becomes a memcpy
-            // if we waited until after we added bytes again, it could devolve into a much slower operation
+
+            // realign the buffer to the beginning so we can get
+            // contiguous slices we want to do this with all of the
+            // read bytes removed because this operation becomes a
+            // memcpy if we waited until after we added bytes again,
+            // it could devolve into a much slower operation
             this.byte_buffer.make_contiguous();
         }
     }
 }
+#[cfg(test)]
+mod tests {
+    use futures::stream::StreamExt;
+    use serde_json::Value;
+
+    use crate::JsonStream;
+
+    async fn assert_stream(data: &str) {
+        let test_stream = Box::pin(async { Ok::<_, std::io::Error>(data.as_bytes()) });
+        let mut json_stream = JsonStream::<Value, _>::new(futures::stream::once(test_stream));
+
+        let mut count: usize = 0;
+        while let Some(Ok(value)) = json_stream.next().await {
+            println!("{:?}", value);
+            count += 1;
+        }
+        assert_eq!(count, 3);
+    }
+
+    #[tokio::test]
+    async fn test_no_separators() {
+        assert_stream(r#"{"bar":"foo"}{"bar":"qux"}{"bar":"baz"}"#).await;
+    }
+
+    #[tokio::test]
+    async fn test_spaced_documents() {
+        assert_stream(r#"{"bar":"foo"} {"bar":"qux"} {"bar":"baz"}"#).await;
+    }
+
+    #[tokio::test]
+    async fn test_line_separated() {
+        assert_stream(
+            r#"{"bar":"foo"}
+            {"bar":"qux"}
+{"bar":"baz"}"#,
+        )
+        .await;
+    }
+
+    #[tokio::test]
+    async fn test_multiline_documents() {
+        assert_stream(
+            r#"
+{
+    "bar":"foo"
+}
+{
+    "bar":"qux"
+}
+{
+    "bar":"baz"
+}
+"#,
+        )
+        .await;
+    }
+}
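For reference, a minimal consumer of the behaviour the new tests exercise: back-to-back, space-separated, and newline-separated JSON documents all come out of the same stream. The `json_stream` crate path below is a placeholder for the crate's actual name, and `serde_json::Value` stands in for whatever type you deserialize into; this sketch is an illustration, not part of the patches.

```
use futures::stream::{once, StreamExt};
use serde_json::Value;

// Placeholder path: substitute the crate's actual name.
use json_stream::JsonStream;

#[tokio::main]
async fn main() {
    // Any stream of byte chunks works; a single in-memory chunk keeps the sketch small.
    let bytes = Box::pin(async {
        Ok::<_, std::io::Error>(r#"{"bar":"foo"} {"bar":"qux"}{"bar":"baz"}"#.as_bytes())
    });

    // With both patches applied, the values may be separated by spaces, newlines,
    // or nothing at all, and the internal buffer stays bounded.
    let mut json_stream = JsonStream::<Value, _>::new(once(bytes));

    while let Some(result) = json_stream.next().await {
        match result {
            Ok(value) => println!("parsed: {value}"),
            Err(err) => eprintln!("stream error: {err}"),
        }
    }
}
```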