diff --git a/Cargo.lock b/Cargo.lock index 24fbbb5..99d5072 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -527,6 +527,8 @@ dependencies = [ "config", "lazy_static", "prometheus", + "r2d2", + "r2d2_kafka", "rdkafka", "serde 1.0.125", "slog", @@ -901,6 +903,25 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_kafka" +version = "0.1.0" +dependencies = [ + "r2d2", + "rdkafka", +] + [[package]] name = "rand" version = "0.7.3" @@ -1081,6 +1102,15 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" +[[package]] +name = "scheduled-thread-pool" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7" +dependencies = [ + "parking_lot", +] + [[package]] name = "scoped-tls" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index d748513..8b571fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "kprf" version = "0.1.0" -authors = ["Aleksandr Petrukhin "] +authors = [ + "Aleksandr Petrukhin ", + "Artyom Belyankin ", +] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -21,7 +24,12 @@ slog-term = "2.8.0" prometheus = "0.12.0" lazy_static = "1.4.0" uuid = { version = "0.8", features = ["serde", "v4"] } +r2d2 = "0.8.9" +r2d2_kafka = { path = "r2d2_kafka" } [features] default = ["std"] -std = [ "serde" ] \ No newline at end of file +std = [ "serde" ] + +[workspace] +members = ["r2d2_kafka"] \ No newline at end of file diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 diff --git 
a/r2d2_kafka/Cargo.toml b/r2d2_kafka/Cargo.toml new file mode 100644 index 0000000..2e04ac7 --- /dev/null +++ b/r2d2_kafka/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "r2d2_kafka" +version = "0.1.0" +description = "Apache Kafka support for the r2d2 connection pool" +keywords = ["kafka", "apache-kafka", "apache", "pool", "r2d2"] +authors = ["Artyom Belyankin "] +edition = "2018" + +[dependencies] +r2d2 = "0.8.9" +rdkafka = { version = "0.25", features = ["cmake-build"] } \ No newline at end of file diff --git a/r2d2_kafka/src/lib.rs b/r2d2_kafka/src/lib.rs new file mode 100644 index 0000000..d7d3681 --- /dev/null +++ b/r2d2_kafka/src/lib.rs @@ -0,0 +1,46 @@ +use rdkafka::config::FromClientConfig; +use rdkafka::error::{KafkaError, KafkaResult}; +use rdkafka::producer::{FutureProducer, Producer}; +use rdkafka::ClientConfig; +use std::time::Duration; + +const DEFAULT_VALIDATION_DURATION: u64 = 100; + +#[derive(Clone, Debug)] +pub struct KafkaConnectorManager { + config: ClientConfig, + validation_duration: Duration, +} + +impl KafkaConnectorManager { + pub fn new(config: ClientConfig) -> KafkaConnectorManager { + let validation_duration = Duration::from_millis(DEFAULT_VALIDATION_DURATION); + KafkaConnectorManager { + config, + validation_duration, + } + } +} + +impl r2d2::ManageConnection for KafkaConnectorManager { + type Connection = FutureProducer; + type Error = KafkaError; + + fn connect(&self) -> KafkaResult<Self::Connection> { + FutureProducer::from_config(&self.config.clone()) + } + + fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + match conn + .client() + .fetch_metadata(Some(""), self.validation_duration) + { + Ok(_) => Ok(()), + Err(e) => Err(e), + } + } + + fn has_broken(&self, conn: &mut Self::Connection) -> bool { + self.is_valid(conn).is_err() + } +} diff --git a/src/http/handlers.rs b/src/http/handlers.rs index d11e518..772e161 100644 --- a/src/http/handlers.rs +++ b/src/http/handlers.rs @@ -54,7 +54,6 @@ mod handler { use 
crate::kafka::kafka::producer::Producer; use crate::log::kflog; use rdkafka::error::KafkaError; - use rdkafka::producer::future_producer::OwnedDeliveryResult; use std::convert::Infallible; use std::sync::Arc; use std::time::{Duration, SystemTime}; @@ -95,7 +94,7 @@ mod handler { &record.topic, &record.data, record.key.as_ref(), - Duration::from_millis(100), + Duration::from_millis(300), ) .await }) diff --git a/src/kafka/kafka.rs b/src/kafka/kafka.rs index 885ce16..87fb3f5 100644 --- a/src/kafka/kafka.rs +++ b/src/kafka/kafka.rs @@ -201,15 +201,19 @@ pub mod config { } pub mod producer { - use rdkafka::client::DefaultClientContext; - use rdkafka::config::FromClientConfig; + use r2d2::Pool; + use r2d2_kafka::KafkaConnectorManager; use rdkafka::producer::future_producer::OwnedDeliveryResult; - use rdkafka::producer::{FutureProducer, FutureRecord}; + use rdkafka::producer::FutureRecord; use std::sync::Arc; use std::time::{Duration, SystemTime}; + const DEFAULT_PRODUCERS_POOL_SIZE: u32 = 15; + const DEFAULT_PRODUCERS_CONNECTION_TIMEOUT: Duration = Duration::from_millis(100); + const DEFAULT_PRODUCERS_CONNECTION_LIFETIME: Option<Duration> = Some(Duration::from_secs(300)); + pub struct Producer { - producer: FutureProducer, + pool: Arc<Pool<KafkaConnectorManager>>, sent_messages_counter: prometheus::IntCounterVec, queue_size_gauge: prometheus::IntGaugeVec, error_counter: prometheus::IntCounterVec, @@ -238,7 +242,20 @@ pub mod producer { }; let start = SystemTime::now(); - let result = self.producer.send(record, timeout).await; + let conn = self + .pool + .clone() + .get() + .map_err(|err| { + println!( + "kafka: get producer connection from pool error in line:{} ! 
error: {:?}", + line!(), + err + ) + }) + .unwrap(); + + let result = conn.send(record, timeout).await; self.message_send_duration .with_label_values(&[&topic]) .observe( @@ -260,43 +277,48 @@ pub mod producer { } pub fn new(cfg: super::config::Config) -> Arc { - let cf = cfg.to_hash(); let mut client_config = rdkafka::ClientConfig::new(); - for (k, v) in cf.iter() { + for (k, v) in cfg.to_hash().iter() { client_config.set(k, v); } - let result = FutureProducer::from_config(&client_config); - match result { - Err(err) => panic!("Failed to create threaded producer: {}", err.to_string()), - Ok(producer) => Arc::new(Producer { - producer, - queue_size_gauge: prometheus::register_int_gauge_vec!( - "kafka_internal_queue_size", - "Kafka internal queue size", - &["topic"] - ) - .unwrap(), - error_counter: prometheus::register_int_counter_vec!( - "kafka_errors_count", - "Kafka internal errors count", - &["topic", "error_code"] - ) + let manager = KafkaConnectorManager::new(client_config); + let pool = Arc::new( + r2d2::Pool::builder() + .max_size(DEFAULT_PRODUCERS_POOL_SIZE) + .connection_timeout(DEFAULT_PRODUCERS_CONNECTION_TIMEOUT) + .max_lifetime(DEFAULT_PRODUCERS_CONNECTION_LIFETIME) + .build(manager) .unwrap(), - sent_messages_counter: prometheus::register_int_counter_vec!( - "kafka_sent_messages", - "Kafka sent messages count", - &["topic"] - ) - .unwrap(), - message_send_duration: prometheus::register_histogram_vec!( - "kafka_message_send_duration", - "Kafka message send duration", - &["topic"], - prometheus::exponential_buckets(5.0, 2.0, 5).unwrap() - ) - .unwrap(), - }), - } + ); + + Arc::new(Producer { + pool, + queue_size_gauge: prometheus::register_int_gauge_vec!( + "kafka_internal_queue_size", + "Kafka internal queue size", + &["topic"] + ) + .unwrap(), + error_counter: prometheus::register_int_counter_vec!( + "kafka_errors_count", + "Kafka internal errors count", + &["topic", "error_code"] + ) + .unwrap(), + sent_messages_counter: 
prometheus::register_int_counter_vec!( + "kafka_sent_messages", + "Kafka sent messages count", + &["topic"] + ) + .unwrap(), + message_send_duration: prometheus::register_histogram_vec!( + "kafka_message_send_duration", + "Kafka message send duration", + &["topic"], + prometheus::exponential_buckets(5.0, 2.0, 5).unwrap() + ) + .unwrap(), + }) } } diff --git a/src/main.rs b/src/main.rs index a4ad516..3553f01 100644 --- a/src/main.rs +++ b/src/main.rs @@ -70,6 +70,8 @@ async fn main() { let metrics_shutdown_rx = start_metrics_server(logger.clone(), shutdown_metrics_rx); + println!("Running http server"); + // TODO(shmel1k): improve graceful shutdown behavior. let main_server_shutdown_rx = server.start_server(logger.clone(), kafka_producer.clone(), shutdown_rx);