Stream graceful shutdown #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,4 +11,5 @@ members = [
"examples/watch",
"examples/reverse_wallet",
"examples/call_watch",
"examples/drain_and_stop",
]
14 changes: 14 additions & 0 deletions examples/drain_and_stop/Cargo.toml
@@ -0,0 +1,14 @@
[package]
name = "drain_and_stop"
version = "0.1.0"
edition = "2021"

[dependencies]
hypersync-client = { path = "../../hypersync-client" }

tokio = { version = "1", features = ["full"] }
serde_json = "1"
polars-arrow = { version = "0.42", features = [
"compute_aggregate",
] }
env_logger = "0.11"
103 changes: 103 additions & 0 deletions examples/drain_and_stop/src/main.rs
@@ -0,0 +1,103 @@
// Example of using the client to stream data, then draining and stopping the stream.
// It has no practical use, but it demonstrates how to use the client.

use std::sync::Arc;

use hypersync_client::{Client, ClientConfig, ColumnMapping, DataType, StreamConfig};

#[tokio::main]
async fn main() {
env_logger::init();

// create default client, uses eth mainnet
let client = Client::new(ClientConfig::default()).unwrap();

let query = serde_json::from_value(serde_json::json!( {
// start from block 10123123 and go to the end of the chain (we don't specify a toBlock).
"from_block": 10123123,
// The logs we want. We will also automatically get transactions and blocks relating to these logs (the query implicitly joins them).
"logs": [
{
// We want all ERC20 transfers, so there is no address filter, only a filter on the first topic
"topics": [
["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],
]
}
],
// Select the fields we are interested in; note that topics are selected as topic0, topic1, topic2, topic3
"field_selection": {
"block": [
"number",
],
"log": [
"data",
"topic0",
"topic1",
"topic2",
"topic3",
]
}
}))
.unwrap();

println!("Starting the stream");

// Put the client inside Arc so we can use it for streaming
let client = Arc::new(client);

let mut drained = vec![];

// Stream arrow data so we can average the ERC20 transfer amounts in memory.
//
// This parallelizes requests internally, so we don't have to pipeline the
// make-request -> handle-response -> handle-data loop ourselves.
let mut receiver = client
.stream_arrow(
query,
StreamConfig {
// Pass the event signature for decoding
event_signature: Some(
"Transfer(address indexed from, address indexed to, uint amount)".to_owned(),
),
column_mapping: Some(ColumnMapping {
decoded_log: [
// Map the amount column to float so we can do aggregation on it
("amount".to_owned(), DataType::Float64),
]
.into_iter()
.collect(),
..Default::default()
}),
..Default::default()
},
)
.await
.unwrap();

let mut count = 0;

// Receive the data in a loop
while let Some(res) = receiver.recv().await {
let res = res.unwrap();
count += 1;

println!(
"scanned up to block: {}, found {} blocks",
res.next_block,
res.data.blocks.len()
);

if res.next_block > 10129290 {
drained = receiver.drain_and_stop().await;
println!("Drained {} responses", drained.len());
break;
}
}

count += drained.len();

for data in drained {
println!("data: {:?}", data.unwrap().next_block);
}

println!("response count: {}", count);
}
3 changes: 2 additions & 1 deletion hypersync-client/src/lib.rs
@@ -30,6 +30,7 @@ pub use hypersync_schema as schema;

use parse_response::parse_query_response;
use simple_types::Event;
use stream::ArrowStream;
use tokio::sync::mpsc;
use types::{EventResponse, ResponseData};
use url::Url;
@@ -525,7 +526,7 @@ impl Client {
self: Arc<Self>,
query: Query,
config: StreamConfig,
) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
) -> Result<ArrowStream> {
stream::stream_arrow(self, query, config).await
}

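For downstream callers this change is nearly transparent, since `ArrowStream::recv` has the same shape as `mpsc::Receiver::recv`. A minimal caller sketch against the new return type (hypothetical consumer code, not part of this diff; it assumes the crate's `Result` is anyhow-based, as the `.context` calls in stream.rs suggest, and uses a deliberately trivial query):

use std::sync::Arc;

use hypersync_client::{Client, ClientConfig, StreamConfig};

async fn consume() -> anyhow::Result<()> {
    let client = Arc::new(Client::new(ClientConfig::default())?);

    // Illustrative query; a real one would select fields as in the example above.
    let query = serde_json::from_value(serde_json::json!({ "from_block": 0 }))?;

    // stream_arrow used to return mpsc::Receiver<Result<ArrowResponse>>; the
    // receive loop is unchanged because ArrowStream::recv mirrors recv().
    let mut stream = client.stream_arrow(query, StreamConfig::default()).await?;
    while let Some(res) = stream.recv().await {
        let res = res?;
        println!("scanned up to block {}", res.next_block);
    }

    Ok(())
}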
54 changes: 50 additions & 4 deletions hypersync-client/src/stream.rs
@@ -14,8 +14,9 @@ use polars_arrow::{
datatypes::ArrowDataType,
record_batch::RecordBatch,
};
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;

use crate::{
config::HexOutput,
@@ -25,11 +26,41 @@ use crate::{
ArrowBatch, ArrowResponseData, StreamConfig,
};

pub struct ArrowStream {
// Used to cancel the stream
cancel_token: CancellationToken,
// Join handle for waiting for the stream to finish
handle: JoinHandle<()>,
// Receiver for the stream
rx: mpsc::Receiver<Result<ArrowResponse>>,
}

impl ArrowStream {
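/// Receives the next response from the stream.
///
/// Mirrors `mpsc::Receiver::recv`: returns `None` once the stream has
/// finished and all buffered responses have been taken.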
pub async fn recv(&mut self) -> Option<Result<ArrowResponse>> {
self.rx.recv().await
}

/// Signals all tasks to stop via cancellation, then waits for them
/// to finish and drains any leftover items.
pub async fn drain_and_stop(self) -> Vec<Result<ArrowResponse>> {
self.cancel_token.cancel();

let mut drained = Vec::new();
let mut rx = self.rx;

while let Some(item) = rx.recv().await {
drained.push(item);
}

// The channel closes only after every sender is dropped, so the loop above
// already waited for the queue to empty; awaiting the join handle as well
// ensures the producer task has fully exited before we return.
let _ = self.handle.await;

drained
}
}
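
The shutdown pattern here, cancel first, then drain the channel until it closes, is what keeps the stop graceful: nothing already queued is lost, and since the producer drops its sender only when its task exits, the drain loop doubles as a wait. A self-contained sketch of the same pattern in isolation (illustrative names only, not part of this PR; assumes `tokio` and `tokio-util` as dependencies):

use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u64>(4);
    let token = CancellationToken::new();
    let producer_token = token.clone();

    // Producer: checks for cancellation between sends, like the stream task.
    let handle = tokio::spawn(async move {
        for i in 0u64.. {
            if producer_token.is_cancelled() {
                break;
            }
            if tx.send(i).await.is_err() {
                break;
            }
        }
        // `tx` is dropped here, which closes the channel.
    });

    let first = rx.recv().await.unwrap();
    println!("got {first}, cancelling");

    // Cancel, then drain: recv() keeps yielding buffered items until the
    // sender is dropped, so this loop implicitly waits for the producer.
    token.cancel();
    let mut drained = Vec::new();
    while let Some(item) = rx.recv().await {
        drained.push(item);
    }
    handle.await.unwrap();
    println!("drained {} buffered items", drained.len());
}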

pub async fn stream_arrow(
client: Arc<crate::Client>,
query: Query,
config: StreamConfig,
) -> Result<mpsc::Receiver<Result<ArrowResponse>>> {
) -> Result<ArrowStream> {
let concurrency = config.concurrency.unwrap_or(10);
let batch_size = config.batch_size.unwrap_or(1000);
let max_batch_size = config.max_batch_size.unwrap_or(200_000);
Expand All @@ -42,12 +73,19 @@ pub async fn stream_arrow(

let (tx, rx) = mpsc::channel(concurrency * 2);

let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();

let to_block = match query.to_block {
Some(to_block) => to_block,
None => client.get_height().await.context("get height")?,
};

tokio::spawn(async move {
let handle = tokio::spawn(async move {
if cancel_token.is_cancelled() {
return;
}

let mut query = query;

if !reverse {
Expand Down Expand Up @@ -97,6 +135,10 @@ pub async fn stream_arrow(
let mut next_req_idx = 0;

while futs.peek().is_some() {
if cancel_token.is_cancelled() {
break;
}

while let Some(res) = set.try_join_next() {
let (generation, req_idx, resps) = res.unwrap();
queue.insert(req_idx, (generation, resps));
Expand Down Expand Up @@ -208,7 +250,11 @@ pub async fn stream_arrow(
}
});

Ok(rx)
Ok(ArrowStream {
cancel_token: cancel_token_clone,
handle,
rx,
})
}

fn count_rows(batches: &[ArrowBatch]) -> usize {