diff --git a/hypersync-client/src/column_mapping.rs b/hypersync-client/src/column_mapping.rs
index ad19491..bd717b4 100644
--- a/hypersync-client/src/column_mapping.rs
+++ b/hypersync-client/src/column_mapping.rs
@@ -37,6 +37,9 @@ pub struct ColumnMapping {
     /// Mapping for decoded log data.
     #[serde(default)]
     pub decoded_log: BTreeMap<String, DataType>,
+    /// Mapping for decoded trace data.
+    #[serde(default)]
+    pub decoded_trace: BTreeMap<String, DataType>,
 }
 
 #[allow(missing_docs)]
diff --git a/hypersync-client/src/config.rs b/hypersync-client/src/config.rs
index 8c7f881..2ee9f6d 100644
--- a/hypersync-client/src/config.rs
+++ b/hypersync-client/src/config.rs
@@ -31,6 +31,8 @@ pub struct StreamConfig {
     pub column_mapping: Option<ColumnMapping>,
     /// Event signature used to populate decode logs. Decode logs would be empty if set to None.
    pub event_signature: Option<String>,
+    /// Trace signature used to populate decoded traces. Decoded traces would be empty if set to None.
+    pub trace_signature: Option<String>,
     /// Determines formatting of binary columns numbers into utf8 hex.
     #[serde(default)]
     pub hex_output: HexOutput,
diff --git a/hypersync-client/src/parse_response.rs b/hypersync-client/src/parse_response.rs
index b50fada..eb2edd9 100644
--- a/hypersync-client/src/parse_response.rs
+++ b/hypersync-client/src/parse_response.rs
@@ -90,6 +90,7 @@ pub fn parse_query_response(bytes: &[u8]) -> Result {
             logs,
             traces,
             decoded_logs: Vec::new(),
+            decoded_traces: Vec::new(),
         },
         rollback_guard,
     })
diff --git a/hypersync-client/src/stream.rs b/hypersync-client/src/stream.rs
index 958e1d3..945afb2 100644
--- a/hypersync-client/src/stream.rs
+++ b/hypersync-client/src/stream.rs
@@ -17,6 +17,7 @@ use polars_arrow::{
 use tokio::sync::mpsc;
 use tokio::task::JoinSet;
 
+use crate::util::{decode_traces_batch, filter_reverted_rows};
 use crate::{
     config::HexOutput,
     rayon_async,
@@ -263,6 +264,28 @@ async fn map_responses(
                     .collect::<Result<Vec<_>>>()?,
                 None => Vec::new(),
             },
+            decoded_traces: match cfg.trace_signature.as_ref() {
+                Some(sig) => resp
+                    .data
+                    .traces
+                    .iter()
+                    .map(|batch| {
+                        // filter out "Reverted" rows
+                        let batch = filter_reverted_rows(batch).context("filter reverted traces")?;
+                        // decode the traces
+                        let batch = decode_traces_batch(sig, &batch)
+                            .context("decode traces")?;
+                        map_batch(
+                            cfg.column_mapping.as_ref().map(|cm| &cm.decoded_trace),
+                            cfg.hex_output,
+                            batch,
+                            reverse,
+                        )
+                        .context("map batch")
+                    })
+                    .collect::<Result<Vec<_>>>()?,
+                None => Vec::new(),
+            },
             blocks: resp
                 .data
                 .blocks
diff --git a/hypersync-client/src/types.rs b/hypersync-client/src/types.rs
index 0399c5c..bb3762f 100644
--- a/hypersync-client/src/types.rs
+++ b/hypersync-client/src/types.rs
@@ -23,6 +23,10 @@ pub struct ArrowResponseData {
     ///
     /// Populated only if event_singature is present.
     pub decoded_logs: Vec<ArrowBatch>,
+    /// Query decoded_traces response.
+    ///
+    /// Populated only if trace_signature is present.
+    pub decoded_traces: Vec<ArrowBatch>,
 }
 
 /// Query response data in Rust native format
diff --git a/hypersync-client/src/util.rs b/hypersync-client/src/util.rs
index c147fd5..8a1acdd 100644
--- a/hypersync-client/src/util.rs
+++ b/hypersync-client/src/util.rs
@@ -1,7 +1,8 @@
 use std::sync::Arc;
-
-use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
-use alloy_json_abi::EventParam;
+use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
+use alloy_json_abi::{EventParam, Param};
+use polars_arrow::array::UInt64Array;
+use polars_arrow::datatypes::ArrowDataType;
 use anyhow::{anyhow, Context, Result};
 use hypersync_schema::empty_chunk;
 use polars_arrow::{
@@ -186,6 +187,115 @@ pub fn decode_logs_batch(sig: &str, batch: &ArrowBatch) -> Result<ArrowBatch> {
     })
 }
 
+pub fn decode_traces_batch(sig: &str, batch: &ArrowBatch) -> Result<ArrowBatch> {
+    let sig = alloy_json_abi::Function::parse(sig).context("parse function signature")?;
+
+    let schema = schema_from_function_signature(&sig)
+        .context("build arrow schema from function signature")?;
+    if batch.chunk.is_empty() {
+        return Ok(ArrowBatch {
+            chunk: Arc::new(empty_chunk(&schema)),
+            schema: Arc::new(schema),
+        });
+    }
+
+    let function = sig
+        .resolve()
+        .context("resolve signature into function call")?;
+
+    let input_cols = {
+        let input_data = batch
+            .column::<BinaryArray<i32>>("input")
+            .context("get input column")?;
+
+        // Decode input parameters
+        let decoded_inputs = input_data
+            .values_iter()
+            .map(|input_bytes| {
+                if input_bytes.len() < 4 {
+                    return Ok(None::<Vec<DynSolValue>>);
+                }
+                // Skip function selector (first 4 bytes)
+                let input_data = &input_bytes[4..];
+
+                match function.abi_decode_input(input_data, true) {
+                    Ok(decoded) => Ok(Some(decoded)),
+                    Err(e) => {
+                        log::trace!(
+                            "failed to decode trace input, will write null instead. Error was: {:?}",
+                            e
+                        );
+                        Ok(None)
+                    }
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Create a column for each input parameter
+        sig.inputs
+            .iter()
+            .enumerate()
+            .map(|(i, param)| {
+                decode_body_col(
+                    decoded_inputs
+                        .iter()
+                        .map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
+                    &DynSolType::parse(&param.ty).context("parse parameter type")?,
+                )
+                .context("decode input parameter")
+            })
+            .collect::<Result<Vec<_>>>()?
+    };
+
+    // Decode outputs
+    let output_cols = {
+        let output_data = batch
+            .column::<BinaryArray<i32>>("output")
+            .context("get output column")?;
+
+        let decoded_outputs = output_data
+            .values_iter()
+            .map(|output_bytes| {
+                match function.abi_decode_output(output_bytes, true) {
+                    Ok(decoded) => Ok(Some(decoded)),
+                    Err(e) => {
+                        log::trace!(
+                            "failed to decode trace output, will write null instead. Error was: {:?}",
+                            e
+                        );
+                        Ok(None)
+                    }
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        sig.outputs
+            .iter()
+            .enumerate()
+            .map(|(i, param)| {
+                decode_body_col(
+                    decoded_outputs
+                        .iter()
+                        .map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
+                    &DynSolType::parse(&param.ty).context("parse parameter type")?,
+                )
+                .context("decode output parameter")
+            })
+            .collect::<Result<Vec<_>>>()?
+    };
+
+    // Combine input and output columns
+    let mut cols = input_cols;
+    cols.extend(output_cols);
+
+    let chunk = Arc::new(ArrowChunk::try_new(cols).context("create arrow chunk")?);
+
+    Ok(ArrowBatch {
+        chunk,
+        schema: Arc::new(schema),
+    })
+}
+
 fn decode_body_col<'a, I: ExactSizeIterator<Item = Option<&'a DynSolValue>>>(
     vals: I,
     ty: &DynSolType,
@@ -383,6 +493,28 @@ fn schema_from_event_signature(sig: &alloy_json_abi::Event) -> Result<Schema> {
     Ok(Schema::from(fields))
 }
 
+fn schema_from_function_signature(sig: &alloy_json_abi::Function) -> Result<Schema> {
+    let dyn_sol_call = sig.resolve().context("resolve signature into function")?;
+
+    let mut fields: Vec<Field> = Vec::new();
+
+    for (input, resolved_type) in sig.inputs.iter().zip(dyn_sol_call.types()) {
+        fields.push(
+            signature_to_function_field(&fields, input, resolved_type).context("process input")?,
+        );
+    }
+
+    // Add output fields
+    for (output, resolved_type) in sig.outputs.iter().zip(dyn_sol_call.returns().types()) {
+        fields.push(
+            signature_to_function_field(&fields, output, resolved_type)
+                .context("process output")?,
+        );
+    }
+
+    Ok(Schema::from(fields))
+}
+
 fn signature_input_to_field(
     fields: &[Field],
     input: &EventParam,
@@ -412,6 +544,35 @@
     Ok(Field::new(input.name.clone(), dt, true))
 }
 
+fn signature_to_function_field(
+    fields: &[Field],
+    input: &Param,
+    resolved_type: &DynSolType,
+) -> Result<Field> {
+    if input.name.is_empty() {
+        return Err(anyhow!("empty param names are not supported"));
+    }
+
+    if fields
+        .iter()
+        .any(|f| f.name.as_str() == input.name.as_str())
+    {
+        return Err(anyhow!("duplicate param name: {}", input.name));
+    }
+
+    let ty = DynSolType::parse(&input.ty).context("parse solidity type")?;
+
+    if &ty != resolved_type {
+        return Err(anyhow!(
+            "Internal error: Parsed type doesn't match resolved type. This should never happen."
+        ));
+    }
+
+    let dt = simple_type_to_data_type(&ty).context("convert simple type to arrow datatype")?;
+
+    Ok(Field::new(input.name.clone(), dt, true))
+}
+
 fn simple_type_to_data_type(ty: &DynSolType) -> Result<DataType> {
     match ty {
         DynSolType::Bool => Ok(DataType::Boolean),
@@ -475,6 +636,75 @@ pub fn map_batch_to_binary_view(batch: ArrowBatch) -> ArrowBatch {
     }
 }
 
+pub fn filter_reverted_rows(batch: &ArrowBatch) -> Result<ArrowBatch> {
+    let error = batch
+        .column::<Utf8Array<i32>>("error")
+        .context("failed to get error column")?;
+    // Build a mask from the "error" column that excludes "Reverted" rows
+    let mask: Vec<bool> = (0..error.len())
+        .map(|idx| {
+            match error.get(idx) {
+                None => true, // no error value, keep the row
+                Some(value) => value != "Reverted", // drop "Reverted" rows, keep any other error value
+            }
+        })
+        .collect();
+
+    let filtered_columns: Result<Vec<Box<dyn Array>>> = batch
+        .schema
+        .fields
+        .iter()
+        .zip(batch.chunk.columns().as_ref())
+        .map(|(field, col)| {
+            match field.data_type() {
+                ArrowDataType::Binary => {
+                    let typed_column = col
+                        .as_any()
+                        .downcast_ref::<BinaryArray<i32>>()
+                        .context("failed to downcast to BinaryArray")?;
+                    let filtered = typed_column
+                        .iter()
+                        .zip(mask.iter())
+                        .filter(|(_, &keep)| keep)
+                        .map(|(value, _)| value)
+                        .collect::<BinaryArray<i32>>();
+                    Ok(Box::new(filtered) as Box<dyn Array>)
+                },
+                ArrowDataType::UInt64 => {
+                    let typed_column = col
+                        .as_any()
+                        .downcast_ref::<UInt64Array>()
+                        .context("failed to downcast to UInt64Array")?;
+                    let filtered = typed_column
+                        .iter()
+                        .zip(mask.iter())
+                        .filter(|(_, &keep)| keep)
+                        .map(|(value, _)| value.copied())
+                        .collect::<UInt64Array>();
+                    Ok(Box::new(filtered) as Box<dyn Array>)
+                },
+                ArrowDataType::Utf8 => {
+                    let typed_column = col
+                        .as_any()
+                        .downcast_ref::<Utf8Array<i32>>()
+                        .context("failed to downcast to Utf8Array")?;
+                    let filtered = typed_column
+                        .iter()
+                        .zip(mask.iter())
+                        .filter(|(_, &keep)| keep)
+                        .map(|(value, _)| value)
+                        .collect::<Utf8Array<i32>>();
+                    Ok(Box::new(filtered) as Box<dyn Array>)
+                },
+                dt => Err(anyhow!("unsupported datatype: {:?}", dt)),
+            }
+        })
+        .collect();
+
+    // Build a new ArrowBatch from the filtered columns
+    let filtered_columns = filtered_columns.context("filter reverted columns")?;
+
+    let chunk = Arc::new(ArrowChunk::try_new(filtered_columns).context("create filtered chunk")?);
+    Ok(ArrowBatch {
+        chunk,
+        schema: batch.schema.clone(),
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use alloy_json_abi::Event;
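Reviewer note, not part of the diff: a minimal sketch of how the fields introduced above could be wired together from client code. Only trace_signature, decoded_trace, and the resulting decoded_traces batches come from this change; everything else here is an assumption, namely that StreamConfig and ColumnMapping implement Default, that ColumnMapping and the column-mapping DataType are re-exported at the crate root, and that DataType has a Float64 variant. The signature string must use named inputs and outputs, since signature_to_function_field rejects empty parameter names.

use std::collections::BTreeMap;

use hypersync_client::{ColumnMapping, DataType, StreamConfig};

// Hypothetical helper: decode ERC-20 `transfer` call traces and map the decoded
// `amount` column to float64 in the output batches.
fn transfer_trace_config() -> StreamConfig {
    StreamConfig {
        // Named parameters are required; unnamed ones cause schema construction to fail.
        trace_signature: Some(
            "transfer(address recipient, uint256 amount) returns (bool success)".to_owned(),
        ),
        column_mapping: Some(ColumnMapping {
            decoded_trace: BTreeMap::from([("amount".to_owned(), DataType::Float64)]),
            ..Default::default() // assumes ColumnMapping: Default
        }),
        ..Default::default() // assumes StreamConfig: Default
    }
}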