handle terminating streams, test and clarify delimiter semantics #5

Open · wants to merge 2 commits into base: main

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -20,7 +20,8 @@ futures = "0.3"
[dev-dependencies]
tokio = { version = "1.12", features = ["macros", "rt-multi-thread"] }
criterion = { version = "0.4", features = ["async_tokio"] }
tokio-test = "0.4.3"

[[bench]]
name = "json_stream"
harness = false
harness = false
1 change: 1 addition & 0 deletions rustfmt.toml
@@ -0,0 +1 @@
edition = "2021"
180 changes: 132 additions & 48 deletions src/lib.rs
100644 → 100755
@@ -1,6 +1,6 @@
#![deny(missing_docs)]

//! A library to parse Newline Delimited JSON values from a byte stream.
//! A library to parse sequences of JSON values from a byte stream.
//!
//! # Example
//!
@@ -19,7 +19,7 @@
//! async fn main() {
//! // This could be any stream that yields bytes, such as a file stream or a network stream.
//! let pinned_bytes_future = Box::pin(async {
//! Ok::<_, std::io::Error>(r#"{"bar": "foo"}\n{"bar": "qux"}\n{"bar": "baz"}"#.as_bytes())
//! Ok::<_, std::io::Error>(r#"{"bar": "foo"} {"bar": "qux"} {"bar": "baz"}"#.as_bytes())
//! });
//! let mut json_stream = JsonStream::<Foo, _>::new(once(pinned_bytes_future));
//!
@@ -28,14 +28,14 @@
//! }
//! }
//! ```
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use futures::Stream;
use serde::de::DeserializeOwned;
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde_json::Deserializer;
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tracing::trace;

// should be 2^n - 1 for VecDeque to work efficiently
@@ -44,7 +44,9 @@ const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 256 - 1; // 256KB
/// will be buffered before the stream is terminated, by default.
pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 1024 * 1024 * 8 - 1; // 8 MB

/// A [`Stream`] implementation that can be used to parse Newline Delimited JSON values from a byte stream.
/// A [`Stream`] implementation that can be used to parse any sequence
/// of JSON values, regardless of separator, from a byte stream.
///
/// It does so by buffering bytes internally and parsing them as they are received.
/// This means that the stream will not yield values until a full JSON value has been received.
///
@@ -85,9 +87,12 @@ impl<T, S: Unpin> JsonStream<T, S> {
Self {
stream,
entry_buffer: Vec::new(),
byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
byte_buffer: VecDeque::with_capacity(std::cmp::min(
DEFAULT_BUFFER_CAPACITY,
max_capacity,
)),
finished: false,
max_buffer_capacity: max_capacity
max_buffer_capacity: max_capacity,
}
}

@@ -115,10 +120,10 @@ impl<T, S: Unpin> JsonStream<T, S> {
}

impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
where
T: DeserializeOwned,
B: Deref<Target = [u8]>,
S: Stream<Item = Result<B, E>> + Unpin
where
T: DeserializeOwned,
B: Deref<Target = [u8]>,
S: Stream<Item = Result<B, E>> + Unpin,
{
type Item = Result<T, E>;

@@ -129,47 +134,58 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
}

let mut this = self.as_mut().project();
let mut inner_exhausted = false;

loop {
// outer
// if we have an entry, we should return it immediately
if let Some(entry) = this.entry_buffer.pop() {
return Poll::Ready(Some(Ok(entry)));
}

// try to fetch the next chunk
let next_chunk = loop {
loop {
// inner
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(chunk)) => break chunk,
Some(Ok(chunk)) => {
// if there is no room for this chunk, we should give up
match this.byte_buffer.len().checked_add(chunk.len()) {
Some(new_size) => {
if new_size > DEFAULT_MAX_BUFFER_CAPACITY {
// no room for this chunk
self.finish();
return Poll::Ready(None);
} else {
// room is available, so let's add
// the chunk to the buffer, and
// exit the inner-buffer loop to
// parse the data from the stream.
this.byte_buffer.extend(&*chunk);
break; // inner
}
}
None => {
// overflow occurred
self.finish();
return Poll::Ready(None);
}
}
}
Some(Err(err)) => {
// this is an error in reading from the inner
// stream. Propagate this outward.
self.finish();
return Poll::Ready(Some(Err(err)));
},
}
None => {
self.finish();
return Poll::Ready(None);
inner_exhausted = true;
break; // inner
}
}
};

// if there is no room for this chunk, we should give up
match this.byte_buffer.len().checked_add(next_chunk.len()) {
Some(new_size) if new_size > DEFAULT_MAX_BUFFER_CAPACITY => {
// no room for this chunk
self.finish();
return Poll::Ready(None);
},
None => {
// overflow occurred
self.finish();
return Poll::Ready(None);
}
_ => {}
}

// room is available, so let's add the chunk
this.byte_buffer.extend(&*next_chunk);

// because we inserted more data into the VecDeque, we
// need to restore its contiguous layout
this.byte_buffer.make_contiguous();
// we know that all of the data will be located in the first slice
let (buffer, _) = this.byte_buffer.as_slices();
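
The `make_contiguous`/`as_slices` pattern above can be illustrated stand-alone: after draining consumed bytes, the deque's contents may wrap around the ring, and realigning guarantees everything lands in the first slice. A minimal sketch using only `std`:

```rust
use std::collections::VecDeque;

fn main() {
    let mut buf: VecDeque<u8> = VecDeque::with_capacity(8);
    buf.extend(b"abcdef");
    buf.drain(..4); // consume four bytes; the head index moves forward
    buf.extend(b"ghij"); // this write may wrap around the ring

    buf.make_contiguous(); // realign so all bytes sit in one slice
    let (head, tail) = buf.as_slices();
    assert_eq!(head, b"efghij");
    assert!(tail.is_empty()); // everything is in the first slice
}
```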
@@ -182,23 +198,91 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
Some(Ok(entry)) => {
last_read_pos = json_iter.byte_offset();
this.entry_buffer.push(entry);
},
// if there was an error, log it but move on because this could be a partial entry
}
Some(Err(err)) => {
trace!(err = ?err, "failed to parse json entry");
break
},
break;
}
// nothing left, so we move on
None => break
None => break,
}
}

// there's no more data that the inner stream will produce
// and the last attempt to iterate through the buffer did
// not produce more objects: this stream should end now.
if inner_exhausted && this.entry_buffer.is_empty() {
self.finish();
return Poll::Ready(None);
}

// remove the read bytes - this is very efficient as it's
// a ring buffer
let _ = this.byte_buffer.drain(..last_read_pos);

// realign the buffer to the beginning so we can get
// contiguous slices. We want to do this with all of the
// read bytes removed, because this operation becomes a
// memcpy; if we waited until after we added bytes again,
// it could devolve into a much slower operation
this.byte_buffer.make_contiguous();
}
}
}
}
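
The parse loop relies on `serde_json`'s `StreamDeserializer`: after each successful value, `byte_offset()` marks how many leading bytes can safely be drained, and a trailing parse error is treated as a possibly incomplete document rather than a fatal failure. A self-contained sketch of just that technique:

```rust
use serde_json::{Deserializer, Value};

fn main() {
    // A buffer that ends with a partial document, as a network read might.
    let buffer = br#"{"a":1} {"a":2} {"a":"#;
    let mut iter = Deserializer::from_slice(buffer).into_iter::<Value>();
    let mut consumed = 0;

    while let Some(result) = iter.next() {
        match result {
            Ok(value) => {
                // byte_offset() marks the end of the last complete value,
                // so everything before it is safe to drain from the buffer
                consumed = iter.byte_offset();
                println!("parsed {value}, consumed {consumed} bytes");
            }
            // an error here may just be an incomplete trailing document;
            // keep the unconsumed tail and retry once more bytes arrive
            Err(_) => break,
        }
    }

    assert_eq!(&buffer[consumed..], br#" {"a":"#);
}
```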
#[cfg(test)]
mod tests {
use futures::stream::StreamExt;
use serde_json::Value;

use crate::JsonStream;

async fn assert_stream(data: &str) {
let test_stream = Box::pin(async { Ok::<_, std::io::Error>(data.as_bytes()) });
let mut json_stream = JsonStream::<Value, _>::new(futures::stream::once(test_stream));

let mut count: usize = 0;
while let Some(Ok(value)) = json_stream.next().await {
println!("{:?}", value);
count += 1;
}
assert_eq!(count, 3);
}

#[tokio::test]
async fn test_no_separators() {
assert_stream(r#"{"bar":"foo"}{"bar":"qux"}{"bar":"baz"}"#).await;
}

#[tokio::test]
async fn test_spaced_documents() {
assert_stream(r#"{"bar":"foo"} {"bar":"qux"} {"bar":"baz"}"#).await;
}

#[tokio::test]
async fn test_line_separated() {
assert_stream(
r#"{"bar":"foo"}
{"bar":"qux"}
{"bar":"baz"}"#,
)
.await;
}

#[tokio::test]
async fn test_multiline_documents() {
assert_stream(
r#"
{
"bar":"foo"
}
{
"bar":"qux"
}
{
"bar":"baz"
}
"#,
)
.await;
}
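
A further case the new `inner_exhausted` path is designed to handle: a stream that ends mid-document should yield the complete values and then terminate instead of pending forever. A hedged sketch of an additional test, not part of this PR, that would slot into the same `tests` module with the same imports:

```rust
#[tokio::test]
async fn test_terminating_partial_tail() {
    // a buffer that ends mid-document: the complete value should be
    // yielded, then the stream should end rather than pend forever
    let test_stream = Box::pin(async {
        Ok::<_, std::io::Error>(r#"{"bar":"foo"} {"bar":"#.as_bytes())
    });
    let mut json_stream = JsonStream::<Value, _>::new(futures::stream::once(test_stream));

    assert!(matches!(json_stream.next().await, Some(Ok(_))));
    assert!(json_stream.next().await.is_none());
}
```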
}