diff --git a/crates/indexer/fetcher/src/json_rpc.rs b/crates/indexer/fetcher/src/json_rpc.rs index 576c4cfa..69aa9e57 100644 --- a/crates/indexer/fetcher/src/json_rpc.rs +++ b/crates/indexer/fetcher/src/json_rpc.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Debug; use std::time::Duration; @@ -93,7 +93,6 @@ impl Fetcher

{ let mut events = vec![]; let mut cursors = cursors.clone(); let mut blocks = BTreeMap::new(); - let mut block_numbers = BTreeSet::new(); let mut cursor_transactions = HashMap::new(); // Step 1: Create initial batch requests for events from all contracts @@ -141,75 +140,23 @@ impl Fetcher

{ counter!("torii_fetcher_events_fetched_total").increment(fetched_events.len() as u64); events.extend(fetched_events); - // Step 3: Collect unique block numbers from events and cursors - for event in &events { - block_numbers.insert(event.block_number.unwrap()); - } - for (_, cursor) in cursors.iter() { - if let Some(head) = cursor.head { - block_numbers.insert(head); - } - } - - // Step 4: Fetch block data (timestamps and transaction hashes) - let mut block_requests = Vec::new(); - counter!("torii_fetcher_blocks_to_fetch_total").increment(block_numbers.len() as u64); - for block_number in &block_numbers { - block_requests.push(ProviderRequestData::GetBlockWithTxHashes( - GetBlockWithTxHashesRequest { - block_id: BlockId::Number(*block_number), - }, - )); - } - - // Step 5: Execute block requests in batch and initialize blocks with transaction order - if !block_requests.is_empty() { - let block_results = self.chunked_batch_requests(&block_requests).await?; - for (block_number, result) in block_numbers.iter().zip(block_results) { - match result { - ProviderResponseData::GetBlockWithTxHashes(block) => { - let (timestamp, tx_hashes, block_hash) = match block { - MaybePreConfirmedBlockWithTxHashes::Block(block) => { - (block.timestamp, block.transactions, Some(block.block_hash)) - } - _ => unreachable!(), - }; - // Initialize block with transactions in the order provided by the block - let transactions = IndexMap::from_iter(tx_hashes.iter().map(|tx_hash| { - ( - *tx_hash, - FetchTransaction { - transaction: None, - events: vec![], - }, - ) - })); - - blocks.insert( - *block_number, - FetchRangeBlock { - block_hash, - timestamp, - transactions, - }, - ); - } - _ => unreachable!(), - } - } - } - - // Step 6: Assign events to their respective blocks and transactions + // Step 3: Assign events to their respective blocks and transactions for event in events { let block_number = event.block_number.unwrap(); - let block = blocks.get_mut(&block_number).expect("Block not found"); - - // Push the event to the transaction - block + // Create transaction entry if it doesn't exist (no ordering) + blocks + .entry(block_number) + .or_insert_with(|| FetchRangeBlock { + timestamp: 0, + transactions: IndexMap::new(), + }) .transactions - .get_mut(&event.transaction_hash) - .expect("Transaction should exist.") + .entry(event.transaction_hash) + .or_insert_with(|| FetchTransaction { + transaction: None, + events: vec![], + }) .events .push(Event { from_address: event.from_address, @@ -224,12 +171,7 @@ impl Fetcher

{ .insert(event.transaction_hash); } - // Step 7: Filter out transactions that don't have any events (not relevant to indexed contracts) - for (_, block) in blocks.iter_mut() { - block.transactions.retain(|_, tx| !tx.events.is_empty()); - } - - // Step 7: Fetch transaction details if enabled + // Step 4: Fetch transaction details if enabled if self.config.flags.contains(FetchingFlags::TRANSACTIONS) && !blocks.is_empty() { let mut transaction_requests = Vec::new(); let mut block_numbers_for_tx = Vec::new(); @@ -266,11 +208,49 @@ impl Fetcher

{ } } - // Step 8: Update cursor timestamps - for (_, cursor) in cursors.iter_mut() { + // Step 5: Update cursor timestamps - fetch only the head block timestamp + let mut head_blocks_to_fetch = HashSet::new(); + for (_, cursor) in cursors.iter() { if let Some(head) = cursor.head { - if let Some(block) = blocks.get(&head) { - cursor.last_block_timestamp = Some(block.timestamp); + head_blocks_to_fetch.insert(head); + } + } + + if !head_blocks_to_fetch.is_empty() { + let mut block_requests = Vec::new(); + for block_number in &head_blocks_to_fetch { + block_requests.push(ProviderRequestData::GetBlockWithTxHashes( + GetBlockWithTxHashesRequest { + block_id: BlockId::Number(*block_number), + }, + )); + } + + let block_results = self.chunked_batch_requests(&block_requests).await?; + for (block_number, result) in head_blocks_to_fetch.iter().zip(block_results) { + match result { + ProviderResponseData::GetBlockWithTxHashes(block) => { + let timestamp = match block { + MaybePreConfirmedBlockWithTxHashes::Block(block) => block.timestamp, + _ => unreachable!(), + }; + blocks + .entry(*block_number) + .or_insert_with(|| FetchRangeBlock { + timestamp, + transactions: IndexMap::new(), + }) + .timestamp = timestamp; + } + _ => unreachable!(), + } + } + + // Update cursor timestamps + for (_, cursor) in cursors.iter_mut() { + if let Some(head) = &cursor.head { + let timestamp = blocks.get(head).expect("Block should exist.").timestamp; + cursor.last_block_timestamp = Some(timestamp); } } } diff --git a/crates/indexer/fetcher/src/lib.rs b/crates/indexer/fetcher/src/lib.rs index fa48c0a2..6514ff70 100644 --- a/crates/indexer/fetcher/src/lib.rs +++ b/crates/indexer/fetcher/src/lib.rs @@ -45,10 +45,6 @@ impl Default for FetcherConfig { #[derive(Debug, Clone)] pub struct FetchRangeBlock { - // For pending blocks, this is None. - // We check the parent hash of the pending block to the latest block - // to see if we need to re fetch the pending block. - pub block_hash: Option, pub timestamp: u64, pub transactions: IndexMap, } diff --git a/crates/indexer/fetcher/src/test.rs b/crates/indexer/fetcher/src/test.rs index 3f342fdb..86095595 100644 --- a/crates/indexer/fetcher/src/test.rs +++ b/crates/indexer/fetcher/src/test.rs @@ -491,8 +491,7 @@ async fn test_range_one_block() { // Expecting the block right after the cursor head + the chunk size. assert_eq!(result.range.blocks.len(), 2); - assert_eq!(torii_block.block_hash, Some(expected.block_hash)); - assert_eq!(torii_block.timestamp, expected.timestamp); + assert_eq!(torii_block.transactions.len(), expected.transactions.len()); // Verify all transactions are present and match for (torii_tx_hash, _torii_tx) in torii_block.transactions.iter() {