Skip to content

feat: add 'respect eof' option for reading from bounded sources #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 43 additions & 19 deletions src/lib.rs
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
//! }
//! }
//! ```
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use futures::Stream;
use serde::de::DeserializeOwned;
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde_json::Deserializer;
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tracing::trace;

// should be 2^n - 1 for VecDeque to work efficiently
Expand All @@ -60,6 +60,7 @@ pub struct JsonStream<T, S> {
byte_buffer: VecDeque<u8>,
finished: bool,
max_buffer_capacity: usize,
respect_eof: bool,
}

impl<T, S: Unpin> JsonStream<T, S> {
Expand All @@ -85,12 +86,26 @@ impl<T, S: Unpin> JsonStream<T, S> {
Self {
stream,
entry_buffer: Vec::new(),
byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
byte_buffer: VecDeque::with_capacity(std::cmp::min(
DEFAULT_BUFFER_CAPACITY,
max_capacity,
)),
finished: false,
max_buffer_capacity: max_capacity
max_buffer_capacity: max_capacity,
respect_eof: false,
}
}

/// Toggles a mode where the stream will "complete" when the
/// buffer is empty, there are no processed messages, and the
/// underlying reader returns an EOF error. All other error
/// messages, either from the parsing (because there might be a
/// partial json object,) or from the IO reader (because more data
/// may arrive or recover eventually,) are ignored.
pub fn set_respect_eof(&mut self) {
self.respect_eof = true
}

/// Controls how large the internal buffer can grow in bytes. If the buffer grows larger than this
/// the stream is terminated as it is assumed that the stream is malformed. If this number is too
/// large, a malformed stream can cause the server to run out of memory.
Expand All @@ -115,10 +130,10 @@ impl<T, S: Unpin> JsonStream<T, S> {
}

impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
where
T: DeserializeOwned,
B: Deref<Target = [u8]>,
S: Stream<Item = Result<B, E>> + Unpin
where
T: DeserializeOwned,
B: Deref<Target = [u8]>,
S: Stream<Item = Result<B, E>> + Unpin,
{
type Item = Result<T, E>;

Expand All @@ -143,7 +158,7 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
Some(Err(err)) => {
self.finish();
return Poll::Ready(Some(Err(err)));
},
}
None => {
self.finish();
return Poll::Ready(None);
Expand All @@ -157,7 +172,7 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
// no room for this chunk
self.finish();
return Poll::Ready(None);
},
}
None => {
// overflow occurred
self.finish();
Expand All @@ -182,14 +197,23 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
Some(Ok(entry)) => {
last_read_pos = json_iter.byte_offset();
this.entry_buffer.push(entry);
},
}
// if there was an error, log it but move on because this could be a partial entry
Some(Err(err)) => {
trace!(err = ?err, "failed to parse json entry");
break
},
if *this.respect_eof
&& err.is_eof()
&& this.entry_buffer.is_empty()
&& buffer.is_empty()
{
self.finish();
return Poll::Ready(None);
} else {
trace!(err = ?err, "failed to parse json entry");
break;
};
}
// nothing left then we move on
None => break
None => break,
}
}

Expand All @@ -201,4 +225,4 @@ impl<T: DeserializeOwned, S, B, E> Stream for JsonStream<T, S>
this.byte_buffer.make_contiguous();
}
}
}
}