Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ http = "0.2"

# Async runtime
tokio.workspace = true
tokio-stream = "0.1"

# Logging
tracing.workspace = true
Expand All @@ -44,6 +45,7 @@ tower-service.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
hex = "0.4"
katana-utils = { workspace = true, features = ["node"] }

[build-dependencies]
tonic-build.workspace = true
22 changes: 18 additions & 4 deletions crates/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::server::Routes;
use tonic::transport::Server;
use tracing::{error, info};
Expand All @@ -28,6 +30,10 @@ pub enum Error {
/// Server has already been stopped.
#[error("gRPC server has already been stopped")]
AlreadyStopped,

/// IO error during socket binding.
#[error("Failed to bind socket: {0}")]
Bind(#[from] std::io::Error),
}

/// Handle to a running gRPC server.
Expand Down Expand Up @@ -104,6 +110,9 @@ impl GrpcServer {
///
/// This method spawns the server on a new Tokio task and returns a handle
/// that can be used to manage the server.
///
/// If the port in `addr` is 0, the OS will assign an available port.
/// The actual bound address can be retrieved from the returned handle.
pub async fn start(&self, addr: SocketAddr) -> Result<GrpcServerHandle, Error> {
// Build reflection service for tooling support (grpcurl, Postman, etc.)
let reflection_service = tonic_reflection::server::Builder::configure()
Expand All @@ -114,11 +123,16 @@ impl GrpcServer {
// Create shutdown channel
let (shutdown_tx, mut shutdown_rx) = watch::channel(());

// Bind to the socket first to get the actual address (important when port is 0)
let listener = TcpListener::bind(addr).await?;
let actual_addr = listener.local_addr()?;
let incoming = TcpListenerStream::new(listener);

let mut builder = Server::builder().timeout(self.timeout);
let server = builder.add_routes(self.routes.clone()).add_service(reflection_service);

// Start the server with graceful shutdown
let server_future = server.serve_with_shutdown(addr, async move {
// Start the server with graceful shutdown using the pre-bound listener
let server_future = server.serve_with_incoming_shutdown(incoming, async move {
let _ = shutdown_rx.changed().await;
});

Expand All @@ -128,9 +142,9 @@ impl GrpcServer {
}
});

info!(target: "grpc", %addr, "gRPC server started.");
info!(target: "grpc", addr = %actual_addr, "gRPC server started.");

Ok(GrpcServerHandle { addr, shutdown_tx: Arc::new(shutdown_tx) })
Ok(GrpcServerHandle { addr: actual_addr, shutdown_tx: Arc::new(shutdown_tx) })
}
}

Expand Down
Loading
Loading