Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 57 additions & 77 deletions crates/indexer/fetcher/src/json_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug;
use std::time::Duration;

Expand Down Expand Up @@ -93,7 +93,6 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Fetcher<P> {
let mut events = vec![];
let mut cursors = cursors.clone();
let mut blocks = BTreeMap::new();
let mut block_numbers = BTreeSet::new();
let mut cursor_transactions = HashMap::new();

// Step 1: Create initial batch requests for events from all contracts
Expand Down Expand Up @@ -141,75 +140,23 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Fetcher<P> {
counter!("torii_fetcher_events_fetched_total").increment(fetched_events.len() as u64);
events.extend(fetched_events);

// Step 3: Collect unique block numbers from events and cursors
for event in &events {
block_numbers.insert(event.block_number.unwrap());
}
for (_, cursor) in cursors.iter() {
if let Some(head) = cursor.head {
block_numbers.insert(head);
}
}

// Step 4: Fetch block data (timestamps and transaction hashes)
let mut block_requests = Vec::new();
counter!("torii_fetcher_blocks_to_fetch_total").increment(block_numbers.len() as u64);
for block_number in &block_numbers {
block_requests.push(ProviderRequestData::GetBlockWithTxHashes(
GetBlockWithTxHashesRequest {
block_id: BlockId::Number(*block_number),
},
));
}

// Step 5: Execute block requests in batch and initialize blocks with transaction order
if !block_requests.is_empty() {
let block_results = self.chunked_batch_requests(&block_requests).await?;
for (block_number, result) in block_numbers.iter().zip(block_results) {
match result {
ProviderResponseData::GetBlockWithTxHashes(block) => {
let (timestamp, tx_hashes, block_hash) = match block {
MaybePreConfirmedBlockWithTxHashes::Block(block) => {
(block.timestamp, block.transactions, Some(block.block_hash))
}
_ => unreachable!(),
};
// Initialize block with transactions in the order provided by the block
let transactions = IndexMap::from_iter(tx_hashes.iter().map(|tx_hash| {
(
*tx_hash,
FetchTransaction {
transaction: None,
events: vec![],
},
)
}));

blocks.insert(
*block_number,
FetchRangeBlock {
block_hash,
timestamp,
transactions,
},
);
}
_ => unreachable!(),
}
}
}

// Step 6: Assign events to their respective blocks and transactions
// Step 3: Assign events to their respective blocks and transactions
for event in events {
let block_number = event.block_number.unwrap();

let block = blocks.get_mut(&block_number).expect("Block not found");

// Push the event to the transaction
block
// Create transaction entry if it doesn't exist (no ordering)
blocks
.entry(block_number)
.or_insert_with(|| FetchRangeBlock {
timestamp: 0,
transactions: IndexMap::new(),
})
.transactions
.get_mut(&event.transaction_hash)
.expect("Transaction should exist.")
.entry(event.transaction_hash)
.or_insert_with(|| FetchTransaction {
transaction: None,
events: vec![],
})
.events
.push(Event {
from_address: event.from_address,
Expand All @@ -224,12 +171,7 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Fetcher<P> {
.insert(event.transaction_hash);
}

// Step 7: Filter out transactions that don't have any events (not relevant to indexed contracts)
for (_, block) in blocks.iter_mut() {
block.transactions.retain(|_, tx| !tx.events.is_empty());
}

// Step 7: Fetch transaction details if enabled
// Step 4: Fetch transaction details if enabled
if self.config.flags.contains(FetchingFlags::TRANSACTIONS) && !blocks.is_empty() {
let mut transaction_requests = Vec::new();
let mut block_numbers_for_tx = Vec::new();
Expand Down Expand Up @@ -266,11 +208,49 @@ impl<P: Provider + Send + Sync + Clone + std::fmt::Debug + 'static> Fetcher<P> {
}
}

// Step 8: Update cursor timestamps
for (_, cursor) in cursors.iter_mut() {
// Step 5: Update cursor timestamps - fetch only the head block timestamp
let mut head_blocks_to_fetch = HashSet::new();
for (_, cursor) in cursors.iter() {
if let Some(head) = cursor.head {
if let Some(block) = blocks.get(&head) {
cursor.last_block_timestamp = Some(block.timestamp);
head_blocks_to_fetch.insert(head);
}
}

if !head_blocks_to_fetch.is_empty() {
let mut block_requests = Vec::new();
for block_number in &head_blocks_to_fetch {
block_requests.push(ProviderRequestData::GetBlockWithTxHashes(
GetBlockWithTxHashesRequest {
block_id: BlockId::Number(*block_number),
},
));
}

let block_results = self.chunked_batch_requests(&block_requests).await?;
for (block_number, result) in head_blocks_to_fetch.iter().zip(block_results) {
match result {
ProviderResponseData::GetBlockWithTxHashes(block) => {
let timestamp = match block {
MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp,
_ => unreachable!(),
};
blocks
.entry(*block_number)
.or_insert_with(|| FetchRangeBlock {
timestamp,
transactions: IndexMap::new(),
})
.timestamp = timestamp;
}
_ => unreachable!(),
}
}

// Update cursor timestamps
for (_, cursor) in cursors.iter_mut() {
if let Some(head) = &cursor.head {
let timestamp = blocks.get(head).expect("Block should exist.").timestamp;
cursor.last_block_timestamp = Some(timestamp);
}
}
}
Expand Down
4 changes: 0 additions & 4 deletions crates/indexer/fetcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ impl Default for FetcherConfig {

#[derive(Debug, Clone)]
pub struct FetchRangeBlock {
// For pending blocks, this is None.
// We check the parent hash of the pending block to the latest block
// to see if we need to re fetch the pending block.
pub block_hash: Option<Felt>,
pub timestamp: u64,
pub transactions: IndexMap<Felt, FetchTransaction>,
}
Expand Down
3 changes: 1 addition & 2 deletions crates/indexer/fetcher/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,7 @@ async fn test_range_one_block() {

// Expecting the block right after the cursor head + the chunk size.
assert_eq!(result.range.blocks.len(), 2);
assert_eq!(torii_block.block_hash, Some(expected.block_hash));
assert_eq!(torii_block.timestamp, expected.timestamp);
assert_eq!(torii_block.transactions.len(), expected.transactions.len());

// Verify all transactions are present and match
for (torii_tx_hash, _torii_tx) in torii_block.transactions.iter() {
Expand Down
Loading