feat(decoded_traces): implement auto decoding traces #58

Open · wants to merge 1 commit into main

3 changes: 3 additions & 0 deletions hypersync-client/src/column_mapping.rs
@@ -37,6 +37,9 @@ pub struct ColumnMapping {
/// Mapping for decoded log data.
#[serde(default)]
pub decoded_log: BTreeMap<String, DataType>,
/// Mapping for decoded trace data.
#[serde(default)]
pub decoded_trace: BTreeMap<String, DataType>,
}

#[allow(missing_docs)]
2 changes: 2 additions & 0 deletions hypersync-client/src/config.rs
@@ -31,6 +31,8 @@ pub struct StreamConfig {
pub column_mapping: Option<ColumnMapping>,
/// Event signature used to populate decode logs. Decode logs would be empty if set to None.
pub event_signature: Option<String>,
/// Trace signature used to populate decoded traces. Decoded traces will be empty if this is None.
pub trace_signature: Option<String>,
/// Determines formatting of binary columns numbers into utf8 hex.
#[serde(default)]
pub hex_output: HexOutput,
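For context, a minimal usage sketch of the new config surface. The import paths, the `DataType::Float64` variant, the availability of `Default` on `StreamConfig` and `ColumnMapping`, and the signature string are all assumptions for illustration; the exact accepted signature syntax is whatever `alloy_json_abi::Function::parse` supports.

```rust
use std::collections::BTreeMap;

// Import paths are assumptions; the types live in hypersync-client's
// config and column_mapping modules.
use hypersync_client::{ColumnMapping, DataType, StreamConfig};

fn main() {
    // Hypothetical configuration: decode call traces against a function
    // signature and cast the decoded "amount" input column to Float64.
    let cfg = StreamConfig {
        // Illustrative signature string; exact syntax depends on
        // alloy_json_abi::Function::parse.
        trace_signature: Some("transfer(address dst, uint256 amount)(bool success)".to_owned()),
        column_mapping: Some(ColumnMapping {
            decoded_trace: BTreeMap::from([("amount".to_owned(), DataType::Float64)]),
            // Assumes ColumnMapping derives Default.
            ..Default::default()
        }),
        // Assumes StreamConfig derives Default.
        ..Default::default()
    };
    let _ = cfg;
}
```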
1 change: 1 addition & 0 deletions hypersync-client/src/parse_response.rs
@@ -90,6 +90,7 @@ pub fn parse_query_response(bytes: &[u8]) -> Result<ArrowResponse> {
logs,
traces,
decoded_logs: Vec::new(),
decoded_traces: Vec::new(),
},
rollback_guard,
})
23 changes: 23 additions & 0 deletions hypersync-client/src/stream.rs
@@ -17,6 +17,7 @@ use polars_arrow::{
use tokio::sync::mpsc;
use tokio::task::JoinSet;

use crate::util::{decode_traces_batch, filter_reverted_rows};
use crate::{
config::HexOutput,
rayon_async,
@@ -263,6 +264,28 @@ async fn map_responses(
.collect::<Result<Vec<_>>>()?,
None => Vec::new(),
},
decoded_traces: match cfg.trace_signature.as_ref() {
Some(sig) => resp
.data
.traces
.iter()
.map(|batch| {
// filter out "Reverted" rows
let batch = filter_reverted_rows(batch).context("filter reverted traces")?;
// decode the traces
let batch = decode_traces_batch(sig, &batch)
.context("decode traces")?;
map_batch(
cfg.column_mapping.as_ref().map(|cm| &cm.decoded_trace),
cfg.hex_output,
batch,
reverse,
)
.context("map batch")
})
.collect::<Result<Vec<_>>>()?,
None => Vec::new(),
},
blocks: resp
.data
.blocks
4 changes: 4 additions & 0 deletions hypersync-client/src/types.rs
@@ -23,6 +23,10 @@ pub struct ArrowResponseData {
///
/// Populated only if event_singature is present.
pub decoded_logs: Vec<ArrowBatch>,
/// Query decoded_traces response.
///
/// Populated only if trace_signature is present.
pub decoded_traces: Vec<ArrowBatch>,
}

/// Query response data in Rust native format
236 changes: 233 additions & 3 deletions hypersync-client/src/util.rs
@@ -1,7 +1,8 @@
use std::sync::Arc;

use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
use alloy_json_abi::EventParam;
use alloy_dyn_abi::{DynSolType, DynSolValue, Specifier};
use alloy_json_abi::{EventParam, Param};
use polars_arrow::array::UInt64Array;
use polars_arrow::datatypes::ArrowDataType;
use anyhow::{anyhow, Context, Result};
use hypersync_schema::empty_chunk;
use polars_arrow::{
@@ -186,6 +187,115 @@ pub fn decode_logs_batch(sig: &str, batch: &ArrowBatch) -> Result<ArrowBatch> {
})
}

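/// Decode the `input` and `output` columns of a traces batch against the given function
/// signature, producing one column per named input and output parameter. Rows whose
/// calldata or return data fail to decode are written as nulls.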
pub fn decode_traces_batch(sig: &str, batch: &ArrowBatch) -> Result<ArrowBatch> {
let sig = alloy_json_abi::Function::parse(sig).context("parse function signature")?;

let schema = schema_from_function_signature(&sig)
.context("build arrow schema from function signature")?;
if batch.chunk.is_empty() {
return Ok(ArrowBatch {
chunk: Arc::new(empty_chunk(&schema)),
schema: Arc::new(schema),
});
}

let function = sig
.resolve()
.context("resolve signature into function call")?;

let input_cols = {
let input_data = batch
.column::<BinaryArray<i32>>("input")
.context("get input column")?;

// Decode the input parameters
let decoded_inputs = input_data
.values_iter()
.map(|input_bytes| {
if input_bytes.len() < 4 {
return Ok(None::<Vec<DynSolValue>>);
}
// Skip function selector (first 4 bytes)
let input_data = &input_bytes[4..];

match function.abi_decode_input(input_data, true) {
Ok(decoded) => Ok(Some(decoded)),
Err(e) => {
log::trace!(
"failed to decode trace input, will write null instead. Error was: {:?}",
e
);
Ok(None)
}
}
})
.collect::<Result<Vec<_>>>()?;

// Create a column for each input parameter
sig.inputs
.iter()
.enumerate()
.map(|(i, param)| {
decode_body_col(
decoded_inputs
.iter()
.map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
&DynSolType::parse(&param.ty).context("parse parameter type")?,
)
.context("decode input parameter")
})
.collect::<Result<Vec<_>>>()?
};

// Decode outputs
let output_cols = {
let output_data = batch
.column::<BinaryArray<i32>>("output")
.context("get output column")?;

let decoded_outputs = output_data
.values_iter()
.map(|output_bytes| {
match function.abi_decode_output(output_bytes, true) {
Ok(decoded) => Ok(Some(decoded)),
Err(e) => {
log::trace!(
"failed to decode trace output, will write null instead. Error was: {:?}",
e
);
Ok(None)
}
}
})
.collect::<Result<Vec<_>>>()?;

sig.outputs
.iter()
.enumerate()
.map(|(i, param)| {
decode_body_col(
decoded_outputs
.iter()
.map(|t| t.as_ref().map(|t| t.get(i).unwrap())),
&DynSolType::parse(&param.ty).context("parse parameter type")?,
)
.context("decode output parameter")
})
.collect::<Result<Vec<_>>>()?
};

// Combine input and output columns
let mut cols = input_cols;
cols.extend(output_cols);

let chunk = Arc::new(ArrowChunk::try_new(cols).context("create arrow chunk")?);

Ok(ArrowBatch {
chunk,
schema: Arc::new(schema),
})
}

fn decode_body_col<'a, I: ExactSizeIterator<Item = Option<&'a DynSolValue>>>(
vals: I,
ty: &DynSolType,
@@ -383,6 +493,28 @@ fn schema_from_event_signature(sig: &alloy_json_abi::Event) -> Result<Schema> {
Ok(Schema::from(fields))
}

fn schema_from_function_signature(sig: &alloy_json_abi::Function) -> Result<Schema> {
let dyn_sol_call = sig.resolve().context("resolve signature into function")?;

let mut fields: Vec<Field> = Vec::new();

for (input, resolved_type) in sig.inputs.iter().zip(dyn_sol_call.types()) {
fields.push(
signature_to_function_field(&fields, input, resolved_type).context("process input")?,
);
}

// Add output fields
for (output, resolved_type) in sig.outputs.iter().zip(dyn_sol_call.returns().types()) {
fields.push(
signature_to_function_field(&fields, output, resolved_type)
.context("process output")?,
);
}

Ok(Schema::from(fields))
}

fn signature_input_to_field(
fields: &[Field],
input: &EventParam,
@@ -412,6 +544,35 @@ fn signature_input_to_field(
Ok(Field::new(input.name.clone(), dt, true))
}

fn signature_to_function_field(
fields: &[Field],
input: &Param,
resolved_type: &DynSolType,
) -> Result<Field> {
if input.name.is_empty() {
return Err(anyhow!("empty param names are not supported"));
}

if fields
.iter()
.any(|f| f.name.as_str() == input.name.as_str())
{
return Err(anyhow!("duplicate param name: {}", input.name));
}

let ty = DynSolType::parse(&input.ty).context("parse solidity type")?;

if &ty != resolved_type {
return Err(anyhow!(
"Internal error: Parsed type doesn't match resolved type. This should never happen."
));
}

let dt = simple_type_to_data_type(&ty).context("convert simple type to arrow datatype")?;

Ok(Field::new(input.name.clone(), dt, true))
}

fn simple_type_to_data_type(ty: &DynSolType) -> Result<DataType> {
match ty {
DynSolType::Bool => Ok(DataType::Boolean),
@@ -475,6 +636,75 @@ pub fn map_batch_to_binary_view(batch: ArrowBatch) -> ArrowBatch {
}
}

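/// Drop every row whose `error` column equals "Reverted", keeping rows where the error
/// is null or has any other value. Only Binary, UInt64, and Utf8 columns are supported.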
pub fn filter_reverted_rows(batch: &ArrowBatch) -> Result<ArrowBatch> {
let error = batch.column::<Utf8Array<i32>>("error").context("failed to get error column")?;
// Create a mask based on the "error" column where "Reverted" values are excluded
let mask: Vec<bool> = (0..error.len())
.map(|idx| {
match error.get(idx) {
None => true, // keep rows where the error value is null
Some(value) => value != "Reverted", // drop "Reverted" rows; keep any other error value
}
})
.collect();

let filtered_columns: Result<Vec<Box<dyn Array>>> = batch
.schema
.fields
.iter()
.zip(batch.chunk.columns().as_ref())
.map(|(field, col)| {
match field.data_type() {
ArrowDataType::Binary => {
let typed_column = col
.as_any()
.downcast_ref::<BinaryArray<i32>>()
.context("failed to downcast to BinaryArray")?;
let filtered = typed_column
.iter()
.zip(mask.iter())
.filter(|(_, &keep)| keep)
.map(|(value, _)| value)
.collect::<BinaryArray<i32>>();
Ok(Box::new(filtered) as Box<dyn Array>)
},
ArrowDataType::UInt64 => {
let typed_column = col
.as_any()
.downcast_ref::<UInt64Array>()
.context("failed to downcast to UInt64Array")?;
let filtered = typed_column
.iter()
.zip(mask.iter())
.filter(|(_, &keep)| keep)
.map(|(value, _)| value.copied())
.collect::<UInt64Array>();
Ok(Box::new(filtered) as Box<dyn Array>)
},
ArrowDataType::Utf8 => {
let typed_column = col
.as_any()
.downcast_ref::<Utf8Array<i32>>()
.context("failed to downcast to Utf8Array")?;
let filtered = typed_column
.iter()
.zip(mask.iter())
.filter(|(_, &keep)| keep)
.map(|(value, _)| value)
.collect::<Utf8Array<i32>>();
Ok(Box::new(filtered) as Box<dyn Array>)
},
dt => Err(anyhow!("unsupported datatype: {:?}", dt)),
}
})
.collect();

// Build a new batch from the filtered columns, keeping the original schema
let filtered_columns = filtered_columns.context("filter reverted columns")?;

let chunk = Arc::new(ArrowChunk::try_new(filtered_columns).context("create filtered chunk")?);
Ok(ArrowBatch { chunk, schema: batch.schema.clone() })
}

#[cfg(test)]
mod tests {
use alloy_json_abi::Event;