Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ pub mod topics {
];

pub const STORE_SET_TOPICS: [B256; 1] = [Set::SIGNATURE_HASH];

pub const METABOARD_TOPICS: [B256; 1] = [MetaV1_2::SIGNATURE_HASH];
}
1 change: 1 addition & 0 deletions crates/cli/src/commands/local_db/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10000,
manifest_end_block: 1,
metaboard_address: None,
};
let runner_target = RunnerTarget {
orderbook_key: "test".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10000,
manifest_end_block: 1,
metaboard_address: None,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/cli/src/commands/local_db/pipeline/runner/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10000,
manifest_end_block: 1,
metaboard_address: None,
},
};

Expand Down Expand Up @@ -242,6 +243,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10000,
manifest_end_block: 1,
metaboard_address: None,
},
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10000,
manifest_end_block: 1,
metaboard_address: None,
},
}
}
Expand Down
15 changes: 13 additions & 2 deletions crates/cli/src/commands/local_db/pipeline/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ where
environment: RunnerEnvironment<B, W, E, T, A, S>,
) -> Result<Self, LocalDbError> {
let settings = parse_runner_settings(&settings_yaml)?;
let targets = build_runner_targets(&settings.orderbooks, &settings.syncs)?;
let targets = build_runner_targets(&settings.orderbooks, &settings.syncs, &settings.metaboards)?;

let mut target_lookup = HashMap::with_capacity(targets.len());
for target in &targets {
Expand Down Expand Up @@ -815,6 +815,17 @@ mod tests {
Ok(Vec::new())
}

async fn fetch_metaboard(
&self,
_metaboard_address: Address,
_from_block: u64,
_to_block: u64,
_cfg: &FetchConfig,
) -> Result<Vec<rain_orderbook_common::rpc_client::LogEntryResponse>, LocalDbError>
{
Ok(Vec::new())
}

async fn block_hash(&self, _block_number: u64) -> Result<B256, LocalDbError> {
Ok(B256::ZERO)
}
Expand Down Expand Up @@ -1281,7 +1292,7 @@ orderbooks:

fn build_targets(yaml: &str) -> Vec<RunnerTarget> {
let parsed = parse_settings(yaml);
build_runner_targets(&parsed.orderbooks, &parsed.syncs).expect("targets")
build_runner_targets(&parsed.orderbooks, &parsed.syncs, &parsed.metaboards).expect("targets")
}

#[test]
Expand Down
17 changes: 16 additions & 1 deletion crates/common/src/local_db/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use alloy::primitives::{Address, U256};
use alloy::rpc::types::Filter;
use futures::{StreamExt, TryStreamExt};
use rain_orderbook_bindings::topics::{ORDERBOOK_EVENT_TOPICS, STORE_SET_TOPICS};
use rain_orderbook_bindings::topics::{METABOARD_TOPICS, ORDERBOOK_EVENT_TOPICS, STORE_SET_TOPICS};
use std::collections::{HashMap, HashSet};
use thiserror::Error;

Expand Down Expand Up @@ -152,6 +152,21 @@ pub async fn fetch_store_events(
collect_logs(rpc_client, &filter, config).await
}

pub async fn fetch_metaboard_events(
rpc_client: &RpcClient,
address: Address,
from_block: u64,
to_block: u64,
config: &FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError> {
let filter = Filter::new()
.address(address)
.from_block(from_block)
.to_block(to_block)
.event_signature(METABOARD_TOPICS.to_vec());
collect_logs(rpc_client, &filter, config).await
}

async fn fetch_block_timestamps(
rpc_client: &RpcClient,
block_numbers: Vec<u64>,
Expand Down
13 changes: 12 additions & 1 deletion crates/common/src/local_db/pipeline/adapters/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_trait::async_trait;
use url::Url;

use crate::local_db::decode::{decode_events, DecodedEvent, DecodedEventData};
use crate::local_db::fetch::{fetch_orderbook_events, fetch_store_events};
use crate::local_db::fetch::{fetch_metaboard_events, fetch_orderbook_events, fetch_store_events};
use crate::local_db::pipeline::EventsPipeline;
use crate::local_db::{FetchConfig, LocalDbError};
use crate::rpc_client::{LogEntryResponse, RpcClient};
Expand Down Expand Up @@ -78,6 +78,17 @@ impl EventsPipeline for DefaultEventsPipeline {
fetch_store_events(&self.rpc_client, store_addresses, from_block, to_block, cfg).await
}

async fn fetch_metaboard(
&self,
metaboard_address: Address,
from_block: u64,
to_block: u64,
cfg: &FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError> {
fetch_metaboard_events(&self.rpc_client, metaboard_address, from_block, to_block, cfg)
.await
}

fn decode(
&self,
logs: &[LogEntryResponse],
Expand Down
78 changes: 76 additions & 2 deletions crates/common/src/local_db/pipeline/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct SyncInputs {
pub dump_str: Option<String>,
pub block_number_threshold: u32,
pub manifest_end_block: u64,
pub metaboard_address: Option<Address>,
}

impl<B, W, E, T, A, S> SyncEngine<B, W, E, T, A, S>
Expand Down Expand Up @@ -83,7 +84,14 @@ where
let mut all_raw_logs = orderbook_logs;
let mut decoded_events = decoded_orderbook_events;

let (store_logs, mut decoded_store_events, existing_tokens, token_addresses) = self
let (
store_logs,
mut decoded_store_events,
metaboard_logs,
mut decoded_metaboard_events,
existing_tokens,
token_addresses,
) = self
.load_store_logs_and_existing_tokens(
db,
input,
Expand All @@ -93,9 +101,16 @@ where
)
.await?;

let has_auxiliary = !store_logs.is_empty() || !metaboard_logs.is_empty();
if !store_logs.is_empty() {
all_raw_logs.extend(store_logs);
decoded_events.append(&mut decoded_store_events);
}
if !metaboard_logs.is_empty() {
all_raw_logs.extend(metaboard_logs);
decoded_events.append(&mut decoded_metaboard_events);
}
if has_auxiliary {
sort_decoded_events_by_block_and_log(&mut decoded_events);
}

Expand Down Expand Up @@ -219,6 +234,8 @@ where
decoded_events: &[DecodedEventData<DecodedEvent>],
) -> Result<
(
Vec<LogEntryResponse>,
Vec<DecodedEventData<DecodedEvent>>,
Vec<LogEntryResponse>,
Vec<DecodedEventData<DecodedEvent>>,
Vec<Erc20TokenRow>,
Expand Down Expand Up @@ -268,13 +285,39 @@ where
}
};

let (existing_tokens_res, store_pair_res) = tokio::join!(tokens_fut, store_fetch_fut);
let metaboard_fetch_fut = async {
if let Some(metaboard_address) = input.metaboard_address {
self.status.send(SyncPhase::FetchingMetaboardLogs).await?;
let logs = self
.events
.fetch_metaboard(
metaboard_address,
start_block,
target_block,
&input.cfg.fetch,
)
.await?;
self.status.send(SyncPhase::DecodingMetaboardLogs).await?;
let decoded = self.events.decode(&logs)?;
Ok::<(Vec<LogEntryResponse>, Vec<DecodedEventData<DecodedEvent>>), LocalDbError>((
logs, decoded,
))
} else {
Ok((Vec::new(), Vec::new()))
}
};

let (existing_tokens_res, store_pair_res, metaboard_pair_res) =
tokio::join!(tokens_fut, store_fetch_fut, metaboard_fetch_fut);
let existing_tokens = existing_tokens_res?;
let (store_logs, decoded_store_events) = store_pair_res?;
let (metaboard_logs, decoded_metaboard_events) = metaboard_pair_res?;

Ok((
store_logs,
decoded_store_events,
metaboard_logs,
decoded_metaboard_events,
existing_tokens,
token_addresses,
))
Expand Down Expand Up @@ -507,6 +550,7 @@ mod tests {
dump_str: None,
block_number_threshold: 10_000,
manifest_end_block: 1,
metaboard_address: None,
}
}

Expand Down Expand Up @@ -700,6 +744,8 @@ mod tests {
block_hashes: Mutex<VecDeque<Result<B256, LocalDbError>>>,
orderbook_calls: Mutex<Vec<(Address, u64, u64)>>,
store_calls: Mutex<Vec<(Vec<Address>, u64, u64)>>,
metaboard_results: Mutex<VecDeque<Result<Vec<LogEntryResponse>, LocalDbError>>>,
metaboard_calls: Mutex<Vec<(Address, u64, u64)>>,
store_barrier: Mutex<Option<Arc<Barrier>>>,
store_completed: Mutex<bool>,
}
Expand Down Expand Up @@ -740,6 +786,14 @@ mod tests {
self.inner.store_calls.lock().unwrap().clone()
}

fn push_metaboard_result(&self, result: Result<Vec<LogEntryResponse>, LocalDbError>) {
self.inner
.metaboard_results
.lock()
.unwrap()
.push_back(result);
}

fn set_store_barrier(&self, barrier: Arc<Barrier>) {
*self.inner.store_barrier.lock().unwrap() = Some(barrier);
}
Expand Down Expand Up @@ -808,6 +862,26 @@ mod tests {
.unwrap_or(Ok(Vec::new()))
}

async fn fetch_metaboard(
&self,
metaboard_address: Address,
from_block: u64,
to_block: u64,
_cfg: &FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError> {
self.inner.metaboard_calls.lock().unwrap().push((
metaboard_address,
from_block,
to_block,
));
self.inner
.metaboard_results
.lock()
.unwrap()
.pop_front()
.unwrap_or(Ok(Vec::new()))
}

fn decode(
&self,
_logs: &[LogEntryResponse],
Expand Down
13 changes: 13 additions & 0 deletions crates/common/src/local_db/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub enum SyncPhase {
DecodingOrderbookLogs,
FetchingStoreLogs,
DecodingStoreLogs,
FetchingMetaboardLogs,
DecodingMetaboardLogs,
FetchingTokenMetadata,
BuildingSqlBatch,
PersistingToDatabase,
Expand All @@ -108,6 +110,8 @@ impl SyncPhase {
Self::DecodingOrderbookLogs => "Decoding orderbook logs",
Self::FetchingStoreLogs => "Fetching interpreter store logs",
Self::DecodingStoreLogs => "Decoding interpreter store logs",
Self::FetchingMetaboardLogs => "Fetching metaboard logs",
Self::DecodingMetaboardLogs => "Decoding metaboard logs",
Self::FetchingTokenMetadata => "Fetching missing token metadata",
Self::BuildingSqlBatch => "Building SQL batch",
Self::PersistingToDatabase => "Persisting to database",
Expand Down Expand Up @@ -190,6 +194,15 @@ pub trait EventsPipeline {
cfg: &FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError>;

/// Fetches MetaV1_2 logs from the metaboard contract.
async fn fetch_metaboard(
&self,
metaboard_address: Address,
from_block: u64,
to_block: u64,
cfg: &FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError>;

/// Decodes raw logs into typed events. Decoding must be deterministic
/// for identical input logs.
fn decode(
Expand Down
11 changes: 11 additions & 0 deletions crates/common/src/local_db/pipeline/runner/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ mod tests {
dump_str: None,
block_number_threshold: 100,
manifest_end_block: 1,
metaboard_address: None,
},
}
}
Expand Down Expand Up @@ -354,6 +355,16 @@ mod tests {
Ok(Vec::new())
}

async fn fetch_metaboard(
&self,
_metaboard_address: Address,
_from_block: u64,
_to_block: u64,
_cfg: &crate::local_db::FetchConfig,
) -> Result<Vec<LogEntryResponse>, LocalDbError> {
Ok(Vec::new())
}

async fn block_hash(&self, _block_number: u64) -> Result<B256, LocalDbError> {
Ok(b256!(
"0x0000000000000000000000000000000000000000000000000000000000000000"
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/local_db/pipeline/runner/remotes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ orderbooks:

fn sample_runner_target() -> (RunnerTarget, ManifestMap) {
let parsed = parsed_settings();
let targets = build_runner_targets(&parsed.orderbooks, &parsed.syncs).unwrap();
let targets = build_runner_targets(&parsed.orderbooks, &parsed.syncs, &parsed.metaboards).unwrap();
let target = targets
.into_iter()
.find(|t| t.orderbook_key == "ob-a")
Expand Down
Loading
Loading