diff --git a/.github/workflows/custom-release.yml b/.github/workflows/custom-release.yml index 90893eaf5..80451d8e6 100644 --- a/.github/workflows/custom-release.yml +++ b/.github/workflows/custom-release.yml @@ -1,10 +1,265 @@ -# Due to workflow limitation of github, this file is merely here -# to support triggering the workflow on a dev branch, without having -# to merge it into main. -# -# It is useful to make releases which publish a container to the github container registry. -# CAUTION: it is recommended that you always enforce those custom releases to contain a preview tag, with `-custom.X` suffix where -# custom is a descriptive name for the release and X is the version number. -# -# gh workflow run custom-release.yml --ref feat/your-branch -f preview=v1.7.0-custom.1 -# \ No newline at end of file +name: manual release for Optimistic Katana + +on: + workflow_dispatch: + inputs: + preview: + description: "Preview tag for Optimistic Katana. Must be in format vX.Y.Z-optimistic.X" + type: string + required: true + +env: + RUST_VERSION: 1.86.0 + CARGO_TERM_COLOR: always + REGISTRY_IMAGE: ghcr.io/${{ github.repository }} + +jobs: + prepare: + runs-on: ubuntu-latest + outputs: + tag_name: ${{ steps.release_info.outputs.tag_name }} + steps: + - uses: actions/checkout@v4 + - name: Get version + id: release_info + run: | + # Validate that the tag follows the vX.Y.Z-optimistic.X format + if [[ ! "${{ inputs.preview }}" =~ ^v[0-9]+\.[0-9]+\.[0-9]+-optimistic\.[0-9]+$ ]]; then + echo "Invalid tag format. Must be in format vX.Y.Z-optimistic.X (e.g., v1.0.0-optimistic.1)" + exit 1 + fi + echo "tag_name=${{ inputs.preview }}" >> $GITHUB_OUTPUT + + build-contracts: + runs-on: ubuntu-latest + needs: prepare + container: + image: ghcr.io/dojoengine/katana-dev:latest + steps: + - uses: actions/checkout@v4 + + - name: Build contracts + run: make contracts + + - name: Upload contract artifacts + uses: actions/upload-artifact@v4 + with: + name: contract-artifacts + path: ./crates/contracts/build + retention-days: 1 + + release: + name: ${{ matrix.job.target }} (${{ matrix.job.os }}${{ matrix.job.native_build == true && ', native' || '' }}) + needs: [prepare, build-contracts] + runs-on: ${{ matrix.job.os }} + env: + PLATFORM_NAME: ${{ matrix.job.platform }} + TARGET: ${{ matrix.job.target }} + ARCH: ${{ matrix.job.arch }} + NATIVE_BUILD: ${{ matrix.job.native_build }} + strategy: + matrix: + job: + # The OS is used for the runner + # The platform is a generic platform name + # The target is used by Cargo + # The arch is either 386, arm64 or amd64 + # The svm target platform to use for the binary https://github.com/roynalnaruto/svm-rs/blob/84cbe0ac705becabdc13168bae28a45ad2299749/svm-builds/build.rs#L4-L24 + # Added native_build dimension to control build type + - os: ubuntu-latest-8-cores + platform: linux + target: x86_64-unknown-linux-gnu + arch: amd64 + native_build: false + - os: ubuntu-latest-8-cores-arm64 + platform: linux + target: aarch64-unknown-linux-gnu + arch: arm64 + svm_target_platform: linux-aarch64 + native_build: false + - os: macos-latest-xlarge + platform: darwin + target: aarch64-apple-darwin + arch: arm64 + native_build: false + + steps: + - uses: actions/checkout@v4 + + - name: Download contract artifacts + uses: actions/download-artifact@v4 + with: + name: contract-artifacts + path: ./crates/contracts/build + + - uses: actions-rust-lang/setup-rust-toolchain@v1 + name: Rust Toolchain Setup + with: + toolchain: ${{ env.RUST_VERSION }} + target: ${{ matrix.job.target }} + 
cache-on-failure: true + cache-key: ${{ matrix.job.target }} + + - uses: oven-sh/setup-bun@v2 + with: + bun-version: latest + + - name: Install LLVM ( Linux ) + if: ${{ matrix.job.platform == 'linux' && matrix.job.native_build == true }} + run: | + wget https://apt.llvm.org/llvm.sh + chmod +x llvm.sh + sudo ./llvm.sh 19 + sudo apt-get update -y + sudo apt-get install -y g++ llvm-19 llvm-19-dev llvm-19-runtime clang-19 clang-tools-19 lld-19 libpolly-19-dev libmlir-19-dev mlir-19-tools + echo "MLIR_SYS_190_PREFIX=/usr/lib/llvm-19" >> $GITHUB_ENV + echo "LLVM_SYS_191_PREFIX=/usr/lib/llvm-19" >> $GITHUB_ENV + echo "TABLEGEN_190_PREFIX=/usr/lib/llvm-19" >> $GITHUB_ENV + + - name: Install LLVM ( macOS ) + if: ${{ matrix.job.platform == 'darwin' && matrix.job.native_build == true }} + run: | + brew install llvm@19 --quiet + brew install zstd + echo "MLIR_SYS_190_PREFIX=$(brew --prefix llvm@19)" >> $GITHUB_ENV + echo "LLVM_SYS_191_PREFIX=$(brew --prefix llvm@19)" >> $GITHUB_ENV + echo "TABLEGEN_190_PREFIX=$(brew --prefix llvm@19)" >> $GITHUB_ENV + echo "LIBRARY_PATH=$(brew --prefix zstd)/lib:$LIBRARY_PATH" >> $GITHUB_ENV + echo "CPATH=$(brew --prefix zstd)/include:$CPATH" >> $GITHUB_ENV + + # - name: Install LLVM ( Windows ) + # if: ${{ matrix.job.platform == 'win32' && matrix.job.native_build == true }} + # run: | + # $llvmUrl = "https://github.com/llvm/llvm-project/releases/download/llvmorg-19.1.7/clang+llvm-19.1.7-x86_64-pc-windows-msvc.tar.xz" + # $llvmDir = "C:\Program Files (x86)\LLVM" + # $llvmDirBin = "C:\Program Files (x86)\LLVM\bin" + # + # Write-Host "Downloading LLVM from $llvmUrl" + # Invoke-WebRequest -Uri $llvmUrl -OutFile llvm.tar.xz + # + # Write-Host "Creating LLVM directory" + # New-Item -ItemType Directory -Path $llvmDir -Force + # + # Write-Host "Extracting LLVM" + # tar -xf llvm.tar.xz -C $llvmDir --strip-components=1 + # + # Write-Host "LLVM installed successfully to $llvmDir" + # + # Write-Host "Listing files in LLVM directory" + # Get-ChildItem -Path "$llvmDirBin" | ForEach-Object { Write-Host $_.Name } + + # # On Windows, use powershell syntax to write the env var to the file. 
+ # # https://github.com/actions/runner/issues/1636#issuecomment-1024531638 + # - name: Set cairo-native LLVM environment variables ( Windows ) + # if: ${{ matrix.job.platform == 'win32' && matrix.job.native_build == true }} + # run: | + # echo "MLIR_SYS_190_PREFIX=C:\Program Files (x86)\LLVM" | Out-File -FilePath $env:GITHUB_ENV -Append + # echo "LLVM_SYS_191_PREFIX=C:\Program Files (x86)\LLVM" | Out-File -FilePath $env:GITHUB_ENV -Append + # echo "TABLEGEN_190_PREFIX=C:\Program Files (x86)\LLVM" | Out-File -FilePath $env:GITHUB_ENV -Append + + - name: Apple M1 setup + if: ${{ matrix.job.target == 'aarch64-apple-darwin' }} + run: | + echo "SDKROOT=$(xcrun -sdk macosx --show-sdk-path)" >> $GITHUB_ENV + echo "MACOSX_DEPLOYMENT_TARGET=$(xcrun -sdk macosx --show-sdk-platform-version)" >> $GITHUB_ENV + + - name: Linux ARM setup + if: ${{ matrix.job.target == 'aarch64-unknown-linux-gnu' }} + run: | + sudo apt-get update -y + sudo apt-get install -y gcc-aarch64-linux-gnu libssl-dev + # We build jemalloc with 64KB pagesize so that it works for all linux/arm64 pagesize variants + # See: https://github.com/jemalloc/jemalloc/issues/467 + echo "JEMALLOC_SYS_WITH_LG_PAGE=16" >> $GITHUB_ENV + + - name: Build binary + if: ${{ matrix.job.native_build == false }} + shell: bash + run: | + cargo build --bin katana --profile performance --target ${{ matrix.job.target }} + + - name: Build binary ( w/ cairo-native ) + if: ${{ matrix.job.native_build == true }} + shell: bash + run: | + cargo build --bin katana --profile performance --features native --target ${{ matrix.job.target }} + + - name: Archive binaries + id: artifacts + env: + VERSION_NAME: ${{ needs.prepare.outputs.tag_name }} + run: | + if [ "$NATIVE_BUILD" == "true" ]; then + SUFFIX="_native" + else + SUFFIX="" + fi + + if [ "$PLATFORM_NAME" == "linux" ]; then + tar -czvf "katana_${VERSION_NAME}_${PLATFORM_NAME}_${ARCH}${SUFFIX}.tar.gz" -C ./target/${TARGET}/performance katana + echo "file_name=katana_${VERSION_NAME}_${PLATFORM_NAME}_${ARCH}${SUFFIX}.tar.gz" >> $GITHUB_OUTPUT + elif [ "$PLATFORM_NAME" == "darwin" ]; then + # We need to use gtar here otherwise the archive is corrupt. 
+ # See: https://github.com/actions/virtual-environments/issues/2619 + gtar -czvf "katana_${VERSION_NAME}_${PLATFORM_NAME}_${ARCH}${SUFFIX}.tar.gz" -C ./target/${TARGET}/performance katana + echo "file_name=katana_${VERSION_NAME}_${PLATFORM_NAME}_${ARCH}${SUFFIX}.tar.gz" >> $GITHUB_OUTPUT + fi + shell: bash + + # We move binaries so they match $TARGETPLATFORM in the Docker build + # Only move native binaries for Docker (we want the native version for Docker) + - name: Move binaries for Docker + if: ${{ env.PLATFORM_NAME == 'linux' }} + shell: bash + run: | + mkdir -p $PLATFORM_NAME/$ARCH + mv target/${TARGET}/performance/katana $PLATFORM_NAME/$ARCH + + - name: Upload Docker binaries + if: ${{ env.PLATFORM_NAME == 'linux' }} + uses: actions/upload-artifact@v4 + with: + name: binaries-${{ matrix.job.target }} + path: ${{ env.PLATFORM_NAME }} + retention-days: 1 + + - name: Upload release artifacts + uses: actions/upload-artifact@v4 + with: + name: artifacts-${{ matrix.job.target }}${{ matrix.job.native_build == true && '-native' || '' }} + path: ${{ steps.artifacts.outputs.file_name }} + retention-days: 1 + + docker-build-and-push: + runs-on: ubuntu-latest-8-cores + needs: [prepare, release] + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Download binaries + uses: actions/download-artifact@v4 + with: + pattern: binaries-* + path: artifacts/linux + merge-multiple: true + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v1 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push docker image + uses: docker/build-push-action@v3 + with: + push: true + tags: ghcr.io/${{ github.repository }}:${{ needs.prepare.outputs.tag_name }} + platforms: linux/amd64,linux/arm64 + build-contexts: | + artifacts=artifacts \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c773f1103..4fad73d4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2901,7 +2901,7 @@ dependencies = [ "serde_json", "syn 2.0.104", "tempfile", - "toml 0.9.5", + "toml 0.9.6", ] [[package]] @@ -6140,6 +6140,7 @@ dependencies = [ "katana-gateway-types", "katana-messaging", "katana-metrics", + "katana-optimistic", "katana-pipeline", "katana-pool", "katana-pool-api", @@ -6181,6 +6182,28 @@ dependencies = [ "url", ] +[[package]] +name = "katana-optimistic" +version = "1.7.0" +dependencies = [ + "anyhow", + "futures", + "katana-core", + "katana-db", + "katana-executor", + "katana-gateway-client", + "katana-pool", + "katana-pool-api", + "katana-primitives", + "katana-provider", + "katana-rpc-client", + "katana-rpc-types", + "katana-tasks", + "parking_lot", + "tokio", + "tracing", +] + [[package]] name = "katana-pipeline" version = "1.7.0" @@ -6225,6 +6248,7 @@ dependencies = [ "futures-util", "katana-primitives", "parking_lot", + "starknet-types-core 0.2.3", "thiserror 1.0.69", "tokio", ] @@ -6383,6 +6407,7 @@ dependencies = [ "katana-messaging", "katana-metrics", "katana-node", + "katana-optimistic", "katana-pool", "katana-primitives", "katana-provider", @@ -9674,11 +9699,11 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "1.0.0" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40734c41988f7306bb04f0ecf60ec0f3f1caa34290e4e8ea471dcd3346483b83" +checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" dependencies = [ - "serde", + "serde_core", ] 
[[package]] @@ -9738,7 +9763,7 @@ dependencies = [ [[package]] name = "sev-snp" version = "0.3.0" -source = "git+https://github.com/automata-network/amd-sev-snp-attestation-sdk?branch=main#07162a4dd8d692af68484084b972f8b9b286359b" +source = "git+https://github.com/automata-network/amd-sev-snp-attestation-sdk?branch=main#ce1bf49f7d6a457df55894c21182accf081c1dd4" dependencies = [ "asn1-rs", "bincode 1.3.3", @@ -10894,14 +10919,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75129e1dc5000bfbaa9fee9d1b21f974f9fbad9daec557a521ee6e080825f6e8" +checksum = "ae2a4cf385da23d1d53bc15cdfa5c2109e93d8d362393c801e87da2f72f0e201" dependencies = [ "indexmap 2.10.0", - "serde", - "serde_spanned 1.0.0", - "toml_datetime 0.7.0", + "serde_core", + "serde_spanned 1.0.4", + "toml_datetime 0.7.5+spec-1.1.0", "toml_parser", "toml_writer", "winnow", @@ -10918,11 +10943,11 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.0" +version = "0.7.5+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bade1c3e902f58d73d3f294cd7f20391c1cb2fbcb643b73566bc773971df91e3" +checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" dependencies = [ - "serde", + "serde_core", ] [[package]] @@ -10956,9 +10981,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "toml_writer" -version = "1.0.4" +version = "1.0.6+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" [[package]] name = "tonic" diff --git a/Cargo.toml b/Cargo.toml index bbd09c141..9cf3b1c0e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ members = [ "tests/db-compat", "tests/reverse-proxy", # "tests/snos", + "crates/optimistic", ] [workspace.package] @@ -75,6 +76,7 @@ serde-utils = { path = "crates/serde-utils" } katana-chain-spec = { path = "crates/chain-spec" } katana-cli = { path = "crates/cli" } +katana-optimistic = { path = "crates/optimistic" } katana-codecs = { path = "crates/storage/codecs" } katana-codecs-derive = { path = "crates/storage/codecs/derive" } katana-contracts = { path = "crates/contracts" } diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index cb017de55..f919a2cb8 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -6,6 +6,7 @@ use clap::{Args, Subcommand}; pub mod args; pub mod file; pub mod full; +pub mod optimistic; pub mod options; pub mod utils; @@ -13,6 +14,7 @@ pub use args::SequencerNodeArgs; pub use options::*; use crate::full::FullNodeArgs; +use crate::optimistic::OptimisticNodeArgs; #[derive(Debug, Args, PartialEq)] pub struct NodeCli { @@ -27,6 +29,9 @@ pub enum NodeSubcommand { #[command(about = "Launch a sequencer node")] Sequencer(Box), + + #[command(about = "Launch an optimistic node")] + Optimistic(Box), } impl NodeCli { @@ -34,6 +39,7 @@ impl NodeCli { match self.command { NodeSubcommand::Full(args) => args.execute().await, NodeSubcommand::Sequencer(args) => args.with_config_file()?.execute().await, + NodeSubcommand::Optimistic(args) => args.execute().await, } } } diff --git a/crates/cli/src/optimistic.rs b/crates/cli/src/optimistic.rs new file mode 100644 index 000000000..269724c3d --- /dev/null +++ b/crates/cli/src/optimistic.rs @@ -0,0 +1,149 @@ +use std::sync::Arc; + +use 
anyhow::Result; +pub use clap::Parser; +use katana_chain_spec::ChainSpec; +use katana_primitives::chain::ChainId; +use serde::{Deserialize, Serialize}; +use tracing::info; +use url::Url; + +use crate::options::*; + +pub(crate) const LOG_TARGET: &str = "katana::cli::optimistic"; + +#[derive(Parser, Debug, Serialize, Deserialize, Clone, PartialEq)] +#[command(next_help_heading = "Optimistic node options")] +pub struct OptimisticNodeArgs { + /// Don't print anything on startup. + #[arg(long)] + #[serde(default)] + pub silent: bool, + + /// The Starknet RPC provider to fork from. + #[arg(long, value_name = "URL", alias = "rpc-url")] + #[arg(help = "The Starknet RPC provider to fork from.")] + pub fork_provider_url: Url, + + #[command(flatten)] + #[serde(default)] + pub logging: LoggingOptions, + + #[command(flatten)] + #[serde(default)] + pub tracer: TracerOptions, + + #[cfg(feature = "server")] + #[command(flatten)] + #[serde(default)] + pub metrics: MetricsOptions, + + #[cfg(feature = "server")] + #[command(flatten)] + #[serde(default)] + pub server: ServerOptions, +} + +impl OptimisticNodeArgs { + pub async fn execute(&self) -> Result<()> { + let logging = katana_tracing::LoggingConfig { + stdout_format: self.logging.stdout.stdout_format, + stdout_color: self.logging.stdout.color, + file_enabled: self.logging.file.enabled, + file_format: self.logging.file.file_format, + file_directory: self.logging.file.directory.clone(), + file_max_files: self.logging.file.max_files, + }; + + let tracer_config = self.tracer_config(); + katana_tracing::init(logging, tracer_config).await?; + self.start_node().await + } + + async fn start_node(&self) -> Result<()> { + let config = self.config()?; + + #[cfg(feature = "server")] + let rpc_addr = config.rpc.socket_addr(); + + if !self.silent { + info!(target: LOG_TARGET, "Starting optimistic node..."); + } + + let node = katana_node::optimistic::Node::build(config).await?; + let _handle = node.launch().await?; + + #[cfg(feature = "server")] + { + info!(target: LOG_TARGET, %rpc_addr, "JSON-RPC server started."); + } + + // Wait indefinitely + tokio::signal::ctrl_c().await?; + + Ok(()) + } + + fn config(&self) -> Result { + let chain = self.chain_spec()?; + let rpc = self.rpc_config()?; + let forking = self.forking_config(); + let metrics = self.metrics_config()?; + Ok(katana_node::optimistic::config::Config { chain, rpc, forking, metrics }) + } + + fn tracer_config(&self) -> Option { + self.tracer.config() + } + + #[allow(clippy::field_reassign_with_default)] + fn chain_spec(&self) -> Result> { + let mut dev_chain_spec = katana_chain_spec::dev::ChainSpec::default(); + dev_chain_spec.id = ChainId::SEPOLIA; + Ok(Arc::new(ChainSpec::Dev(dev_chain_spec))) + } + + fn forking_config(&self) -> katana_node::optimistic::config::ForkingConfig { + use katana_node::optimistic::config::ForkingConfig; + ForkingConfig { url: self.fork_provider_url.clone(), block: None } + } + + fn rpc_config(&self) -> Result { + use katana_node::optimistic::config::{RpcConfig, RpcModuleKind, RpcModulesList}; + #[cfg(feature = "server")] + { + let mut apis = RpcModulesList::new(); + apis.add(RpcModuleKind::Starknet); + + Ok(RpcConfig { + addr: self.server.http_addr, + port: self.server.http_port, + apis, + max_connections: self.server.max_connections, + cors_origins: self.server.http_cors_origins.clone(), + ..Default::default() + }) + } + + #[cfg(not(feature = "server"))] + Ok(RpcConfig::default()) + } + + fn metrics_config(&self) -> Result> { + use 
katana_node::optimistic::config::MetricsConfig; + #[cfg(feature = "server")] + { + if self.metrics.metrics { + Ok(Some(MetricsConfig { + addr: self.metrics.metrics_addr, + port: self.metrics.metrics_port, + })) + } else { + Ok(None) + } + } + + #[cfg(not(feature = "server"))] + Ok(None) + } +} diff --git a/crates/cli/src/options.rs b/crates/cli/src/options.rs index e36fd0cc0..a3fb75dc9 100644 --- a/crates/cli/src/options.rs +++ b/crates/cli/src/options.rs @@ -27,7 +27,7 @@ use katana_node::config::rpc::{RpcModulesList, DEFAULT_RPC_MAX_PROOF_KEYS}; use katana_node::config::rpc::{ DEFAULT_RPC_ADDR, DEFAULT_RPC_MAX_CALL_GAS, DEFAULT_RPC_MAX_EVENT_PAGE_SIZE, DEFAULT_RPC_PORT, }; -use katana_primitives::block::{BlockHashOrNumber, GasPrice}; +use katana_primitives::block::{BlockIdOrTag, GasPrice}; use katana_primitives::chain::ChainId; #[cfg(feature = "server")] use katana_rpc_server::cors::HeaderValue; @@ -38,7 +38,7 @@ use url::Url; #[cfg(feature = "server")] use crate::utils::{deserialize_cors_origins, serialize_cors_origins}; -use crate::utils::{parse_block_hash_or_number, parse_genesis}; +use crate::utils::{parse_block_id_or_tag, parse_genesis}; const DEFAULT_DEV_SEED: &str = "0"; const DEFAULT_DEV_ACCOUNTS: u16 = 10; @@ -425,11 +425,11 @@ pub struct ForkingOptions { #[arg(long = "fork.provider", value_name = "URL", conflicts_with = "genesis")] pub fork_provider: Option, - /// Fork the network at a specific block id, can either be a hash (0x-prefixed) or a block - /// number. + /// Fork the network at a specific block id, can either be a hash (0x-prefixed), a block + /// number, or a tag (latest, l1accepted, preconfirmed). #[arg(long = "fork.block", value_name = "BLOCK", requires = "fork_provider")] - #[arg(value_parser = parse_block_hash_or_number)] - pub fork_block: Option, + #[arg(value_parser = parse_block_id_or_tag)] + pub fork_block: Option, } #[derive(Debug, Args, Clone, Serialize, Deserialize, Default, PartialEq)] diff --git a/crates/cli/src/utils.rs b/crates/cli/src/utils.rs index b51db2000..d9e24e74d 100644 --- a/crates/cli/src/utils.rs +++ b/crates/cli/src/utils.rs @@ -10,7 +10,7 @@ use katana_genesis::constant::{ }; use katana_genesis::json::GenesisJson; use katana_genesis::Genesis; -use katana_primitives::block::{BlockHash, BlockHashOrNumber, BlockNumber}; +use katana_primitives::block::{BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber}; use katana_primitives::cairo::ShortString; use katana_primitives::chain::ChainId; use katana_primitives::class::ClassHash; @@ -52,6 +52,27 @@ pub fn parse_block_hash_or_number(value: &str) -> Result { } } +/// Parse a block id or tag from a string. 
Accepts: +/// - Block hashes (0x-prefixed) +/// - Block numbers (numeric) +/// - Block tags: "latest", "l1accepted", "preconfirmed" (case-insensitive) +pub fn parse_block_id_or_tag(value: &str) -> Result { + if value.starts_with("0x") { + Ok(BlockIdOrTag::Hash(BlockHash::from_hex(value)?)) + } else { + match value.to_lowercase().as_str() { + "latest" => Ok(BlockIdOrTag::Latest), + "l1accepted" => Ok(BlockIdOrTag::L1Accepted), + "preconfirmed" => Ok(BlockIdOrTag::PreConfirmed), + _ => { + let num = + value.parse::().context("could not parse block number or tag")?; + Ok(BlockIdOrTag::Number(num)) + } + } + } +} + pub fn print_intro(args: &SequencerNodeArgs, chain: &ChainSpec) { let mut accounts = chain.genesis().accounts().peekable(); let account_class_hash = accounts.peek().map(|e| e.1.class_hash()); diff --git a/crates/executor/src/implementation/blockifier/utils.rs b/crates/executor/src/implementation/blockifier/utils.rs index 11b0fbda0..0564777b3 100644 --- a/crates/executor/src/implementation/blockifier/utils.rs +++ b/crates/executor/src/implementation/blockifier/utils.rs @@ -479,7 +479,7 @@ pub fn block_context_from_envs( block_timestamp: BlockTimestamp(block_env.timestamp), sequencer_address: to_blk_address(block_env.sequencer_address), gas_prices, - use_kzg_da: false, + use_kzg_da: true, }; let chain_info = diff --git a/crates/gateway/gateway-types/src/transaction.rs b/crates/gateway/gateway-types/src/transaction.rs index bcc993aaf..4208518a4 100644 --- a/crates/gateway/gateway-types/src/transaction.rs +++ b/crates/gateway/gateway-types/src/transaction.rs @@ -563,6 +563,94 @@ fn serialize_resource_bounds_mapping( feeder_bounds.serialize(serializer) } +// Custom serialization for contract class with gzip + base64 encoded sierra program +fn _serialize_contract_class( + class: &std::sync::Arc, + serializer: S, +) -> Result { + use std::io::Write; + + use base64::Engine; + use flate2::write::GzEncoder; + use flate2::Compression; + use serde::ser::SerializeStruct; + + let mut state = serializer.serialize_struct("GatewaySierraClass", 4)?; + + // Convert sierra_program (Vec) to JSON array, then gzip compress, then base64 encode + let program_json = + serde_json::to_string(&class.sierra_program).map_err(serde::ser::Error::custom)?; + + // Gzip compress the JSON + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder + .write_all(program_json.as_bytes()) + .map_err(|e| serde::ser::Error::custom(format!("gzip compression failed: {e}")))?; + let compressed_bytes = encoder + .finish() + .map_err(|e| serde::ser::Error::custom(format!("gzip finish failed: {e}")))?; + + // Base64 encode + let program_base64 = base64::engine::general_purpose::STANDARD.encode(&compressed_bytes); + + state.serialize_field("sierra_program", &program_base64)?; + state.serialize_field("contract_class_version", &class.contract_class_version)?; + state.serialize_field("entry_points_by_type", &class.entry_points_by_type)?; + + // Serialize ABI - it's already in pythonic JSON format via SierraClassAbi's Serialize impl + if let Some(abi) = class.abi.as_ref() { + state.serialize_field("abi", abi)?; + } else { + state.serialize_field("abi", "")?; + } + + state.end() +} + +fn _deserialize_contract_class<'de, D: serde::Deserializer<'de>>( + deserializer: D, +) -> Result, D::Error> { + use std::io::Read; + + use base64::Engine; + use flate2::read::GzDecoder; + use serde::de; + + #[allow(dead_code)] + #[derive(Deserialize)] + struct GatewaySierraClass { + sierra_program: String, + contract_class_version: 
String, + entry_points_by_type: cairo_lang_starknet_classes::contract_class::ContractEntryPoints, + abi: String, + } + + let gateway_class = GatewaySierraClass::deserialize(deserializer)?; + + // Base64 decode + let compressed_bytes = base64::engine::general_purpose::STANDARD + .decode(&gateway_class.sierra_program) + .map_err(|e| de::Error::custom(format!("failed to decode base64 sierra_program: {e}")))?; + + // Gzip decompress + let mut decoder = GzDecoder::new(&compressed_bytes[..]); + let mut decompressed = String::new(); + decoder + .read_to_string(&mut decompressed) + .map_err(|e| de::Error::custom(format!("failed to decompress sierra_program: {e}")))?; + + // Parse JSON array to Vec + let sierra_program: Vec = serde_json::from_str(&decompressed) + .map_err(|e| de::Error::custom(format!("failed to parse sierra_program JSON: {e}")))?; + + Ok(std::sync::Arc::new(katana_rpc_types::class::RpcSierraContractClass { + sierra_program, + abi: Some(gateway_class.abi), + entry_points_by_type: gateway_class.entry_points_by_type, + contract_class_version: gateway_class.contract_class_version, + })) +} + //////////////////////////////////////////////////////////////////////////////// // Conversion to katana-primitives types //////////////////////////////////////////////////////////////////////////////// @@ -740,10 +828,6 @@ impl From for katana_primitives::da::DataAvailabilityMode } } -//////////////////////////////////////////////////////////////////////////////// -// Conversion to katana-rpc-types types -//////////////////////////////////////////////////////////////////////////////// - impl From for katana_rpc_types::RpcTxWithHash { fn from(value: ConfirmedTransaction) -> Self { Self { transaction_hash: value.transaction_hash, transaction: value.transaction.into() } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 2d85c1a64..3c20aeeec 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -18,6 +18,7 @@ katana-messaging.workspace = true katana-metrics.workspace = true katana-pipeline.workspace = true katana-pool.workspace = true +katana-optimistic.workspace = true katana-pool-api.workspace = true katana-primitives.workspace = true katana-provider.workspace = true @@ -34,9 +35,9 @@ backon.workspace = true num-traits.workspace = true futures.workspace = true http.workspace = true +parking_lot.workspace = true jsonrpsee.workspace = true serde.workspace = true -parking_lot.workspace = true thiserror.workspace = true tracing.workspace = true url.workspace = true diff --git a/crates/node/src/config/fork.rs b/crates/node/src/config/fork.rs index 02a93c6f7..1be1e7d64 100644 --- a/crates/node/src/config/fork.rs +++ b/crates/node/src/config/fork.rs @@ -1,4 +1,4 @@ -use katana_primitives::block::BlockHashOrNumber; +use katana_primitives::block::BlockIdOrTag; use url::Url; /// Node forking configurations. @@ -6,6 +6,6 @@ use url::Url; pub struct ForkingConfig { /// The JSON-RPC URL of the network to fork from. pub url: Url, - /// The block number to fork from. If `None`, the latest block will be used. - pub block: Option, + /// The block id or tag to fork from. If `None`, the latest block will be used. 
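+    /// For example, `BlockIdOrTag::Number(1234)`, a `BlockIdOrTag::Hash(..)` built from a
+    /// 0x-prefixed hash, or `BlockIdOrTag::Latest`.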
+    pub block: Option<BlockIdOrTag>,
+}
diff --git a/crates/node/src/full/mod.rs b/crates/node/src/full/mod.rs
index 21e8d4a0a..68375266b 100644
--- a/crates/node/src/full/mod.rs
+++ b/crates/node/src/full/mod.rs
@@ -181,6 +181,7 @@ impl Node {
             task_spawner.clone(),
             preconf_factory,
             GasPriceOracle::create_for_testing(),
+            None, // optimistic_state
             starknet_api_cfg,
             storage_provider.clone(),
         );
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index fd1bd06e9..2d53063da 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -1,6 +1,7 @@
 #![cfg_attr(not(test), warn(unused_crate_dependencies))]
 pub mod full;
+pub mod optimistic;
 pub mod config;
 pub mod exit;
@@ -29,7 +30,7 @@ use katana_metrics::sys::DiskReporter;
 use katana_metrics::{MetricsServer, MetricsServerHandle, Report};
 use katana_pool::ordering::FiFo;
 use katana_pool::TxPool;
-use katana_primitives::block::{BlockHashOrNumber, GasPrices};
+use katana_primitives::block::{BlockIdOrTag, GasPrices};
 use katana_primitives::cairo::ShortString;
 use katana_primitives::env::VersionedConstantsOverrides;
 use katana_provider::{DbProviderFactory, ForkProviderFactory, ProviderFactory};
@@ -253,6 +254,7 @@ where
             task_spawner.clone(),
             block_producer.clone(),
             gas_oracle.clone(),
+            None, // optimistic_state
             starknet_api_cfg,
             provider.clone(),
         );
@@ -415,7 +417,7 @@ impl Node {
             id
         } else {
             let res = client.block_number().await?;
-            BlockHashOrNumber::Num(res.block_number)
+            BlockIdOrTag::Number(res.block_number)
         };
         // if the id is not in ASCII encoding, we display the chain id as is in hex.
diff --git a/crates/node/src/optimistic/config.rs b/crates/node/src/optimistic/config.rs
new file mode 100644
index 000000000..bf417b642
--- /dev/null
+++ b/crates/node/src/optimistic/config.rs
@@ -0,0 +1,27 @@
+use std::sync::Arc;
+
+use katana_chain_spec::ChainSpec;
+
+pub use crate::config::db::DbConfig;
+pub use crate::config::execution::ExecutionConfig;
+pub use crate::config::fork::ForkingConfig;
+pub use crate::config::metrics::MetricsConfig;
+pub use crate::config::rpc::{RpcConfig, RpcModuleKind, RpcModulesList};
+
+/// Node configurations.
+///
+/// List of all possible options that can be used to configure a node.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Config {
+    /// The chain specification.
+    pub chain: Arc<ChainSpec>,
+
+    /// Forking options.
+    pub forking: ForkingConfig,
+
+    /// Rpc options.
+    pub rpc: RpcConfig,
+
+    /// Metrics options.
+    pub metrics: Option<MetricsConfig>,
+}
diff --git a/crates/node/src/optimistic/mod.rs b/crates/node/src/optimistic/mod.rs
new file mode 100644
index 000000000..6dc253e56
--- /dev/null
+++ b/crates/node/src/optimistic/mod.rs
@@ -0,0 +1,279 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use http::header::CONTENT_TYPE;
+use http::Method;
+use jsonrpsee::RpcModule;
+use katana_core::backend::storage::ProviderRO;
+use katana_core::backend::Backend;
+use katana_core::env::BlockContextGenerator;
+use katana_executor::implementation::blockifier::cache::ClassCache;
+use katana_executor::implementation::blockifier::BlockifierFactory;
+use katana_executor::{BlockLimits, ExecutionFlags};
+use katana_gas_price_oracle::GasPriceOracle;
+use katana_metrics::exporters::prometheus::PrometheusRecorder;
+use katana_metrics::sys::DiskReporter;
+use katana_metrics::{MetricsServer, Report};
+use katana_optimistic::executor::{OptimisticExecutor, OptimisticState};
+use katana_optimistic::pool::{PoolValidator, TxPool};
+use katana_pool::ordering::FiFo;
+use katana_provider::ProviderFactory;
+use katana_rpc_api::starknet::{StarknetApiServer, StarknetTraceApiServer, StarknetWriteApiServer};
+use katana_rpc_client::starknet::Client as StarknetClient;
+use katana_rpc_server::cors::Cors;
+use katana_rpc_server::starknet::{OptimisticPendingBlockProvider, StarknetApi, StarknetApiConfig};
+use katana_rpc_server::{RpcServer, RpcServerHandle};
+use katana_tasks::{JoinHandle, TaskManager};
+use tracing::info;
+
+pub mod config;
+
+use config::Config;
+
+// pub use self::config::*;
+use crate::config::rpc::RpcModuleKind;
+
+#[derive(Debug)]
+pub struct Node<P>

+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    config: Arc<Config>,
+    pool: TxPool,
+    db: katana_db::Db,
+    rpc_server: RpcServer,
+    task_manager: TaskManager,
+    executor: OptimisticExecutor<P>,
+    backend: Arc<Backend<P>>,
+}
+
+impl<P> Node<P>
+where + P: ProviderFactory + Clone + Unpin, + P::Provider: ProviderRO, +{ + pub async fn launch(self) -> Result> { + let chain = self.backend.chain_spec.id(); + info!(%chain, "Starting node."); + + // TODO: maybe move this to the build stage + if let Some(ref cfg) = self.config.metrics { + let db_metrics = Box::new(self.db.clone()) as Box; + let disk_metrics = Box::new(DiskReporter::new(self.db.path())?) as Box; + let reports: Vec> = vec![db_metrics, disk_metrics]; + + let exporter = PrometheusRecorder::current().expect("qed; should exist at this point"); + let server = MetricsServer::new(exporter).with_process_metrics().reports(reports); + + let addr = cfg.socket_addr(); + let _metrics_handle = server.start(addr)?; + info!(%addr, "Metrics server started."); + } + + // --- start the rpc server + + let rpc_handle = self.rpc_server.start(self.config.rpc.socket_addr()).await?; + + // --- start the gas oracle worker task + + if let Some(worker) = self.backend.gas_oracle.run_worker() { + self.task_manager + .task_spawner() + .build_task() + .graceful_shutdown() + .name("gas oracle") + .spawn(worker); + } + + info!(target: "node", "Gas price oracle worker started."); + + let executor_handle = self.executor.spawn(); + + Ok(LaunchedNode { + rpc: rpc_handle, + backend: self.backend, + config: self.config, + db: self.db, + executor: executor_handle, + task_manager: self.task_manager, + pool: self.pool, + rpc_server: self.rpc_server, + }) + } +} + +impl Node { + pub async fn build(config: Config) -> Result { + if config.metrics.is_some() { + // Metrics recorder must be initialized before calling any of the metrics macros, in + // order for it to be registered. + let _ = PrometheusRecorder::install("katana")?; + } + + // -- build task manager + + let task_manager = TaskManager::current(); + let task_spawner = task_manager.task_spawner(); + + // --- build executor factory + + let executor_factory = { + #[allow(unused_mut)] + let mut class_cache = ClassCache::builder(); + + // Ignore native for now in optimistic node. 
+ // #[cfg(feature = "native")] + // { + // info!(enabled = config.execution.compile_native, "Cairo native compilation"); + // class_cache = class_cache.compile_native(config.execution.compile_native); + // } + + let global_class_cache = class_cache.build_global()?; + + let factory = BlockifierFactory::new( + None, + ExecutionFlags::new(), + BlockLimits::default(), + global_class_cache, + config.chain.clone(), + ); + + Arc::new(factory) + }; + + // --- build storage provider + + let starknet_client = StarknetClient::new(config.forking.url.clone()); + + let db = katana_db::Db::in_memory()?; + + // Get the latest block number from the forked network + let forked_block_num = starknet_client.block_number().await?.block_number; + + let provider = katana_provider::ForkProviderFactory::new( + db.clone(), + forked_block_num, + starknet_client.clone(), + ); + + let gpo = GasPriceOracle::sampled_starknet(config.forking.url.clone()); + let block_context_generator = BlockContextGenerator::default().into(); + + let backend = Arc::new(Backend { + gas_oracle: gpo.clone(), + storage: provider.clone(), + executor_factory: executor_factory.clone(), + block_context_generator, + chain_spec: config.chain.clone(), + }); + + // --- build transaction pool + + let pool_validator = PoolValidator::new(starknet_client.clone()); + let pool = TxPool::new(pool_validator, FiFo::new()); + + // -- build executor + + let optimistic_state = OptimisticState::new(); + + // this is the component that will populate the optimistic state + let executor = OptimisticExecutor::new( + pool.clone(), + provider.clone(), + optimistic_state.clone(), + executor_factory.clone(), + task_spawner.clone(), + starknet_client.clone(), + Default::default(), + ); + + // --- build rpc server + + let mut rpc_modules = RpcModule::new(()); + + let cors = Cors::new() + .allow_origins(config.rpc.cors_origins.clone()) + // Allow `POST` when accessing the resource + .allow_methods([Method::POST, Method::GET]) + .allow_headers([CONTENT_TYPE, "argent-client".parse().unwrap(), "argent-version".parse().unwrap()]); + + // --- build starknet api + + let starknet_api_cfg = StarknetApiConfig { + max_event_page_size: config.rpc.max_event_page_size, + max_proof_keys: config.rpc.max_proof_keys, + max_call_gas: config.rpc.max_call_gas, + max_concurrent_estimate_fee_requests: config.rpc.max_concurrent_estimate_fee_requests, + simulation_flags: ExecutionFlags::new(), + versioned_constant_overrides: None, + #[cfg(feature = "cartridge")] + paymaster: None, + }; + + // Create the optimistic pending block provider + let pending_block_provider = OptimisticPendingBlockProvider::new( + optimistic_state.clone(), + starknet_client.clone(), + provider.clone(), + ); + + let starknet_api = StarknetApi::new( + config.chain.clone(), + pool.clone(), + task_spawner.clone(), + pending_block_provider, + gpo.clone(), + Some(optimistic_state.clone()), + starknet_api_cfg, + provider.clone(), + ); + + if config.rpc.apis.contains(&RpcModuleKind::Starknet) { + rpc_modules.merge(StarknetApiServer::into_rpc(starknet_api.clone()))?; + rpc_modules.merge(StarknetWriteApiServer::into_rpc(starknet_api.clone()))?; + rpc_modules.merge(StarknetTraceApiServer::into_rpc(starknet_api.clone()))?; + } + + #[allow(unused_mut)] + let mut rpc_server = + RpcServer::new().metrics(true).health_check(true).cors(cors).module(rpc_modules)?; + + if let Some(timeout) = config.rpc.timeout { + rpc_server = rpc_server.timeout(timeout); + }; + + if let Some(max_connections) = config.rpc.max_connections { + rpc_server = 
rpc_server.max_connections(max_connections); + } + + if let Some(max_request_body_size) = config.rpc.max_request_body_size { + rpc_server = rpc_server.max_request_body_size(max_request_body_size); + } + + if let Some(max_response_body_size) = config.rpc.max_response_body_size { + rpc_server = rpc_server.max_response_body_size(max_response_body_size); + } + + info!("Build complete."); + + Ok(Node { db, pool, backend, rpc_server, config: config.into(), task_manager, executor }) + } +} + +#[derive(Debug)] +#[allow(dead_code)] +pub struct LaunchedNode

<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    config: Arc<Config>,
+    pool: TxPool,
+    db: katana_db::Db,
+    rpc_server: RpcServer,
+    task_manager: TaskManager,
+    backend: Arc<Backend<P>>,
+    rpc: RpcServerHandle,
+    executor: JoinHandle<()>,
+}
diff --git a/crates/optimistic/Cargo.toml b/crates/optimistic/Cargo.toml
new file mode 100644
index 000000000..137df1f24
--- /dev/null
+++ b/crates/optimistic/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "katana-optimistic"
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+version.workspace = true
+
+[dependencies]
+futures.workspace = true
+anyhow.workspace = true
+katana-core.workspace = true
+katana-executor.workspace = true
+katana-pool.workspace = true
+katana-gateway-client.workspace = true
+katana-primitives.workspace = true
+katana-provider.workspace = true
+katana-rpc-types.workspace = true
+katana-rpc-client.workspace = true
+katana-tasks.workspace = true
+katana-pool-api.workspace = true
+parking_lot.workspace = true
+tracing.workspace = true
+katana-db.workspace = true
+tokio.workspace = true
diff --git a/crates/optimistic/src/executor.rs b/crates/optimistic/src/executor.rs
new file mode 100644
index 000000000..b8ae4c73e
--- /dev/null
+++ b/crates/optimistic/src/executor.rs
@@ -0,0 +1,440 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+use futures::stream::StreamExt;
+use futures::FutureExt;
+use katana_core::backend::storage::ProviderRO;
+use katana_executor::implementation::blockifier::BlockifierFactory;
+use katana_executor::{ExecutionResult, ExecutorFactory};
+use katana_pool::ordering::FiFo;
+use katana_pool::{PendingTransactions, PoolTransaction, TransactionPool};
+use katana_primitives::block::{BlockIdOrTag, GasPrices};
+use katana_primitives::env::BlockEnv;
+use katana_primitives::transaction::TxWithHash;
+use katana_primitives::version::StarknetVersion;
+use katana_provider::api::state::{StateFactoryProvider, StateProvider};
+use katana_provider::providers::db::cached::{CachedStateProvider, SharedStateCache};
+use katana_provider::ProviderFactory;
+use katana_rpc_client::starknet::Client;
+use katana_rpc_types::block::GetBlockWithTxHashesResponse;
+use katana_rpc_types::BroadcastedTxWithChainId;
+use katana_tasks::{CpuBlockingJoinHandle, JoinHandle, Result as TaskResult, TaskSpawner};
+use parking_lot::RwLock;
+use tokio::time::sleep;
+use tracing::{debug, error, info, trace};
+
+use crate::pool::TxPool;
+
+const LOG_TARGET: &str = "optimistic";
+
+#[derive(Debug, Clone)]
+pub struct OptimisticState {
+    pub state: SharedStateCache,
+    pub transactions: Arc<RwLock<Vec<(TxWithHash, ExecutionResult)>>>,
+}
+
+impl OptimisticState {
+    pub fn new() -> Self {
+        Self { state: SharedStateCache::default(), transactions: Arc::new(RwLock::new(Vec::new())) }
+    }
+
+    pub fn get_optimistic_state(&self, base: Box<dyn StateProvider>) -> Box<dyn StateProvider> {
+        Box::new(CachedStateProvider::new(base, self.state.clone()))
+    }
+}
+
+impl Default for OptimisticState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+#[derive(Debug)]
+pub struct OptimisticExecutor<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    pool: TxPool,
+    optimistic_state: OptimisticState,
+    executor_factory: Arc<BlockifierFactory>,
+    storage: P,
+    task_spawner: TaskSpawner,
+    client: Client,
+    block_env: Arc<RwLock<BlockEnv>>,
+}
+
+impl<P> OptimisticExecutor<P>
+where + P: ProviderFactory + Clone + Unpin, + P::Provider: ProviderRO, +{ + /// Creates a new `OptimisticExecutor` instance. + /// + /// # Arguments + /// + /// * `pool` - The transaction pool to monitor for new transactions + /// * `storage` - The storage provider factory + /// * `optimistic_state` - The optimistic state to track executed transactions + /// * `executor_factory` - The executor factory for transaction execution + /// * `task_spawner` - The task spawner used to run the executor actor + /// * `client` - The RPC client used to poll for confirmed blocks + /// * `block_env` - The initial block environment + pub fn new( + pool: TxPool, + storage: P, + optimistic_state: OptimisticState, + executor_factory: Arc, + task_spawner: TaskSpawner, + client: Client, + block_env: BlockEnv, + ) -> Self { + Self { + pool, + optimistic_state, + executor_factory, + task_spawner, + storage, + client, + block_env: Arc::new(RwLock::new(block_env)), + } + } + + /// Spawns the optimistic executor actor task. + /// + /// This method creates a subscription to the pool's pending transactions and spawns + /// an async task that continuously processes incoming transactions. + /// + /// # Returns + /// + /// A `JoinHandle` to the spawned executor task. + pub fn spawn(self) -> JoinHandle<()> { + // Spawn the transaction execution task + let executor_handle = self.task_spawner.build_task().name("Optimistic Executor").spawn( + OptimisticExecutorActor::new( + self.pool, + self.storage.clone(), + self.optimistic_state.clone(), + self.executor_factory, + self.task_spawner.clone(), + self.block_env.clone(), + ), + ); + + // Spawn the block polling task + let client = self.client; + let optimistic_state = self.optimistic_state; + let block_env = self.block_env; + self.task_spawner.build_task().name("Block Polling").spawn(async move { + Self::poll_confirmed_blocks(client, optimistic_state, block_env).await; + }); + + executor_handle + } + + /// Polls for confirmed blocks every 2 seconds and removes transactions from the optimistic + /// state when they appear in confirmed blocks. Also updates the block environment. 
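+    ///
+    /// A minimal sketch of the loop implemented below (note that the interval the
+    /// body actually uses is five seconds between requests):
+    ///
+    /// ```ignore
+    /// loop {
+    ///     sleep(Duration::from_secs(5)).await;
+    ///     match client.get_block_with_tx_hashes(BlockIdOrTag::Latest).await {
+    ///         // On a new block: write the fresh `BlockEnv` into `block_env` and
+    ///         // retain only optimistic txs whose hashes are not in the block.
+    ///         Ok(block) => { /* ... */ }
+    ///         // On error: log and retry on the next tick.
+    ///         Err(e) => { /* ... */ }
+    ///     }
+    /// }
+    /// ```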
+ async fn poll_confirmed_blocks( + client: Client, + optimistic_state: OptimisticState, + block_env: Arc>, + ) { + let mut last_block_number = None; + + loop { + sleep(Duration::from_secs(5)).await; + + match client.get_block_with_tx_hashes(BlockIdOrTag::Latest).await { + Ok(block_response) => { + let (block_number, block_tx_hashes, new_block_env) = match &block_response { + GetBlockWithTxHashesResponse::Block(block) => { + let env = BlockEnv { + number: block.block_number, + timestamp: block.timestamp, + l2_gas_prices: GasPrices { + eth: block.l2_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l2_gas_price.price_in_fri.try_into().unwrap(), + }, + l1_gas_prices: GasPrices { + eth: block.l1_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l1_gas_price.price_in_fri.try_into().unwrap(), + }, + l1_data_gas_prices: GasPrices { + eth: block.l1_data_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l1_data_gas_price.price_in_fri.try_into().unwrap(), + }, + sequencer_address: block.sequencer_address, + starknet_version: StarknetVersion::parse(&block.starknet_version) + .unwrap_or_default(), + }; + (block.block_number, block.transactions.clone(), env) + } + GetBlockWithTxHashesResponse::PreConfirmed(block) => { + let env = BlockEnv { + number: block.block_number, + timestamp: block.timestamp, + l2_gas_prices: GasPrices { + eth: block.l2_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l2_gas_price.price_in_fri.try_into().unwrap(), + }, + l1_gas_prices: GasPrices { + eth: block.l1_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l1_gas_price.price_in_fri.try_into().unwrap(), + }, + l1_data_gas_prices: GasPrices { + eth: block.l1_data_gas_price.price_in_wei.try_into().unwrap(), + strk: block.l1_data_gas_price.price_in_fri.try_into().unwrap(), + }, + sequencer_address: block.sequencer_address, + starknet_version: StarknetVersion::parse(&block.starknet_version) + .unwrap_or_default(), + }; + (block.block_number, block.transactions.clone(), env) + } + }; + + // Check if this is a new block + if let Some(last_num) = last_block_number { + if block_number <= last_num { + // Same block, skip processing + continue; + } + } + + // Update the last seen block number + last_block_number = Some(block_number); + debug!(target: LOG_TARGET, %block_number, "New block received."); + + // Update the block environment for the next optimistic execution + *block_env.write() = new_block_env; + trace!(target: LOG_TARGET, block_number, "Updated block environment"); + + if block_tx_hashes.is_empty() { + continue; + } + + // Get the current optimistic transactions + let mut optimistic_txs = optimistic_state.transactions.write(); + + // Filter out transactions that are confirmed in this block + let initial_count = optimistic_txs.len(); + optimistic_txs.retain(|(tx, _)| !block_tx_hashes.contains(&tx.hash)); + + let removed_count = initial_count - optimistic_txs.len(); + if removed_count > 0 { + debug!( + target: LOG_TARGET, + block_number = block_number, + removed_count = removed_count, + remaining_count = optimistic_txs.len(), + "Removed confirmed transactions from optimistic state" + ); + } + } + Err(e) => { + error!( + target: LOG_TARGET, + error = %e, + "Error polling for confirmed blocks" + ); + } + } + } + } +} + +struct OptimisticExecutorActor

<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    pool: TxPool,
+    optimistic_state: OptimisticState,
+    pending_txs: PendingTransactions<BroadcastedTxWithChainId, FiFo<BroadcastedTxWithChainId>>,
+    storage: P,
+    executor_factory: Arc<BlockifierFactory>,
+    task_spawner: TaskSpawner,
+    ongoing_execution: Option<CpuBlockingJoinHandle<anyhow::Result<()>>>,
+    block_env: Arc<RwLock<BlockEnv>>,
+}
+
+impl<P> std::fmt::Debug for OptimisticExecutorActor<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("OptimisticExecutorActor").finish_non_exhaustive()
+    }
+}
+
+impl<P> OptimisticExecutorActor<P>
+where + P: ProviderFactory + Clone, + P::Provider: ProviderRO, +{ + /// Creates a new executor actor with the given pending transactions stream. + fn new( + pool: TxPool, + storage: P, + optimistic_state: OptimisticState, + executor_factory: Arc, + task_spawner: TaskSpawner, + block_env: Arc>, + ) -> Self { + let pending_txs = pool.pending_transactions(); + Self { + pool, + optimistic_state, + pending_txs, + storage, + executor_factory, + task_spawner, + ongoing_execution: None, + block_env, + } + } + + /// Execute a single transaction optimistically against the latest state. + fn execute_transaction( + storage: P, + optimistic_state: OptimisticState, + executor_factory: Arc, + block_env: Arc>, + tx: BroadcastedTxWithChainId, + pool: TxPool, + ) -> anyhow::Result<()> { + let latest_state = storage.provider().latest()?; + let state = optimistic_state.get_optimistic_state(latest_state); + + // Get the current block environment + let current_block_env = block_env.read().clone(); + + let mut executor = executor_factory.with_state_and_block_env(state, current_block_env); + + // Execute the transaction + let tx_hash = tx.hash(); + + let _ = executor.execute_transactions(vec![tx.into()]).unwrap(); + + let output = executor.take_execution_output().unwrap(); + optimistic_state.state.merge_state_updates(&output.states); + + // Add the executed transactions to the optimistic state + for (tx, result) in output.transactions { + optimistic_state.transactions.write().push((tx, result)); + } + + pool.remove_transactions(&[tx_hash]); + + Ok(()) + } +} + +impl

<P> Future for OptimisticExecutorActor<P>
+where + P: ProviderFactory + Clone + Unpin, + P::Provider: ProviderRO, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + // First, poll any ongoing execution to completion before processing new transactions + if let Some(mut execution) = this.ongoing_execution.take() { + match execution.poll_unpin(cx) { + Poll::Ready(result) => { + match result { + TaskResult::Ok(Ok(())) => { + // Execution completed successfully, continue to next transaction + trace!(target: LOG_TARGET, "Transaction execution completed successfully"); + } + TaskResult::Ok(Err(e)) => { + error!( + target: LOG_TARGET, + error = %e, + "Error executing transaction" + ); + } + TaskResult::Err(e) => { + if e.is_cancelled() { + error!(target: LOG_TARGET, "Transaction execution task cancelled"); + } else { + std::panic::resume_unwind(e.into_panic()); + } + } + } + // Continue to process next transaction + } + Poll::Pending => { + // Execution is still ongoing, restore it and yield + this.ongoing_execution = Some(execution); + return Poll::Pending; + } + } + } + + // Process new transactions from the stream + match this.pending_txs.poll_next_unpin(cx) { + Poll::Ready(Some(pending_tx)) => { + let tx = pending_tx.tx.as_ref().clone(); + + let tx_hash = tx.hash(); + let tx_sender = tx.sender(); + let tx_nonce = tx.nonce(); + + trace!( + target: LOG_TARGET, + tx_hash = format!("{:#x}", tx_hash), + sender = %tx_sender, + nonce = %tx_nonce, + "Received transaction from pool" + ); + + // Spawn the transaction execution on the blocking CPU pool + let pool = this.pool.clone(); + let storage = this.storage.clone(); + let optimistic_state = this.optimistic_state.clone(); + let executor_factory = this.executor_factory.clone(); + let block_env = this.block_env.clone(); + + let execution_future = this.task_spawner.cpu_bound().spawn(move || { + Self::execute_transaction( + storage, + optimistic_state, + executor_factory, + block_env, + tx, + pool, + ) + }); + + this.ongoing_execution = Some(execution_future); + + // Wake the task to poll the execution immediately + cx.waker().wake_by_ref(); + + // Continue the loop to poll the execution + continue; + } + + Poll::Ready(None) => { + // Stream has ended (pool was dropped) + info!(target: LOG_TARGET, "Transaction stream ended"); + return Poll::Ready(()); + } + + Poll::Pending => { + // Stream is exhausted - no more transactions available right now. + // Yield control back to the executor until we're polled again. + return Poll::Pending; + } + } + } + } +} diff --git a/crates/optimistic/src/lib.rs b/crates/optimistic/src/lib.rs new file mode 100644 index 000000000..4e194790b --- /dev/null +++ b/crates/optimistic/src/lib.rs @@ -0,0 +1,2 @@ +pub mod executor; +pub mod pool; diff --git a/crates/optimistic/src/pool.rs b/crates/optimistic/src/pool.rs new file mode 100644 index 000000000..c7b2dcf7b --- /dev/null +++ b/crates/optimistic/src/pool.rs @@ -0,0 +1,73 @@ +use std::sync::Arc; + +use katana_pool::ordering::FiFo; +use katana_pool::pool::Pool; +use katana_pool_api::validation::{ + Error as ValidationError, InvalidTransactionError, ValidationOutcome, Validator, +}; +use katana_rpc_client::starknet::Client; +use katana_rpc_types::{BroadcastedTx, BroadcastedTxWithChainId}; +use tracing::info; + +pub type TxPool = Pool>; + +/// A validator that forwards transactions to a remote Starknet RPC endpoint. 
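+///
+/// Concretely, validation here means submitting the transaction through the sequencer
+/// gateway client; nothing is executed locally at this stage. A minimal usage sketch,
+/// using names from this module (`fork_url` is a stand-in for the forked network's
+/// RPC URL):
+///
+/// ```ignore
+/// let client = Client::new(fork_url);
+/// let validator = PoolValidator::new(client);
+/// let pool = TxPool::new(validator, FiFo::new());
+/// ```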
+#[allow(dead_code)]
+#[derive(Debug, Clone)]
+pub struct PoolValidator {
+    client: Arc<Client>,
+    gateway_client: katana_gateway_client::Client,
+}
+
+impl PoolValidator {
+    pub fn new(client: Client) -> Self {
+        Self { client: Arc::new(client), gateway_client: katana_gateway_client::Client::sepolia() }
+    }
+
+    pub fn new_shared(client: Arc<Client>) -> Self {
+        Self { client, gateway_client: katana_gateway_client::Client::sepolia() }
+    }
+}
+
+impl Validator for PoolValidator {
+    type Transaction = BroadcastedTxWithChainId;
+
+    async fn validate(
+        &self,
+        tx: Self::Transaction,
+    ) -> Result<ValidationOutcome<Self::Transaction>, ValidationError> {
+        // Forward the transaction to the remote node
+        let result = match &tx.tx {
+            BroadcastedTx::Invoke(invoke_tx) => {
+                let gateway_tx = invoke_tx.clone().into();
+                self.gateway_client.add_invoke_transaction(gateway_tx).await.map(|_| ())
+            }
+            BroadcastedTx::Declare(declare_tx) => {
+                let gateway_tx = declare_tx.clone().into();
+                self.gateway_client.add_declare_transaction(gateway_tx).await.map(|_| ())
+            }
+            BroadcastedTx::DeployAccount(deploy_account_tx) => {
+                let gateway_tx = deploy_account_tx.clone().into();
+                self.gateway_client.add_deploy_account_transaction(gateway_tx).await.map(|_| ())
+            }
+        };
+
+        match result {
+            Ok(_) => Ok(ValidationOutcome::Valid(tx)),
+            Err(err) => {
+                info!(error = ?err, "Gateway validation failure.");
+                let error = InvalidTransactionError::ValidationFailure {
+                    address: match &tx.tx {
+                        BroadcastedTx::Invoke(tx) => tx.sender_address,
+                        BroadcastedTx::Declare(tx) => tx.sender_address,
+                        BroadcastedTx::DeployAccount(tx) => tx.contract_address(),
+                    },
+                    class_hash: Default::default(),
+                    error: err.to_string(),
+                };
+
+                Ok(ValidationOutcome::Invalid { tx, error })
+            }
+        }
+    }
+}
diff --git a/crates/pool/pool-api/Cargo.toml b/crates/pool/pool-api/Cargo.toml
index a82ab82fc..1edba7d19 100644
--- a/crates/pool/pool-api/Cargo.toml
+++ b/crates/pool/pool-api/Cargo.toml
@@ -9,6 +9,7 @@ katana-primitives.workspace = true
 futures.workspace = true
 parking_lot.workspace = true
+starknet-types-core.workspace = true
 thiserror.workspace = true
 tokio = { workspace = true, features = [ "sync" ] }
diff --git a/crates/pool/pool-api/src/ordering.rs b/crates/pool/pool-api/src/ordering.rs
index e43aa013e..3a399d9fb 100644
--- a/crates/pool/pool-api/src/ordering.rs
+++ b/crates/pool/pool-api/src/ordering.rs
@@ -2,7 +2,7 @@ use crate::PoolTransaction;
 // evaluates the priority of a transaction which would be used to determine how txs are ordered in
 // the pool.
-pub trait PoolOrd {
+pub trait PoolOrd: Send + Sync {
     type Transaction: PoolTransaction;
     /// The priority value type whose [Ord] implementation is used to order the transaction in the
     /// pool.
diff --git a/crates/pool/pool-api/src/validation.rs b/crates/pool/pool-api/src/validation.rs
index b9a0ccbef..b9f8db762 100644
--- a/crates/pool/pool-api/src/validation.rs
+++ b/crates/pool/pool-api/src/validation.rs
@@ -165,7 +165,7 @@ impl Error {
 pub type ValidationResult<T> = Result<ValidationOutcome<T>, Error>;
 /// A trait for validating transactions before they are added to the transaction pool.
-pub trait Validator {
+pub trait Validator: Send + Sync {
     type Transaction: PoolTransaction;
     /// Validate a transaction.
diff --git a/crates/pool/pool/src/pool.rs b/crates/pool/pool/src/pool.rs
index 682856bf4..8d5fd23de 100644
--- a/crates/pool/pool/src/pool.rs
+++ b/crates/pool/pool/src/pool.rs
@@ -160,7 +160,7 @@ where
             // TODO: create a small cache for rejected transactions to respect the rpc spec
             // `getTransactionStatus`
             ValidationOutcome::Invalid { error, ..
} => { - warn!(target: "pool", %error, "Invalid transaction."); + warn!(target: "pool", ?error, "Invalid transaction."); Err(PoolError::InvalidTransaction(Box::new(error))) } diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index 7c03b756f..445f24ae4 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -18,7 +18,7 @@ pub type BlockNumber = u64; /// Block hash type. pub type BlockHash = Felt; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum BlockIdOrTag { Hash(BlockHash), Number(BlockNumber), @@ -39,6 +39,18 @@ impl From for BlockIdOrTag { } } +impl std::fmt::Display for BlockIdOrTag { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BlockIdOrTag::Number(num) => write!(f, "{num}"), + BlockIdOrTag::Hash(hash) => write!(f, "{hash:#x}"), + BlockIdOrTag::L1Accepted => write!(f, "L1Accepted"), + BlockIdOrTag::Latest => write!(f, "Latest"), + BlockIdOrTag::PreConfirmed => write!(f, "PreConfirmed"), + } + } +} + /// Block identifier that refers to a confirmed block. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ConfirmedBlockIdOrTag { diff --git a/crates/primitives/src/event.rs b/crates/primitives/src/event.rs index b19cd77f8..e25181a5b 100644 --- a/crates/primitives/src/event.rs +++ b/crates/primitives/src/event.rs @@ -1,6 +1,9 @@ use core::fmt; use std::num::ParseIntError; +use crate::transaction::TxHash; +use crate::Felt; + /// Represents a continuation token for implementing paging in event queries. /// /// This struct stores the necessary information to resume fetching events @@ -17,6 +20,8 @@ pub struct ContinuationToken { pub txn_n: u64, /// The event number within the transaction to continue from. pub event_n: u64, + /// The transaction hash to continue from. Used for optimistic transactions. + pub transaction_hash: Option, } #[derive(PartialEq, Eq, Debug, thiserror::Error)] @@ -30,7 +35,7 @@ pub enum ContinuationTokenError { impl ContinuationToken { pub fn parse(token: &str) -> Result { let arr: Vec<&str> = token.split(',').collect(); - if arr.len() != 3 { + if arr.len() != 3 && arr.len() != 4 { return Err(ContinuationTokenError::InvalidToken); } let block_n = @@ -40,13 +45,29 @@ impl ContinuationToken { let event_n = u64::from_str_radix(arr[2], 16).map_err(ContinuationTokenError::ParseFailed)?; - Ok(ContinuationToken { block_n, txn_n: receipt_n, event_n }) + // Parse optional transaction hash (4th field) + let transaction_hash = if arr.len() == 4 { + let hash_str = arr[3]; + if hash_str.is_empty() { + None + } else { + Some(Felt::from_hex(hash_str).map_err(|_| ContinuationTokenError::InvalidToken)?) 
+ } + } else { + None + }; + + Ok(ContinuationToken { block_n, txn_n: receipt_n, event_n, transaction_hash }) } } impl fmt::Display for ContinuationToken { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:x},{:x},{:x}", self.block_n, self.txn_n, self.event_n) + if let Some(tx_hash) = &self.transaction_hash { + write!(f, "{:x},{:x},{:x},{:#x}", self.block_n, self.txn_n, self.event_n, tx_hash) + } else { + write!(f, "{:x},{:x},{:x}", self.block_n, self.txn_n, self.event_n) + } } } @@ -108,11 +129,17 @@ mod test { #[test] fn to_string_works() { fn helper(block_n: u64, txn_n: u64, event_n: u64) -> String { - ContinuationToken { block_n, txn_n, event_n }.to_string() + ContinuationToken { block_n, txn_n, event_n, transaction_hash: None }.to_string() } assert_eq!(helper(0, 0, 0), "0,0,0"); assert_eq!(helper(30, 255, 4), "1e,ff,4"); + + // Test with transaction hash + let tx_hash = Felt::from_hex("0x123abc").unwrap(); + let token = + ContinuationToken { block_n: 0, txn_n: 0, event_n: 0, transaction_hash: Some(tx_hash) }; + assert_eq!(token.to_string(), "0,0,0,0x123abc"); } #[test] @@ -120,8 +147,22 @@ mod test { fn helper(token: &str) -> ContinuationToken { ContinuationToken::parse(token).unwrap() } - assert_eq!(helper("0,0,0"), ContinuationToken { block_n: 0, txn_n: 0, event_n: 0 }); - assert_eq!(helper("1e,ff,4"), ContinuationToken { block_n: 30, txn_n: 255, event_n: 4 }); + assert_eq!( + helper("0,0,0"), + ContinuationToken { block_n: 0, txn_n: 0, event_n: 0, transaction_hash: None } + ); + assert_eq!( + helper("1e,ff,4"), + ContinuationToken { block_n: 30, txn_n: 255, event_n: 4, transaction_hash: None } + ); + + // Test parsing with transaction hash + let tx_hash = Felt::from_hex("0x123abc").unwrap(); + let token = helper("0,0,0,0x123abc"); + assert_eq!( + token, + ContinuationToken { block_n: 0, txn_n: 0, event_n: 0, transaction_hash: Some(tx_hash) } + ); } #[test] @@ -170,6 +211,17 @@ mod test { assert_eq!(t.block_n, 30); assert_eq!(t.txn_n, 255); assert_eq!(t.event_n, 4); + assert_eq!(t.transaction_hash, None); + }); + + // Test with transaction hash + let regular_token_with_hash = "1e,ff,4,0x123abc"; + let parsed = MaybeForkedContinuationToken::parse(regular_token_with_hash).unwrap(); + assert_matches!(parsed, MaybeForkedContinuationToken::Token(t) => { + assert_eq!(t.block_n, 30); + assert_eq!(t.txn_n, 255); + assert_eq!(t.event_n, 4); + assert_eq!(t.transaction_hash, Some(Felt::from_hex("0x123abc").unwrap())); }); } } diff --git a/crates/rpc/rpc-server/Cargo.toml b/crates/rpc/rpc-server/Cargo.toml index ed833a8ea..622ee49af 100644 --- a/crates/rpc/rpc-server/Cargo.toml +++ b/crates/rpc/rpc-server/Cargo.toml @@ -7,6 +7,7 @@ repository.workspace = true version.workspace = true [dependencies] +katana-optimistic.workspace = true katana-core.workspace = true katana-chain-spec.workspace = true katana-executor.workspace = true @@ -17,6 +18,7 @@ katana-primitives.workspace = true katana-genesis = { workspace = true, optional = true } katana-provider = { workspace = true, features = [ "test-utils" ] } katana-rpc-api = { workspace = true, features = [ "client" ] } +katana-rpc-client.workspace = true katana-rpc-types.workspace = true katana-rpc-types-builder.workspace = true katana-tasks.workspace = true @@ -25,12 +27,12 @@ katana-tracing.workspace = true anyhow.workspace = true auto_impl.workspace = true +futures.workspace = true http.workspace = true jsonrpsee = { workspace = true, features = [ "client", "server" ] } metrics.workspace = true serde_json.workspace = true 
starknet = { workspace = true, optional = true }
-futures = { workspace = true, optional = true }
 thiserror.workspace = true
 tokio.workspace = true
 tower.workspace = true
@@ -81,7 +83,6 @@ url.workspace = true
 [features]
 cartridge = [
     "dep:cainome",
-    "dep:futures",
     "dep:cartridge",
     "dep:katana-genesis",
     "dep:starknet",
diff --git a/crates/rpc/rpc-server/src/logger.rs b/crates/rpc/rpc-server/src/logger.rs
index 04a530b8e..af217293c 100644
--- a/crates/rpc/rpc-server/src/logger.rs
+++ b/crates/rpc/rpc-server/src/logger.rs
@@ -3,7 +3,7 @@ use std::future::Future;
 use jsonrpsee::core::middleware;
 use jsonrpsee::core::middleware::{Batch, Notification};
 use jsonrpsee::types::Request;
-use tracing::Instrument;
+use tracing::{info, Instrument};
 /// RPC logger layer.
 #[derive(Copy, Clone, Debug)]
@@ -41,12 +41,14 @@ where
     #[inline]
     #[tracing::instrument(target = "rpc", level = "trace", name = "rpc_call", skip_all, fields(method = req.method_name()))]
     fn call<'a>(&self, req: Request<'a>) -> impl Future + Send + 'a {
+        info!(method = %req.method, "Rpc called.");
         self.service.call(req).in_current_span()
     }
     #[inline]
     #[tracing::instrument(target = "rpc", level = "trace", name = "rpc_batch", skip_all, fields(batch_size = batch.len()))]
     fn batch<'a>(&self, batch: Batch<'a>) -> impl Future + Send + 'a {
+        info!(batch_size = batch.len(), "Batch rpc called.");
         self.service.batch(batch).in_current_span()
     }
diff --git a/crates/rpc/rpc-server/src/starknet/list.rs b/crates/rpc/rpc-server/src/starknet/list.rs
index af5f18083..caa95063a 100644
--- a/crates/rpc/rpc-server/src/starknet/list.rs
+++ b/crates/rpc/rpc-server/src/starknet/list.rs
@@ -9,6 +9,7 @@ use katana_rpc_api::starknet_ext::StarknetApiExtServer;
 use katana_rpc_types::list::{
     GetBlocksRequest, GetBlocksResponse, GetTransactionsRequest, GetTransactionsResponse,
 };
+use katana_rpc_types::RpcTxWithHash;
 use super::StarknetApi;
 use crate::starknet::pending::PendingBlockProvider;
@@ -20,6 +21,7 @@ where
     PP: PendingBlockProvider,
     PF: ProviderFactory,
     <PF as ProviderFactory>::Provider: ProviderRO,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
 {
     async fn get_blocks(&self, request: GetBlocksRequest) -> RpcResult<GetBlocksResponse> {
         Ok(self.blocks(request).await?)
diff --git a/crates/rpc/rpc-server/src/starknet/mod.rs b/crates/rpc/rpc-server/src/starknet/mod.rs
index 77f8f64c3..0cc0f990e 100644
--- a/crates/rpc/rpc-server/src/starknet/mod.rs
+++ b/crates/rpc/rpc-server/src/starknet/mod.rs
@@ -1,3 +1,6 @@
+#![allow(unused_imports)]
+#![allow(clippy::too_many_arguments)]
+#![allow(clippy::unnecessary_map_or)]
 //! Server implementation for the Starknet JSON-RPC API.
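+//!
+//! With this patch the server can also run in an "optimistic" mode: reads consult the
+//! shared [`katana_optimistic::executor::OptimisticState`] (locally executed but not yet
+//! confirmed transactions) before falling back to the pending block provider or storage.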
 use std::fmt::Debug;
@@ -8,6 +11,7 @@ use katana_chain_spec::ChainSpec;
 use katana_core::backend::storage::ProviderRO;
 use katana_core::utils::get_current_timestamp;
 use katana_gas_price_oracle::GasPriceOracle;
+use katana_optimistic::executor::OptimisticState;
 use katana_pool::TransactionPool;
 use katana_primitives::block::{BlockHashOrNumber, BlockIdOrTag, FinalityStatus, GasPrices};
 use katana_primitives::class::{ClassHash, CompiledClass};
@@ -33,7 +37,9 @@ use katana_rpc_types::block::{
     GetBlockWithTxHashesResponse, MaybePreConfirmedBlock,
 };
 use katana_rpc_types::class::Class;
-use katana_rpc_types::event::{EventFilterWithPage, GetEventsResponse, ResultPageRequest};
+use katana_rpc_types::event::{
+    EmittedEvent, EventFilterWithPage, GetEventsResponse, ResultPageRequest,
+};
 use katana_rpc_types::list::{
     ContinuationToken as ListContinuationToken, GetBlocksRequest, GetBlocksResponse,
     GetTransactionsRequest, GetTransactionsResponse, TransactionListItem,
@@ -48,9 +54,10 @@ use katana_rpc_types::trie::{
 use katana_rpc_types::{FeeEstimate, TxStatus};
 use katana_rpc_types_builder::{BlockBuilder, ReceiptBuilder};
 use katana_tasks::{Result as TaskResult, TaskSpawner};
+use tracing::trace;
 use crate::permit::Permits;
-use crate::utils::events::{Cursor, EventBlockId};
+use crate::utils::events::{fetch_events_at_blocks, fetch_tx_events, Cursor, EventBlockId, Filter};
 use crate::{utils, DEFAULT_ESTIMATE_FEE_MAX_CONCURRENT_REQUESTS};
 mod blockifier;
@@ -64,7 +71,7 @@ mod write;
 #[cfg(feature = "cartridge")]
 pub use config::PaymasterConfig;
 pub use config::StarknetApiConfig;
-pub use pending::PendingBlockProvider;
+pub use pending::{OptimisticPendingBlockProvider, PendingBlockProvider};
 pub type StarknetApiResult<T> = Result<T, StarknetApiError>;
@@ -98,6 +105,7 @@ where
     task_spawner: TaskSpawner,
     estimate_fee_permit: Permits,
     pending_block_provider: PP,
+    optimistic_state: Option<OptimisticState>,
     config: StarknetApiConfig,
 }
@@ -114,6 +122,7 @@ where
         task_spawner: TaskSpawner,
         pending_block_provider: PP,
         gas_oracle: GasPriceOracle,
+        optimistic_state: Option<OptimisticState>,
         config: StarknetApiConfig,
         storage2: PF,
     ) -> Self {
@@ -125,6 +134,7 @@ where
             pending_block_provider,
             gas_oracle,
             storage2,
+            optimistic_state,
         )
     }
@@ -137,6 +147,7 @@ where
         pending_block_provider: PP,
         gas_oracle: GasPriceOracle,
         storage2: PF,
+        optimistic_state: Option<OptimisticState>,
     ) -> Self {
         let total_permits = config
             .max_concurrent_estimate_fee_requests
@@ -152,6 +163,7 @@ where
             pending_block_provider,
             gas_oracle,
             storage: storage2,
+            optimistic_state,
         };
         Self { inner: Arc::new(inner) }
@@ -539,35 +551,11 @@ where
         }
     }
-    async fn transaction(&self, hash: TxHash) -> StarknetApiResult<RpcTxWithHash> {
-        let tx = self
-            .on_io_blocking_task(move |this| {
-                if let pending_tx @ Some(..) =
-                    this.inner.pending_block_provider.get_pending_transaction(hash)?
-                {
-                    Result::<_, StarknetApiError>::Ok(pending_tx)
-                } else {
-                    let tx = this
-                        .storage()
-                        .provider()
-                        .transaction_by_hash(hash)?
-                        .map(RpcTxWithHash::from);
-
-                    Result::<_, StarknetApiError>::Ok(tx)
-                }
-            })
-            .await??;
-
-        if let Some(tx) = tx {
-            Ok(tx)
-        } else {
-            Err(StarknetApiError::TxnHashNotFound)
-        }
-    }
-
     async fn receipt(&self, hash: Felt) -> StarknetApiResult<TxReceiptWithBlockInfo> {
+        trace!(target: "rpc", "Requesting receipt for tx {hash:#x}.");
         let receipt = self
             .on_io_blocking_task(move |this| {
+                // Check pending block provider
                 if let pending_receipt @ Some(..) =
                    this.inner.pending_block_provider.get_pending_receipt(hash)?
                {
@@ -589,41 +577,43 @@ where
     async fn transaction_status(&self, hash: TxHash) -> StarknetApiResult<TxStatus> {
         let status = self
             .on_io_blocking_task(move |this| {
-                let provider = this.storage().provider();
-                let status = provider.transaction_status(hash)?;
-
-                if let Some(status) = status {
-                    // TODO: this might not work once we allow querying for 'failed' transactions
-                    // from the provider
-                    let Some(receipt) = provider.receipt_by_hash(hash)? else {
-                        let error = StarknetApiError::unexpected(
-                            "Transaction hash exist, but the receipt is missing",
-                        );
-                        return Err(error);
-                    };
-
-                    let exec_status = if let Some(reason) = receipt.revert_reason() {
-                        katana_rpc_types::ExecutionResult::Reverted { reason: reason.to_string() }
-                    } else {
-                        katana_rpc_types::ExecutionResult::Succeeded
-                    };
-
-                    let status = match status {
-                        FinalityStatus::AcceptedOnL1 => TxStatus::AcceptedOnL1(exec_status),
-                        FinalityStatus::AcceptedOnL2 => TxStatus::AcceptedOnL2(exec_status),
-                        FinalityStatus::PreConfirmed => TxStatus::PreConfirmed(exec_status),
-                    };
-
-                    return Ok(Some(status));
-                }
-                // seach in the pending block if the transaction is not found
                 if let Some(receipt) =
                     this.inner.pending_block_provider.get_pending_receipt(hash)?
                 {
                     Ok(Some(TxStatus::PreConfirmed(receipt.receipt.execution_result().clone())))
                 } else {
-                    Ok(None)
+                    let provider = this.storage().provider();
+                    let status = provider.transaction_status(hash)?;
+
+                    if let Some(status) = status {
+                        // TODO: this might not work once we allow querying for 'failed'
+                        // transactions from the provider
+                        let Some(receipt) = provider.receipt_by_hash(hash)? else {
+                            let error = StarknetApiError::unexpected(
+                                "Transaction hash exists, but the receipt is missing",
+                            );
+                            return Err(error);
+                        };
+
+                        let exec_status = if let Some(reason) = receipt.revert_reason() {
+                            katana_rpc_types::ExecutionResult::Reverted {
+                                reason: reason.to_string(),
+                            }
+                        } else {
+                            katana_rpc_types::ExecutionResult::Succeeded
+                        };
+
+                        let status = match status {
+                            FinalityStatus::AcceptedOnL1 => TxStatus::AcceptedOnL1(exec_status),
+                            FinalityStatus::AcceptedOnL2 => TxStatus::AcceptedOnL2(exec_status),
+                            FinalityStatus::PreConfirmed => TxStatus::PreConfirmed(exec_status),
+                        };
+
+                        Ok(Some(status))
+                    } else {
+                        Ok(None)
+                    }
                 }
             })
             .await??;
@@ -683,18 +673,21 @@ where
                 if let Some(block) =
                     this.inner.pending_block_provider.get_pending_block_with_receipts()?
                 {
-                    return Ok(Some(GetBlockWithReceiptsResponse::PreConfirmed(block)));
+                    Ok(Some(GetBlockWithReceiptsResponse::PreConfirmed(block)))
+                } else {
+                    Ok(None)
                 }
-            }
-
-            if let Some(num) = provider.convert_block_id(block_id)? {
-                let block = katana_rpc_types_builder::BlockBuilder::new(num.into(), provider)
-                    .build_with_receipts()?
-                    .map(GetBlockWithReceiptsResponse::Block);
-
-                StarknetApiResult::Ok(block)
             } else {
+                // if let Some(num) = provider.convert_block_id(block_id)? {
+                //     let block =
+                //         katana_rpc_types_builder::BlockBuilder::new(num.into(), provider)
+                //             .build_with_receipts()?
+                //             .map(GetBlockWithReceiptsResponse::Block);
+
+                //     StarknetApiResult::Ok(block)
+                // } else {
                 StarknetApiResult::Ok(None)
+                // }
             }
         })
         .await??;
@@ -722,15 +715,17 @@ where
                 }
             }
-            if let Some(num) = provider.convert_block_id(block_id)? {
-                let block = katana_rpc_types_builder::BlockBuilder::new(num.into(), provider)
-                    .build_with_tx_hash()?
+            // if let Some(num) = provider.convert_block_id(block_id)? {
+            //     let block = katana_rpc_types_builder::BlockBuilder::new(num.into(), provider)
+            //         .build_with_tx_hash()?
+            //         .map(GetBlockWithTxHashesResponse::Block);
-                StarknetApiResult::Ok(block)
-            } else {
-                StarknetApiResult::Ok(None)
-            }
+            //     StarknetApiResult::Ok(block)
+            // } else {
+            //     StarknetApiResult::Ok(None)
+            // }
+
+            StarknetApiResult::Ok(None)
         })
         .await??;
@@ -829,6 +824,139 @@ where
             .await?
     }
+    /// Extracts and filters events from the optimistic state transactions.
+    /// Returns a continuation token if there are more events to fetch.
+    fn fetch_optimistic_events(
+        &self,
+        address: Option<ContractAddress>,
+        keys: &Option<Vec<Vec<Felt>>>,
+        events_buffer: &mut Vec<EmittedEvent>,
+        chunk_size: u64,
+        continuation_token: Option<&katana_primitives::event::ContinuationToken>,
+    ) -> StarknetApiResult<Option<katana_primitives::event::ContinuationToken>> {
+        if let Some(optimistic_state) = &self.inner.optimistic_state {
+            let transactions = optimistic_state.transactions.read();
+
+            // Determine starting position from continuation token
+            let (start_txn_idx, start_event_idx) = if let Some(token) = continuation_token {
+                // If transaction hash is present, use it to find the transaction
+                if let Some(tx_hash) = &token.transaction_hash {
+                    // Find the transaction by hash
+                    if let Some(idx) = transactions.iter().position(|(tx, _)| &tx.hash == tx_hash) {
+                        (idx, token.event_n as usize)
+                    } else {
+                        // Transaction not found (likely removed by poll_confirmed_blocks).
+                        // Start from the beginning.
+                        trace!(
+                            target: "rpc::starknet",
+                            tx_hash = format!("{:#x}", tx_hash),
+                            "Transaction from continuation token not found in optimistic state, starting from beginning"
+                        );
+                        (0, 0)
+                    }
+                } else {
+                    // // Use txn_n index if no hash is provided (backward compatibility)
+                    // (token.txn_n as usize, token.event_n as usize)
+                    unimplemented!()
+                }
+            } else {
+                (0, 0)
+            };
+
+            trace!(target: "rpc::starknet", count = transactions.len(), "Scanning optimistic transactions for events.");
+            for (tx_idx, (tx, result)) in transactions.iter().enumerate() {
+                // Skip transactions before the continuation token
+                if tx_idx < start_txn_idx {
+                    continue;
+                }
+
+                // Stop if we've reached the chunk size limit
+                if events_buffer.len() >= chunk_size as usize {
+                    break;
+                }
+                // Only process successful executions
+                if let katana_executor::ExecutionResult::Success { receipt, .. } = result {
+                    for (event_idx, event) in receipt.events().iter().enumerate() {
+                        // Skip events before the continuation token in the current transaction
+                        if tx_idx == start_txn_idx && event_idx < start_event_idx {
+                            continue;
+                        }
+                        // Apply address filter
+                        if let Some(filter_address) = address {
+                            if event.from_address != filter_address {
+                                continue;
+                            }
+                        }
+
+                        // Apply keys filter
+                        if let Some(filter_keys) = keys {
+                            let mut matches = true;
+                            for (i, key_set) in filter_keys.iter().enumerate() {
+                                if !key_set.is_empty() {
+                                    if let Some(event_key) = event.keys.get(i) {
+                                        if !key_set.contains(event_key) {
+                                            matches = false;
+                                            break;
+                                        }
+                                    } else {
+                                        matches = false;
+                                        break;
+                                    }
+                                }
+                            }
+
+                            if !matches {
+                                continue;
+                            }
+                        }
+
+                        // Event matches the filter, add it to the buffer
+                        events_buffer.push(EmittedEvent {
+                            from_address: event.from_address,
+                            keys: event.keys.clone(),
+                            data: event.data.clone(),
+                            block_hash: None,   // Optimistic transactions don't have a block hash yet
+                            block_number: None, /* Optimistic transactions don't have a block
+                                                 * number yet */
+                            transaction_hash: tx.hash,
+                            transaction_index: None, // Optimistic transactions don't have a tx index yet
+                            event_index: Some(event_idx as u64),
+                        });
+
+                        // Stop if we've reached the chunk size limit
+                        if events_buffer.len() >= chunk_size as usize {
+                            // Return a continuation token with the current position
+                            let next_event_idx = event_idx + 1;
+                            let token = katana_primitives::event::ContinuationToken {
+                                block_n: 0, // Not used for optimistic transactions
+                                txn_n: tx_idx as u64,
+                                event_n: next_event_idx as u64,
+                                transaction_hash: Some(tx.hash),
+                            };
+                            return Ok(Some(token));
+                        }
+                    }
+                }
+            }
+
+            // If we've already exhausted all the optimistic transactions, return a continuation
+            // token pointing to the next optimistic transaction.
+            return Ok(Some(katana_primitives::event::ContinuationToken {
+                block_n: 0, // Not used for optimistic transactions
+                txn_n: transactions.len() as u64,
+                event_n: transactions
+                    .last()
+                    .and_then(|(.., result)| {
+                        result.receipt().map(|receipt| receipt.events().len() as u64)
+                    })
+                    .unwrap_or(0),
+                transaction_hash: transactions.last().map(|(tx, ..)| tx.hash),
+            }));
+        }
+
+        Ok(None)
+    }
+
     // TODO: should document more and possible find a simpler solution(?)
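+    // NOTE (editor sketch): optimistic continuation tokens anchor on the transaction
+    // *hash* rather than on block/txn indices, since optimistic transactions can be
+    // evicted once confirmed upstream. With the extended `ContinuationToken` from
+    // `katana-primitives`, "resume at event 2 of tx 0xabc" round-trips as:
+    //
+    //     let token = ContinuationToken {
+    //         block_n: 0,  // unused for optimistic transactions
+    //         txn_n: 5,    // positional hint; the hash is authoritative
+    //         event_n: 2,
+    //         transaction_hash: Some(Felt::from_hex("0xabc").unwrap()),
+    //     };
+    //     assert_eq!(token.to_string(), "0,5,2,0xabc");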
fn events_inner( &self, @@ -846,95 +974,54 @@ where // reserved buffer to fill up with events to avoid reallocations let mut events = Vec::with_capacity(chunk_size as usize); - let filter = utils::events::Filter { address, keys: keys.clone() }; match (from, to) { - (EventBlockId::Num(from), EventBlockId::Num(to)) => { - let from_after_forked_if_any = from; - - let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); - let block_range = from_after_forked_if_any..=to; - - let cursor = utils::events::fetch_events_at_blocks( - provider, - block_range, - &filter, - chunk_size, - cursor, - &mut events, - )?; - - let continuation_token = cursor.map(|c| c.into_rpc_cursor().to_string()); - let events_page = GetEventsResponse { events, continuation_token }; - - Ok(events_page) + (EventBlockId::Num(_from), EventBlockId::Num(_to)) => { + // TODO: implement fetching events from storage for non-pending block ranges + Ok(GetEventsResponse { events, continuation_token: None }) } - (EventBlockId::Num(from), EventBlockId::Pending) => { - let from_after_forked_if_any = from; - - let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); - let latest = provider.latest_number()?; - let block_range = from_after_forked_if_any..=latest; - - let int_cursor = utils::events::fetch_events_at_blocks( - provider, - block_range, - &filter, - chunk_size, - cursor.clone(), + (EventBlockId::Num(_from), EventBlockId::Pending) => { + // Fetch events from optimistic state transactions (which serve as "pending" + // transactions) + // Extract native token if present + let native_token = continuation_token.as_ref().and_then(|t| match t { + MaybeForkedContinuationToken::Token(token) => Some(token), + _ => None, + }); + + let opt_token = self.fetch_optimistic_events( + address, + &keys, &mut events, + chunk_size, + native_token, )?; - // if the internal cursor is Some, meaning the buffer is full and we havent - // reached the latest block. - if let Some(c) = int_cursor { - let continuation_token = Some(c.into_rpc_cursor().to_string()); - return Ok(GetEventsResponse { events, continuation_token }); - } - - if let Some(block) = - self.inner.pending_block_provider.get_pending_block_with_receipts()? - { - let cursor = utils::events::fetch_pending_events( - &block, - &filter, - chunk_size, - cursor, - &mut events, - )?; - - let continuation_token = Some(cursor.into_rpc_cursor().to_string()); - Ok(GetEventsResponse { events, continuation_token }) - } else { - let cursor = Cursor::new_block(latest + 1); - let continuation_token = Some(cursor.into_rpc_cursor().to_string()); - Ok(GetEventsResponse { events, continuation_token }) - } + let continuation_token = + opt_token.map(|t| MaybeForkedContinuationToken::Token(t).to_string()); + Ok(GetEventsResponse { events, continuation_token }) } (EventBlockId::Pending, EventBlockId::Pending) => { - if let Some(block) = - self.inner.pending_block_provider.get_pending_block_with_receipts()? 
-                {
-                    let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into()));
-                    let new_cursor = utils::events::fetch_pending_events(
-                        &block,
-                        &filter,
-                        chunk_size,
-                        cursor,
-                        &mut events,
-                    )?;
-
-                    let continuation_token = Some(new_cursor.into_rpc_cursor().to_string());
-                    Ok(GetEventsResponse { events, continuation_token })
-                } else {
-                    let latest = provider.latest_number()?;
-                    let new_cursor = Cursor::new_block(latest);
+                // Fetch events from optimistic state transactions (which represent pending
+                // transactions)
+                // Extract native token if present
+                let native_token = continuation_token.as_ref().and_then(|t| match t {
+                    MaybeForkedContinuationToken::Token(token) => Some(token),
+                    _ => None,
+                });
+                let opt_token = self.fetch_optimistic_events(
+                    address,
+                    &keys,
+                    &mut events,
+                    chunk_size,
+                    native_token,
+                )?;
-                    let continuation_token = Some(new_cursor.into_rpc_cursor().to_string());
-                    Ok(GetEventsResponse { events, continuation_token })
-                }
+                let continuation_token =
+                    opt_token.map(|t| MaybeForkedContinuationToken::Token(t).to_string());
+                Ok(GetEventsResponse { events, continuation_token })
             }
             (EventBlockId::Pending, EventBlockId::Num(_)) => Err(StarknetApiError::unexpected(
@@ -958,8 +1045,8 @@ where
             BlockIdOrTag::Number(num) => EventBlockId::Num(num),
             BlockIdOrTag::Latest => {
-                let num = provider.convert_block_id(id)?;
-                EventBlockId::Num(num.ok_or(StarknetApiError::BlockNotFound)?)
+                let num = provider.latest_number()?;
+                EventBlockId::Num(num)
             }
             BlockIdOrTag::Hash(..) => {
@@ -1234,6 +1321,55 @@ where
     }
 }
+// Separate impl block for methods that require the pool transaction to be convertible to
+// RpcTxWithHash.
+impl<Pool, PP, PF> StarknetApi<Pool, PP, PF>
+where
+    Pool: TransactionPool + 'static,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
+    PP: PendingBlockProvider,
+    PF: ProviderFactory,
+    <PF as ProviderFactory>::Provider: ProviderRO,
+{
+    async fn transaction(&self, hash: TxHash) -> StarknetApiResult<RpcTxWithHash> {
+        let tx = self
+            .on_io_blocking_task(move |this| {
+                // First, check optimistic state for the transaction
+                if let Some(optimistic_state) = &this.inner.optimistic_state {
+                    let transactions = optimistic_state.transactions.read();
+                    if let Some((tx, _result)) = transactions.iter().find(|(tx, _)| tx.hash == hash)
+                    {
+                        return Result::<_, StarknetApiError>::Ok(Some(RpcTxWithHash::from(
+                            tx.clone(),
+                        )));
+                    }
+                }
+
+                // Check pending block provider
+                if let pending_tx @ Some(..) =
+                    this.inner.pending_block_provider.get_pending_transaction(hash)?
+                {
+                    Result::<_, StarknetApiError>::Ok(pending_tx)
+                } else {
+                    let tx = this
+                        .storage()
+                        .provider()
+                        .transaction_by_hash(hash)?
+                        .map(RpcTxWithHash::from);
+
+                    Result::<_, StarknetApiError>::Ok(tx)
+                }
+            })
+            .await??;
+
+        if let Some(tx) = tx {
+            Ok(tx)
+        } else {
+            let pool_tx = self.inner.pool.get(hash).ok_or(StarknetApiError::TxnHashNotFound)?;
+            Ok(Into::into(pool_tx.as_ref().clone()))
+        }
+    }
+}
+
 impl<Pool, PP, PF> Clone for StarknetApi<Pool, PP, PF>
 where
     Pool: TransactionPool,
diff --git a/crates/rpc/rpc-server/src/starknet/pending.rs b/crates/rpc/rpc-server/src/starknet/pending.rs
index e3c0358fa..a8999832a 100644
--- a/crates/rpc/rpc-server/src/starknet/pending.rs
+++ b/crates/rpc/rpc-server/src/starknet/pending.rs
@@ -1,19 +1,24 @@
+#![allow(clippy::collapsible_match)]
+
 use std::fmt::Debug;
 use katana_core::backend::storage::ProviderRO;
 use katana_core::service::block_producer::{BlockProducer, BlockProducerMode};
 use katana_executor::ExecutorFactory;
-use katana_primitives::block::PartialHeader;
+use katana_primitives::block::{BlockIdOrTag, FinalityStatus, PartialHeader};
 use katana_primitives::da::L1DataAvailabilityMode;
 use katana_primitives::execution::TypedTransactionExecutionInfo;
 use katana_primitives::transaction::{TxHash, TxNumber};
 use katana_primitives::version::CURRENT_STARKNET_VERSION;
-use katana_provider::api::state::StateProvider;
+use katana_provider::api::block::BlockNumberProvider;
+use katana_provider::api::state::{StateFactoryProvider, StateProvider};
+use katana_provider::providers::db::cached::CachedStateProvider;
 use katana_provider::ProviderFactory;
+use katana_rpc_client::starknet::Client;
 use katana_rpc_types::{
-    FinalityStatus, PreConfirmedBlockWithReceipts, PreConfirmedBlockWithTxHashes,
-    PreConfirmedBlockWithTxs, PreConfirmedStateUpdate, ReceiptBlockInfo, RpcTxWithHash,
-    TxReceiptWithBlockInfo, TxTrace,
+    PreConfirmedBlockWithReceipts, PreConfirmedBlockWithTxHashes, PreConfirmedBlockWithTxs,
+    PreConfirmedStateUpdate, ReceiptBlockInfo, RpcTx, RpcTxReceiptWithHash, RpcTxWithHash,
+    RpcTxWithReceipt, TxReceiptWithBlockInfo, TxTrace,
 };
+use tracing::trace;
 use crate::starknet::StarknetApiResult;
@@ -279,3 +284,293 @@ where
     }
 }
+
+/// A pending block provider that checks the optimistic state for transactions/receipts,
+/// then falls back to the client for all queries.
+#[derive(Debug, Clone)]
+pub struct OptimisticPendingBlockProvider<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    optimistic_state: katana_optimistic::executor::OptimisticState,
+    client: Client,
+    storage: P,
+}
+
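+// NOTE (editor sketch): lookups go through two layers: the shared `OptimisticState`
+// (transactions executed locally) first, then the upstream node via `client`. The
+// names `upstream` and `factory` below are placeholders for whatever client and
+// provider factory the node already holds:
+//
+//     let provider = OptimisticPendingBlockProvider::new(
+//         optimistic_state.clone(), // shared with the optimistic executor
+//         upstream,                 // katana_rpc_client::starknet::Client
+//         factory,                  // impl ProviderFactory over local storage
+//     );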
+impl<P> OptimisticPendingBlockProvider<P>
+where
+    P: ProviderFactory,
+    P::Provider: ProviderRO,
+{
+    pub fn new(
+        optimistic_state: katana_optimistic::executor::OptimisticState,
+        client: Client,
+        storage: P,
+    ) -> Self {
+        Self { optimistic_state, client, storage }
+    }
+}
+
+impl<P> PendingBlockProvider for OptimisticPendingBlockProvider<P>
+where
+    P: ProviderFactory + Clone,
+    P::Provider: ProviderRO,
+{
+    fn pending_state(&self) -> StarknetApiResult<Option<Box<dyn StateProvider>>> {
+        let latest_state = self.storage.provider().latest()?;
+        Ok(Some(self.optimistic_state.get_optimistic_state(latest_state)))
+    }
+
+    fn get_pending_state_update(&self) -> StarknetApiResult<Option<PreConfirmedStateUpdate>> {
+        self.client.get_pending_state_update()
+    }
+
+    fn get_pending_block_with_txs(&self) -> StarknetApiResult<Option<PreConfirmedBlockWithTxs>> {
+        if let Some(block) = self.client.get_pending_block_with_txs()? {
+            let optimistic_transactions = self
+                .optimistic_state
+                .transactions
+                .read()
+                .iter()
+                .map(|(tx, ..)| tx.clone())
+                .map(RpcTxWithHash::from)
+                .collect::<Vec<_>>();
+
+            Ok(Some(PreConfirmedBlockWithTxs { transactions: optimistic_transactions, ..block }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn get_pending_block_with_receipts(
+        &self,
+    ) -> StarknetApiResult<Option<PreConfirmedBlockWithReceipts>> {
+        if let Some(block) = self.client.get_pending_block_with_receipts()? {
+            let optimistic_transactions = self
+                .optimistic_state
+                .transactions
+                .read()
+                .iter()
+                .filter_map(|(tx, result)| {
+                    if let Some(receipt) = result.receipt() {
+                        let transaction = RpcTx::from(tx.transaction.clone());
+                        let receipt = RpcTxReceiptWithHash::new(
+                            tx.hash,
+                            receipt.clone(),
+                            FinalityStatus::PreConfirmed,
+                        );
+
+                        Some(RpcTxWithReceipt { transaction, receipt })
+                    } else {
+                        None
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            Ok(Some(PreConfirmedBlockWithReceipts {
+                transactions: optimistic_transactions,
+                ..block
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn get_pending_block_with_tx_hashes(
+        &self,
+    ) -> StarknetApiResult<Option<PreConfirmedBlockWithTxHashes>> {
+        self.client.get_pending_block_with_tx_hashes()
+    }
+
+    fn get_pending_transaction(&self, hash: TxHash) -> StarknetApiResult<Option<RpcTxWithHash>> {
+        // First, check optimistic state
+        let transactions = self.optimistic_state.transactions.read();
+        if let Some((tx, _result)) = transactions.iter().find(|(tx, _)| tx.hash == hash) {
+            return Ok(Some(RpcTxWithHash::from(tx.clone())));
+        }
+
+        // Fall back to client
+        self.client.get_pending_transaction(hash)
+    }
+
+    fn get_pending_receipt(
+        &self,
+        hash: TxHash,
+    ) -> StarknetApiResult<Option<TxReceiptWithBlockInfo>> {
+        // First, check optimistic state
+        let transactions = self.optimistic_state.transactions.read();
+        if let Some((_tx, result)) = transactions.iter().find(|(tx, _)| tx.hash == hash) {
+            if let katana_executor::ExecutionResult::Success { receipt, .. } = result {
+                trace!(target: "rpc", "Receipt found in optimistic state. hash: {hash:#x}");
+
+                let block = ReceiptBlockInfo::PreConfirmed { block_number: 0 };
+
+                // Create receipt with block info
+                let receipt_with_block = TxReceiptWithBlockInfo::new(
+                    block,
+                    hash,
+                    FinalityStatus::PreConfirmed,
+                    receipt.clone(),
+                );
+
+                return Ok(Some(receipt_with_block));
+            }
+        } else {
+            trace!(target: "rpc", "Receipt not found in optimistic state. hash: {hash:#x}");
+        }
+
+        // Fall back to client
+        trace!(target: "rpc", "Falling back to forked client to find receipt. hash: {hash:#x}");
+        self.client.get_pending_receipt(hash)
+    }
+
+    fn get_pending_trace(&self, hash: TxHash) -> StarknetApiResult<Option<TxTrace>> {
+        // First, check optimistic state
+        let transactions = self.optimistic_state.transactions.read();
+        if let Some((tx, result)) = transactions.iter().find(|(tx, _)| tx.hash == hash) {
+            if let katana_executor::ExecutionResult::Success { trace, .. } = result {
+                let typed_trace = TypedTransactionExecutionInfo::new(tx.r#type(), trace.clone());
+                return Ok(Some(TxTrace::from(typed_trace)));
+            }
+        }
+
+        // Fall back to client
+        self.client.get_pending_trace(hash)
+    }
+
+    fn get_pending_transaction_by_index(
+        &self,
+        index: TxNumber,
+    ) -> StarknetApiResult<Option<RpcTxWithHash>> {
+        // Check optimistic state by index
+        let transactions = self.optimistic_state.transactions.read();
+        if let Some((tx, _result)) = transactions.get(index as usize) {
+            return Ok(Some(RpcTxWithHash::from(tx.clone())));
+        }
+
+        // Fall back to client
+        self.client.get_pending_transaction_by_index(index)
+    }
+}
+
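+// NOTE (editor sketch): the impl below adapts the async rpc client to the synchronous
+// `PendingBlockProvider` trait via `futures::executor::block_on`, and flattens every
+// client error into `Ok(None)`. Blocking in place like this is only safe off the async
+// runtime's worker threads (e.g. inside `on_io_blocking_task`); invoked from a runtime
+// thread it can stall or deadlock the executor. Treat it as a stopgap, not a pattern.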
+impl PendingBlockProvider for katana_rpc_client::starknet::Client {
+    fn get_pending_state_update(&self) -> StarknetApiResult<Option<PreConfirmedStateUpdate>> {
+        let result = futures::executor::block_on(async {
+            self.get_state_update(BlockIdOrTag::PreConfirmed).await
+        });
+
+        match result {
+            Ok(state_update) => match state_update {
+                katana_rpc_types::state_update::StateUpdate::PreConfirmed(update) => {
+                    Ok(Some(update))
+                }
+                _ => Ok(None),
+            },
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_block_with_txs(&self) -> StarknetApiResult<Option<PreConfirmedBlockWithTxs>> {
+        let result = futures::executor::block_on(async {
+            self.get_block_with_txs(BlockIdOrTag::PreConfirmed).await
+        });
+
+        match result {
+            Ok(block) => match block {
+                katana_rpc_types::block::MaybePreConfirmedBlock::PreConfirmed(block) => {
+                    Ok(Some(block))
+                }
+                _ => Ok(None),
+            },
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_block_with_receipts(
+        &self,
+    ) -> StarknetApiResult<Option<PreConfirmedBlockWithReceipts>> {
+        let result = futures::executor::block_on(async {
+            self.get_block_with_receipts(BlockIdOrTag::PreConfirmed).await
+        });
+
+        match result {
+            Ok(block) => match block {
+                katana_rpc_types::block::GetBlockWithReceiptsResponse::PreConfirmed(block) => {
+                    Ok(Some(block))
+                }
+                _ => Ok(None),
+            },
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_block_with_tx_hashes(
+        &self,
+    ) -> StarknetApiResult<Option<PreConfirmedBlockWithTxHashes>> {
+        let result = futures::executor::block_on(async {
+            self.get_block_with_tx_hashes(BlockIdOrTag::PreConfirmed).await
+        });
+
+        match result {
+            Ok(block) => match block {
+                katana_rpc_types::block::GetBlockWithTxHashesResponse::PreConfirmed(block) => {
+                    Ok(Some(block))
+                }
+                _ => Ok(None),
+            },
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_transaction(&self, hash: TxHash) -> StarknetApiResult<Option<RpcTxWithHash>> {
+        let result =
+            futures::executor::block_on(async { self.get_transaction_by_hash(hash).await });
+
+        match result {
+            Ok(tx) => Ok(Some(tx)),
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_receipt(
+        &self,
+        hash: TxHash,
+    ) -> StarknetApiResult<Option<TxReceiptWithBlockInfo>> {
+        let result =
+            futures::executor::block_on(async { self.get_transaction_receipt(hash).await });
+
+        match result {
+            Ok(receipt) => Ok(Some(receipt)),
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_trace(&self, hash: TxHash) -> StarknetApiResult<Option<TxTrace>> {
+        let result = futures::executor::block_on(async { self.trace_transaction(hash).await });
+
+        match result {
+            Ok(trace) => Ok(Some(trace)),
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn get_pending_transaction_by_index(
+        &self,
+        index: TxNumber,
+    ) -> StarknetApiResult<Option<RpcTxWithHash>> {
+        let result = futures::executor::block_on(async {
+            self.get_transaction_by_block_id_and_index(BlockIdOrTag::PreConfirmed, index).await
+        });
+
+        match result {
+            Ok(tx) => Ok(Some(tx)),
+            Err(_) => Ok(None),
+        }
+    }
+
+    fn pending_state(&self) -> StarknetApiResult<Option<Box<dyn StateProvider>>> {
+        // Client-based pending block provider doesn't provide state access
+        Ok(None)
+    }
+}
diff --git a/crates/rpc/rpc-server/src/starknet/read.rs b/crates/rpc/rpc-server/src/starknet/read.rs
index a01e89725..4dcfcc466 100644
--- a/crates/rpc/rpc-server/src/starknet/read.rs
+++ b/crates/rpc/rpc-server/src/starknet/read.rs
@@ -44,7 +44,7 @@ use crate::starknet::pending::PendingBlockProvider;
 impl<Pool, Pending, PF> StarknetApiServer for StarknetApi<Pool, Pending, PF>
 where
     Pool: TransactionPool + Send + Sync + 'static,
-    PoolTx: From<BroadcastedTxWithChainId>,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
     Pending: PendingBlockProvider,
     PF: ProviderFactory,
     <PF as ProviderFactory>::Provider: ProviderRO,
@@ -82,7 +82,7 @@ where
     }
     async fn block_hash_and_number(&self) -> RpcResult {
-        self.on_io_blocking_task(move |this| Ok(this.block_hash_and_number()?)).await?
+        Ok(self.block_hash_and_number()?)
     }
     async fn get_block_with_tx_hashes(
@@ -143,7 +143,31 @@ where
     }
     async fn get_events(&self, filter: EventFilterWithPage) -> RpcResult<GetEventsResponse> {
-        Ok(self.events(filter).await?)
+        use std::collections::HashMap;
+        use std::sync::{LazyLock, Mutex};
+
+        // Function-local static cache for events. Note that entries are never
+        // invalidated, so filters over moving ranges (e.g. pending blocks) can be
+        // served stale results.
+        static EVENTS_CACHE: LazyLock<Mutex<HashMap<EventFilterWithPage, GetEventsResponse>>> =
+            LazyLock::new(|| Mutex::new(HashMap::new()));
+
+        // Check cache first
+        {
+            let cache = EVENTS_CACHE.lock().unwrap();
+            if let Some(cached_result) = cache.get(&filter) {
+                return Ok(cached_result.clone());
+            }
+        }
+
+        // If not in cache, fetch the events
+        let result = self.events(filter.clone()).await?;
+
+        // Store in cache
+        {
+            let mut cache = EVENTS_CACHE.lock().unwrap();
+            cache.insert(filter, result.clone());
+        }
+
+        Ok(result)
     }
     async fn call(&self, request: FunctionCall, block_id: BlockIdOrTag) -> RpcResult {
diff --git a/crates/rpc/rpc-server/src/starknet/trace.rs b/crates/rpc/rpc-server/src/starknet/trace.rs
index dba96b34c..a09d3a2d8 100644
--- a/crates/rpc/rpc-server/src/starknet/trace.rs
+++ b/crates/rpc/rpc-server/src/starknet/trace.rs
@@ -15,15 +15,15 @@ use katana_rpc_types::trace::{
     to_rpc_fee_estimate, SimulatedTransactions, SimulatedTransactionsResponse,
     TraceBlockTransactionsResponse, TxTrace, TxTraceWithHash,
 };
-use katana_rpc_types::{BroadcastedTxWithChainId, SimulationFlag};
+use katana_rpc_types::{BroadcastedTxWithChainId, RpcTxWithHash, SimulationFlag};
 use super::StarknetApi;
 use crate::starknet::pending::PendingBlockProvider;
-impl<Pool, PoolTx, Pending, PF> StarknetApi<Pool, Pending, PF>
+impl<Pool, Pending, PF> StarknetApi<Pool, Pending, PF>
 where
-    Pool: TransactionPool<Transaction = PoolTx> + Send + Sync + 'static,
-    PoolTx: From<BroadcastedTxWithChainId>,
+    Pool: TransactionPool + Send + Sync + 'static,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
     Pending: PendingBlockProvider,
     PF: ProviderFactory,
     <PF as ProviderFactory>::Provider: ProviderRO,
@@ -144,10 +144,10 @@ where
     }
 #[async_trait]
-impl<Pool, PoolTx, Pending, PF> StarknetTraceApiServer for StarknetApi<Pool, Pending, PF>
+impl<Pool, Pending, PF> StarknetTraceApiServer for StarknetApi<Pool, Pending, PF>
 where
-    Pool: TransactionPool<Transaction = PoolTx> + Send + Sync + 'static,
-    PoolTx: From<BroadcastedTxWithChainId>,
+    Pool: TransactionPool + Send + Sync + 'static,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
     Pending: PendingBlockProvider,
     PF: ProviderFactory,
     <PF as ProviderFactory>::Provider: ProviderRO,
diff --git a/crates/rpc/rpc-server/src/starknet/write.rs b/crates/rpc/rpc-server/src/starknet/write.rs
index 79bc64a98..55388b4e8 100644
--- a/crates/rpc/rpc-server/src/starknet/write.rs
+++ b/crates/rpc/rpc-server/src/starknet/write.rs
@@ -8,15 +8,16 @@ use katana_rpc_types::broadcasted::{
     AddInvokeTransactionResponse, BroadcastedDeclareTx, BroadcastedDeployAccountTx,
     BroadcastedInvokeTx,
 };
-use katana_rpc_types::{BroadcastedTx, BroadcastedTxWithChainId};
+use katana_rpc_types::{BroadcastedTx, BroadcastedTxWithChainId, RpcTxWithHash};
 use super::StarknetApi;
 use crate::starknet::pending::PendingBlockProvider;
-impl<Pool, PoolTx, Pending, PF> StarknetApi<Pool, Pending, PF>
+impl<Pool, Pending, PF> StarknetApi<Pool, Pending, PF>
 where
-    Pool: TransactionPool<Transaction = PoolTx> + Send + Sync + 'static,
-    PoolTx: From<BroadcastedTxWithChainId>,
+    Pool: TransactionPool + Send + Sync + 'static,
+    <Pool as TransactionPool>::Transaction: From<BroadcastedTxWithChainId>,
+    <Pool as TransactionPool>::Transaction: Into<RpcTxWithHash>,
     Pending: PendingBlockProvider,
     PF: ProviderFactory,
 {
@@ -80,10 +81,11 @@ where
     }
 #[async_trait]
-impl<Pool, PoolTx, Pending, PF> StarknetWriteApiServer for StarknetApi<Pool, Pending, PF>
+impl<Pool, Pending, PF> StarknetWriteApiServer for StarknetApi<Pool, Pending, PF>
 where
-    Pool: TransactionPool<Transaction = PoolTx> + Send + Sync + 'static,
-    PoolTx: From<BroadcastedTxWithChainId>,
+    Pool: TransactionPool + Send + Sync + 'static,
+    <Pool as TransactionPool>::Transaction: From<BroadcastedTxWithChainId>,
+    RpcTxWithHash: From<<Pool as TransactionPool>::Transaction>,
     Pending: PendingBlockProvider,
     PF: ProviderFactory,
 {
diff --git a/crates/rpc/rpc-server/src/utils/events.rs b/crates/rpc/rpc-server/src/utils/events.rs
index 061e344e4..243ff053b 100644
--- a/crates/rpc/rpc-server/src/utils/events.rs
+++ b/crates/rpc/rpc-server/src/utils/events.rs
@@ -1,3 +1,5 @@
+#![allow(dead_code)]
+
 use std::cmp::Ordering;
 use std::ops::RangeInclusive;
@@ -66,21 +68,22 @@ impl Cursor {
             block_n: self.block,
             txn_n: self.txn.idx as u64,
             event_n: self.txn.event as u64,
+            transaction_hash: None,
         }
     }
 }
 /// A partial cursor that points to a specific event WITHIN a transaction.
 #[derive(Debug, Clone, PartialEq, Default)]
-struct PartialCursor {
+pub struct PartialCursor {
     /// The transaction index within a block.
-    idx: usize,
+    pub idx: usize,
     /// The event index within a transaction.
-    event: usize,
+    pub event: usize,
 }
 impl PartialCursor {
-    fn into_full(self, block: BlockNumber) -> Cursor {
+    pub fn into_full(self, block: BlockNumber) -> Cursor {
         Cursor { block, txn: self }
     }
 }
@@ -297,7 +300,7 @@ impl<'a, I: Iterator> Iterator for FilteredEvents<'a,
 /// * `chunk_size` - Maximum number of events that can be taken, based on user-specified chunk size
 /// * `buffer` - Buffer to store the matched events
 #[allow(clippy::too_many_arguments)]
-fn fetch_tx_events(
+pub fn fetch_tx_events(
     next_event_idx: usize,
     block_number: Option<BlockNumber>,
     block_hash: Option<BlockHash>,
diff --git a/crates/rpc/rpc-types/src/broadcasted.rs b/crates/rpc/rpc-types/src/broadcasted.rs
index 1ee963df5..05c18e529 100644
--- a/crates/rpc/rpc-types/src/broadcasted.rs
+++ b/crates/rpc/rpc-types/src/broadcasted.rs
@@ -730,6 +730,63 @@ impl From for ExecutableTxWithHash {
     }
 }
+impl From<BroadcastedTxWithChainId> for crate::transaction::RpcTxWithHash {
+    fn from(value: BroadcastedTxWithChainId) -> Self {
+        use crate::transaction::{
+            RpcDeclareTx, RpcDeclareTxV3, RpcDeployAccountTx, RpcDeployAccountTxV3, RpcInvokeTx,
+            RpcInvokeTxV3, RpcTx,
+        };
+
+        let transaction_hash = value.calculate_hash();
+        let transaction = match value.tx {
+            BroadcastedTx::Invoke(tx) => RpcTx::Invoke(RpcInvokeTx::V3(RpcInvokeTxV3 {
+                sender_address: tx.sender_address,
+                calldata: tx.calldata,
+                signature: tx.signature,
+                nonce: tx.nonce,
+                resource_bounds: tx.resource_bounds,
+                tip: tx.tip,
+                paymaster_data: tx.paymaster_data,
+                account_deployment_data: tx.account_deployment_data,
+                nonce_data_availability_mode: tx.nonce_data_availability_mode,
+                fee_data_availability_mode: tx.fee_data_availability_mode,
+            })),
+            BroadcastedTx::Declare(tx) => {
+                let class_hash = tx.contract_class.hash();
+                RpcTx::Declare(RpcDeclareTx::V3(RpcDeclareTxV3 {
+                    sender_address: tx.sender_address,
+                    compiled_class_hash: tx.compiled_class_hash,
+                    signature: tx.signature,
+                    nonce: tx.nonce,
+                    class_hash,
+                    resource_bounds: tx.resource_bounds,
+                    tip: tx.tip,
+                    paymaster_data: tx.paymaster_data,
+                    account_deployment_data: tx.account_deployment_data,
+                    nonce_data_availability_mode: tx.nonce_data_availability_mode,
+                    fee_data_availability_mode: tx.fee_data_availability_mode,
+                }))
+            }
+            BroadcastedTx::DeployAccount(tx) => {
+                RpcTx::DeployAccount(RpcDeployAccountTx::V3(RpcDeployAccountTxV3 {
+                    signature: tx.signature,
+                    nonce: tx.nonce,
+                    contract_address_salt: tx.contract_address_salt,
+                    constructor_calldata: tx.constructor_calldata,
+                    class_hash: tx.class_hash,
+                    paymaster_data: tx.paymaster_data,
+                    tip: tx.tip,
+                    resource_bounds: tx.resource_bounds,
+                    nonce_data_availability_mode: tx.nonce_data_availability_mode,
+                    fee_data_availability_mode: tx.fee_data_availability_mode,
+                }))
+            }
+        };
+
+        crate::transaction::RpcTxWithHash { transaction_hash, transaction }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use assert_matches::assert_matches;
diff --git a/crates/rpc/rpc-types/src/event.rs b/crates/rpc/rpc-types/src/event.rs
index 9abc8ab21..6ff140794 100644
--- a/crates/rpc/rpc-types/src/event.rs
+++ b/crates/rpc/rpc-types/src/event.rs
@@ -4,7 +4,7 @@ use katana_primitives::{ContractAddress, Felt};
 use serde::{Deserialize, Serialize};
 /// Events request.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct EventFilterWithPage {
     #[serde(flatten)]
     pub event_filter: EventFilter,
@@ -15,7 +15,7 @@ pub struct EventFilterWithPage {
 /// Event filter.
 ///
 /// An event filter/query.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct EventFilter {
     /// From block
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -35,7 +35,7 @@ pub struct EventFilter {
 }
 /// Result page request.
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
 pub struct ResultPageRequest {
     /// The token returned from the previous query. If no token is provided the first page is
     /// returned.
diff --git a/crates/rpc/rpc-types/src/transaction.rs b/crates/rpc/rpc-types/src/transaction.rs
index dfb9d5e1b..5f46f965e 100644
--- a/crates/rpc/rpc-types/src/transaction.rs
+++ b/crates/rpc/rpc-types/src/transaction.rs
@@ -589,3 +589,9 @@ impl From for primitives::Tx {
     }
 }
+
+impl From<primitives::ExecutableTxWithHash> for RpcTxWithHash {
+    fn from(_tx: primitives::ExecutableTxWithHash) -> Self {
+        todo!()
+    }
+}
diff --git a/crates/storage/provider/provider/src/providers/db/cached.rs b/crates/storage/provider/provider/src/providers/db/cached.rs
new file mode 100644
index 000000000..7c034da0d
--- /dev/null
+++ b/crates/storage/provider/provider/src/providers/db/cached.rs
@@ -0,0 +1,181 @@
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use katana_primitives::class::{ClassHash, CompiledClassHash, ContractClass};
+use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageValue};
+use katana_primitives::state::StateUpdatesWithClasses;
+use katana_provider_api::contract::ContractClassProvider;
+use katana_provider_api::state::{StateProofProvider, StateProvider, StateRootProvider};
+use parking_lot::RwLock;
+
+use crate::ProviderResult;
+
+/// Inner cache data protected by a single lock for consistent snapshots.
+#[derive(Debug, Default)]
+struct StateCacheInner {
+    /// Cache for contract nonces: ContractAddress -> Nonce
+    nonces: HashMap<ContractAddress, Nonce>,
+    /// Cache for storage values: (ContractAddress, StorageKey) -> StorageValue
+    storage: HashMap<(ContractAddress, StorageKey), StorageValue>,
+    /// Cache for contract class hashes: ContractAddress -> ClassHash
+    class_hashes: HashMap<ContractAddress, ClassHash>,
+    /// Cache for contract classes: ClassHash -> ContractClass
+    classes: HashMap<ClassHash, ContractClass>,
+    /// Cache for compiled class hashes: ClassHash -> CompiledClassHash
+    compiled_class_hashes: HashMap<ClassHash, CompiledClassHash>,
+}
+
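+// NOTE (editor sketch): intended layering, with `db_state` standing in for whatever
+// `StateProvider` the node already has (e.g. the forked latest-state provider):
+//
+//     let cache = SharedStateCache::default();
+//     cache.merge_state_updates(&updates); // publish optimistic writes
+//     let state = CachedStateProvider::new(db_state, cache.clone());
+//     // reads hit the cache first and fall through to `db_state` on a miss
+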
+/// A cache for storing state data in memory.
+///
+/// Uses a single read-write lock to ensure consistent snapshots across all cached data.
+/// This prevents reading inconsistent state that could occur with multiple independent locks.
+#[derive(Debug, Clone)]
+pub struct SharedStateCache {
+    inner: Arc<RwLock<StateCacheInner>>,
+}
+
+impl Default for SharedStateCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SharedStateCache {
+    fn new() -> Self {
+        Self { inner: Arc::new(RwLock::new(StateCacheInner::default())) }
+    }
+
+    fn get_nonce(&self, address: ContractAddress) -> Option<Nonce> {
+        self.inner.read().nonces.get(&address).copied()
+    }
+
+    fn get_storage(&self, address: ContractAddress, key: StorageKey) -> Option<StorageValue> {
+        self.inner.read().storage.get(&(address, key)).copied()
+    }
+
+    fn get_class_hash(&self, address: ContractAddress) -> Option<ClassHash> {
+        self.inner.read().class_hashes.get(&address).copied()
+    }
+
+    fn get_class(&self, hash: ClassHash) -> Option<ContractClass> {
+        self.inner.read().classes.get(&hash).cloned()
+    }
+
+    fn get_compiled_class_hash(&self, hash: ClassHash) -> Option<CompiledClassHash> {
+        self.inner.read().compiled_class_hashes.get(&hash).copied()
+    }
+
+    /// Clears all cached data.
+    pub fn clear(&self) {
+        let mut cache = self.inner.write();
+        cache.nonces.clear();
+        cache.storage.clear();
+        cache.class_hashes.clear();
+        cache.classes.clear();
+        cache.compiled_class_hashes.clear();
+    }
+
+    /// Merges state updates into the cache.
+    pub fn merge_state_updates(&self, updates: &StateUpdatesWithClasses) {
+        let mut cache = self.inner.write();
+        let state = &updates.state_updates;
+
+        for (address, nonce) in &state.nonce_updates {
+            cache.nonces.insert(*address, *nonce);
+        }
+
+        for (address, storage) in &state.storage_updates {
+            for (key, value) in storage {
+                cache.storage.insert((*address, *key), *value);
+            }
+        }
+
+        for (address, class_hash) in &state.deployed_contracts {
+            cache.class_hashes.insert(*address, *class_hash);
+        }
+
+        for (address, class_hash) in &state.replaced_classes {
+            cache.class_hashes.insert(*address, *class_hash);
+        }
+
+        for (class_hash, compiled_hash) in &state.declared_classes {
+            cache.compiled_class_hashes.insert(*class_hash, *compiled_hash);
+        }
+
+        for (class_hash, class) in &updates.classes {
+            cache.classes.insert(*class_hash, class.clone());
+        }
+    }
+}
+
+/// A cached version of fork [`LatestStateProvider`] that checks the cache before querying the
+/// database.
+#[derive(Debug)]
+pub struct CachedStateProvider<S> {
+    state: S,
+    cache: SharedStateCache,
+}
+
+impl<S> CachedStateProvider<S> {
+    pub fn new(state: S, cache: SharedStateCache) -> Self {
+        Self { state, cache }
+    }
+}
+
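+// The trait impls below are plain read-through: serve from the cache when an entry
+// exists, otherwise defer to the wrapped provider. There is no negative caching, so
+// repeated misses always hit the underlying state.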
+impl<S: StateProvider> ContractClassProvider for CachedStateProvider<S> {
+    fn class(&self, hash: ClassHash) -> ProviderResult<Option<ContractClass>> {
+        if let Some(class) = self.cache.get_class(hash) {
+            Ok(Some(class))
+        } else {
+            Ok(self.state.class(hash)?)
+        }
+    }
+
+    fn compiled_class_hash_of_class_hash(
+        &self,
+        hash: ClassHash,
+    ) -> ProviderResult<Option<CompiledClassHash>> {
+        if let Some(compiled_hash) = self.cache.get_compiled_class_hash(hash) {
+            return Ok(Some(compiled_hash));
+        }
+
+        let compiled_hash = self.state.compiled_class_hash_of_class_hash(hash)?;
+        Ok(compiled_hash)
+    }
+}
+
+impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
+    fn nonce(&self, address: ContractAddress) -> ProviderResult<Option<Nonce>> {
+        if let Some(nonce) = self.cache.get_nonce(address) {
+            Ok(Some(nonce))
+        } else {
+            Ok(self.state.nonce(address)?)
+        }
+    }
+
+    fn storage(
+        &self,
+        address: ContractAddress,
+        storage_key: StorageKey,
+    ) -> ProviderResult<Option<StorageValue>> {
+        if let Some(value) = self.cache.get_storage(address, storage_key) {
+            Ok(Some(value))
+        } else {
+            Ok(self.state.storage(address, storage_key)?)
+        }
+    }
+
+    fn class_hash_of_contract(
+        &self,
+        address: ContractAddress,
+    ) -> ProviderResult<Option<ClassHash>> {
+        if let Some(class_hash) = self.cache.get_class_hash(address) {
+            Ok(Some(class_hash))
+        } else {
+            Ok(self.state.class_hash_of_contract(address)?)
+        }
+    }
+}
+
+impl<S: StateProvider> StateProofProvider for CachedStateProvider<S> {}
+impl<S: StateProvider> StateRootProvider for CachedStateProvider<S> {}
diff --git a/crates/storage/provider/provider/src/providers/db/mod.rs b/crates/storage/provider/provider/src/providers/db/mod.rs
index e821df0bf..0b38ece9b 100644
--- a/crates/storage/provider/provider/src/providers/db/mod.rs
+++ b/crates/storage/provider/provider/src/providers/db/mod.rs
@@ -1,3 +1,4 @@
+pub mod cached;
 pub mod state;
 pub mod trie;
diff --git a/crates/storage/provider/provider/src/providers/fork/mod.rs b/crates/storage/provider/provider/src/providers/fork/mod.rs
index c0d823ed1..bf225a287 100644
--- a/crates/storage/provider/provider/src/providers/fork/mod.rs
+++ b/crates/storage/provider/provider/src/providers/fork/mod.rs
@@ -33,10 +33,10 @@ use tracing::trace;
 use super::db::{self, DbProvider};
 use crate::{DbProviderFactory, MutableProvider, ProviderFactory, ProviderResult};
-mod state;
-mod trie;
+pub mod state;
+pub mod trie;
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ForkedProvider {
     local_db: DbProvider,
     fork_db: ForkedDb,
@@ -45,7 +45,7 @@ pub struct ForkedProvider {
 #[derive(Debug, Clone)]
 pub struct ForkedDb {
     backend: Backend,
-    block_id: BlockNumber,
+    pub(crate) block_id: BlockNumber,
     db: DbProviderFactory,
 }
diff --git a/crates/tracing/src/lib.rs b/crates/tracing/src/lib.rs
index bbe241fa5..23e4d3959 100644
--- a/crates/tracing/src/lib.rs
+++ b/crates/tracing/src/lib.rs
@@ -94,11 +94,11 @@ pub async fn init(
     logging: LoggingConfig,
     telemetry_config: Option<TelemetryConfig>,
 ) -> Result<(), Error> {
-    const DEFAULT_LOG_FILTER: &str = "katana_db::mdbx=trace,cairo_native::compiler=off,\
-                                      pipeline=debug,stage=debug,tasks=debug,executor=trace,\
-                                      forking::backend=trace,blockifier=off,jsonrpsee_server=off,\
-                                      hyper=off,messaging=debug,node=error,explorer=info,\
-                                      rpc=trace,pool=trace,katana_stage::downloader=trace,info";
+    const DEFAULT_LOG_FILTER: &str =
+        "katana_db::mdbx=trace,cairo_native::compiler=off,pipeline=debug,stage=debug,tasks=debug,\
+         executor=trace,forking::backend=trace,blockifier=off,jsonrpsee_server=off,hyper=off,\
+         messaging=debug,node=error,explorer=info,rpc=trace,pool=trace,\
+         katana_stage::downloader=trace,optimistic=debug,info";
     let default_filter = EnvFilter::try_new(DEFAULT_LOG_FILTER);
     let filter = EnvFilter::try_from_default_env().or(default_filter)?;