Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 77 additions & 38 deletions lading/src/generator/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! Additional metrics may be emitted by this generator's [throttle].
//!

use std::{convert::TryFrom, num::NonZeroU32, time::Duration};
use std::{convert::TryFrom, num::NonZeroU16, num::NonZeroU32, sync::Arc, time::Duration};

use byte_unit::Byte;
use bytes::{Buf, BufMut, Bytes};
Expand All @@ -22,9 +22,11 @@
uri::{self, PathAndQuery},
};
use metrics::counter;
use once_cell::sync::OnceCell;
use rand::SeedableRng;
use rand::rngs::StdRng;
use serde::{Deserialize, Serialize};
use tokio::sync::Semaphore;
use tonic::{
Request, Response, Status, client,
codec::{DecodeBuf, Decoder, EncodeBuf, Encoder},
Expand All @@ -36,9 +38,12 @@

use super::General;
use crate::generator::common::{
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
BlockThrottle, ConcurrencyStrategy, MetricsBuilder, ThrottleConfig, ThrottleConversionError,
create_throttle,
};

static CONNECTION_SEMAPHORE: OnceCell<Semaphore> = OnceCell::new();

/// Errors produced by [`Grpc`]
#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -171,12 +176,12 @@
/// This generator is able to connect to targets via gRPC.
#[derive(Debug)]
pub struct Grpc {
config: Config,
target_uri: Uri,
rpc_path: PathAndQuery,
concurrency: ConcurrencyStrategy,
shutdown: lading_signal::Watcher,
throttle: BlockThrottle,
block_cache: block::Cache,
block_cache: Arc<block::Cache>,
metric_labels: Vec<(String, String)>,
}

Expand Down Expand Up @@ -223,18 +228,25 @@
)?,
};

let concurrency =
ConcurrencyStrategy::new(NonZeroU16::new(config.parallel_connections), false);

CONNECTION_SEMAPHORE
.set(Semaphore::new(concurrency.connection_count() as usize))

Check warning on line 235 in lading/src/generator/grpc.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (ubuntu-latest, fmt)

Diff in /home/runner/work/lading/lading/lading/src/generator/grpc.rs

Check warning on line 235 in lading/src/generator/grpc.rs

View workflow job for this annotation

GitHub Actions / Rust Actions (Check/Fmt/Clippy) (macos-latest, fmt)

Diff in /Users/runner/work/lading/lading/lading/src/generator/grpc.rs
.expect("failed to set semaphore");

let target_uri =
uri::Uri::try_from(config.target_uri.clone()).expect("target_uri must be valid");
uri::Uri::try_from(config.target_uri).expect("target_uri must be valid");
let rpc_path = target_uri
.path_and_query()
.cloned()
.expect("target_uri should have an RPC path");
Ok(Self {
target_uri,
rpc_path,
config,
concurrency,
shutdown,
block_cache,
block_cache: Arc::new(block_cache),
throttle,
metric_labels: labels,
})
Expand All @@ -247,7 +259,6 @@
let uri = Uri::from_parts(parts).expect("failed to convert parts into uri");

let endpoint = transport::Endpoint::new(uri)?;
let endpoint = endpoint.concurrency_limit(self.config.parallel_connections as usize);
let endpoint = endpoint.connect_timeout(Duration::from_secs(1));
let conn = endpoint.connect().await?;
let conn = client::Grpc::new(conn);
Expand Down Expand Up @@ -284,7 +295,7 @@
///
/// Function will panic if underlying byte capacity is not available.
pub async fn spin(mut self) -> Result<(), Error> {
let mut client = loop {
let client = loop {
match self.connect().await {
Ok(c) => break c,
Err(source) => {
Expand All @@ -300,6 +311,7 @@

let mut handle = self.block_cache.handle();
let rpc_path = self.rpc_path;
let labels = self.metric_labels;

let shutdown_wait = self.shutdown.recv();
tokio::pin!(shutdown_wait);
Expand All @@ -308,43 +320,70 @@
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
let _ = result;
let block = self.block_cache.advance(&mut handle);
let block_bytes = Bytes::copy_from_slice(&block.bytes);
let block_length = block.bytes.len();
counter!("requests_sent", &self.metric_labels).increment(1);
let res = Self::req(
&mut client,
rpc_path.clone(),
Bytes::copy_from_slice(&block.bytes),
)
.await;

match res {
Ok(res) => {
counter!("bytes_written", &self.metric_labels).increment(block_length as u64);
if let Some(data_points) = block.metadata.data_points {
counter!("data_points_transmitted", &self.metric_labels).increment(data_points);
let data_points = block.metadata.data_points;

let mut task_client = client.clone();
let task_rpc_path = rpc_path.clone();
let task_labels = labels.clone();
let target_uri = self.target_uri.clone();

let permit = CONNECTION_SEMAPHORE
.get()
.expect("connection semaphore not initialized")
.acquire()
.await
.expect("connection semaphore closed");
tokio::spawn(async move {
counter!("requests_sent", &task_labels).increment(1);
let res = Self::req(
&mut task_client,
task_rpc_path.clone(),
block_bytes,
)
.await;

match res {
Ok(res) => {
counter!("bytes_written", &task_labels)
.increment(block_length as u64);
if let Some(dp) = data_points {
counter!("data_points_transmitted", &task_labels)
.increment(dp);
}
counter!("request_ok", &task_labels).increment(1);
counter!("response_bytes", &task_labels)
.increment(res.into_inner() as u64);
}
Err(source) => {
error!(
"Failed to make RPC request to {endpoint}{path}: {source}",
endpoint = target_uri,
path = task_rpc_path
);
let mut error_labels = task_labels.clone();
error_labels
.push(("error".to_string(), source.to_string()));
counter!("request_failure", &error_labels).increment(1);
}
counter!("request_ok", &self.metric_labels).increment(1);
counter!("response_bytes", &self.metric_labels).increment(res.into_inner() as u64);
}
Err(source) => {
error!(
"Failed to make RPC request to {endpoint}{path}: {source}",
endpoint = self.target_uri,
path = rpc_path
);
let mut error_labels = self.metric_labels.clone();
error_labels.push(("error".to_string(), source.to_string()));
counter!("request_failure", &error_labels).increment(1);
}
}
drop(permit);
});
},
() = &mut shutdown_wait => {
info!("shutdown signal received");
break;
// Acquire all permits to ensure in-flight tasks complete
// before returning.
let _semaphore = CONNECTION_SEMAPHORE
.get()
.expect("connection semaphore not initialized")
.acquire_many(u32::from(self.concurrency.connection_count()))
.await
.expect("connection semaphore closed");
return Ok(());
},
}
}

Ok(())
}
}
Loading