[ENH]: fix high latency & response errors of frontend -> query service calls during rollout #5316

Open · wants to merge 5 commits into main
4 changes: 4 additions & 0 deletions go/pkg/memberlist_manager/node_watcher.go
@@ -165,6 +165,10 @@ func (w *KubernetesWatcher) ListReadyMembers() (Memberlist, error) {
for _, condition := range pod.Status.Conditions {
if condition.Type == v1.PodReady {
if condition.Status == v1.ConditionTrue {
if pod.DeletionTimestamp != nil {
// Pod is being deleted, don't include it in the member list
continue
}
memberlist = append(memberlist, Member{pod.Name, pod.Status.PodIP, pod.Spec.NodeName})
}
break
18 changes: 18 additions & 0 deletions rust/config/src/helpers.rs
@@ -0,0 +1,18 @@
use serde::Serialize;
use std::time::Duration;

pub fn deserialize_duration_from_seconds<'de, D>(d: D) -> Result<Duration, D::Error>
where
D: serde::Deserializer<'de>,
{
let secs: u64 = serde::Deserialize::deserialize(d)?;
Ok(Duration::from_secs(secs))
}

pub fn serialize_duration_to_seconds<S>(duration: &Duration, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let secs = duration.as_secs();
secs.serialize(s)
}
1 change: 1 addition & 0 deletions rust/config/src/lib.rs
@@ -1,4 +1,5 @@
pub mod assignment;
pub mod helpers;
pub mod registry;

use async_trait::async_trait;
9 changes: 1 addition & 8 deletions rust/garbage_collector/src/config.rs
@@ -1,4 +1,5 @@
use chroma_cache::CacheConfig;
use chroma_config::helpers::deserialize_duration_from_seconds;
use chroma_log::config::LogConfig;
use chroma_storage::config::StorageConfig;
use chroma_system::DispatcherConfig;
@@ -14,14 +15,6 @@ use crate::types::CleanupMode;

const DEFAULT_CONFIG_PATH: &str = "./garbage_collector_config.yaml";

fn deserialize_duration_from_seconds<'de, D>(d: D) -> Result<Duration, D::Error>
where
D: serde::Deserializer<'de>,
{
let secs: u64 = serde::Deserialize::deserialize(d)?;
Ok(Duration::from_secs(secs))
}

#[derive(Debug, serde::Deserialize, Clone, Default)]
pub struct GarbageCollectorConfig {
pub(super) service_name: String,
19 changes: 18 additions & 1 deletion rust/log-service/src/lib.rs
@@ -10,6 +10,7 @@ use std::time::{Duration, Instant, SystemTime};

use bytes::Bytes;
use chroma_cache::CacheConfig;
use chroma_config::helpers::{deserialize_duration_from_seconds, serialize_duration_to_seconds};
use chroma_config::Configurable;
use chroma_error::ChromaError;
use chroma_log::{config::GrpcLogConfig, grpc_log::GrpcLog};
@@ -2332,6 +2333,7 @@ impl LogServerWrapper {

let max_encoding_message_size = log_server.config.max_encoding_message_size;
let max_decoding_message_size = log_server.config.max_decoding_message_size;
let shutdown_grace_period = log_server.config.grpc_shutdown_grace_period;

let wrapper = LogServerWrapper {
log_server: Arc::new(log_server),
@@ -2354,7 +2356,10 @@
}
};
sigterm.recv().await;
tracing::info!("Received SIGTERM, shutting down");
tracing::info!("Received SIGTERM, waiting for grace period...");
// Note: gRPC calls can still be successfully made during this period. We rely on the memberlist updating to stop clients from sending new requests. Ideally there would be a Tower layer that rejected new requests during this period with UNAVAILABLE or similar.
tokio::time::sleep(shutdown_grace_period).await;
tracing::info!("Grace period ended, shutting down server...");
});

let res = server.await;
@@ -2503,6 +2508,13 @@ pub struct LogServerConfig {
pub max_encoding_message_size: usize,
#[serde(default = "LogServerConfig::default_max_decoding_message_size")]
pub max_decoding_message_size: usize,
#[serde(
rename = "grpc_shutdown_grace_period_seconds",
deserialize_with = "deserialize_duration_from_seconds",
serialize_with = "serialize_duration_to_seconds",
default = "LogServerConfig::default_grpc_shutdown_grace_period"
)]
pub grpc_shutdown_grace_period: Duration,
}

impl LogServerConfig {
@@ -2542,6 +2554,10 @@ impl LogServerConfig {
fn default_max_decoding_message_size() -> usize {
32_000_000
}

fn default_grpc_shutdown_grace_period() -> Duration {
Duration::from_secs(1)
}
}

impl Default for LogServerConfig {
@@ -2562,6 +2578,7 @@ impl Default for LogServerConfig {
proxy_to: None,
max_encoding_message_size: Self::default_max_encoding_message_size(),
max_decoding_message_size: Self::default_max_decoding_message_size(),
grpc_shutdown_grace_period: Self::default_grpc_shutdown_grace_period(),
}
}
}
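
The inline note above says that, ideally, a Tower layer would reject new requests with UNAVAILABLE during the grace period. As a rough illustration only (not part of this PR, and using a tonic interceptor rather than a hand-rolled Tower layer), a shared draining flag could be checked before each call; every name and wiring detail below is an assumption for the sketch:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use tonic::{service::Interceptor, Request, Status};

// Flipped to true by the SIGTERM handler before it sleeps through the grace period.
#[derive(Clone)]
pub struct DrainGuard {
    pub draining: Arc<AtomicBool>,
}

impl Interceptor for DrainGuard {
    fn call(&mut self, req: Request<()>) -> Result<Request<()>, Status> {
        if self.draining.load(Ordering::SeqCst) {
            // Refuse new calls so clients retry against another replica;
            // requests already in flight are not affected.
            Err(Status::unavailable("server is draining"))
        } else {
            Ok(req)
        }
    }
}

A generated tonic service could then be registered via its with_interceptor constructor (e.g. a hypothetical MyServiceServer::with_interceptor(svc, guard.clone())), with the shutdown future setting the flag before tokio::time::sleep(shutdown_grace_period).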
12 changes: 12 additions & 0 deletions rust/worker/src/config.rs
@@ -1,9 +1,11 @@
use chroma_config::assignment;
use chroma_config::helpers::deserialize_duration_from_seconds;
use chroma_index::config::SpannProviderConfig;
use chroma_sysdb::SysDbConfig;
use chroma_tracing::{OtelFilter, OtelFilterLevel};
use figment::providers::{Env, Format, Yaml};
use serde::{Deserialize, Serialize};
use std::time::Duration;

const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";

@@ -143,6 +145,12 @@ pub struct QueryServiceConfig {
pub spann_provider: SpannProviderConfig,
#[serde(default)]
pub jemalloc_pprof_server_port: Option<u16>,
#[serde(
rename = "grpc_shutdown_grace_period_seconds",
deserialize_with = "deserialize_duration_from_seconds",
default = "QueryServiceConfig::default_grpc_shutdown_grace_period"
)]
pub grpc_shutdown_grace_period: Duration,
}

impl QueryServiceConfig {
@@ -172,6 +180,10 @@ impl QueryServiceConfig {
fn default_fetch_log_batch_size() -> u32 {
100
}

fn default_grpc_shutdown_grace_period() -> Duration {
Duration::from_secs(1)
}
}

#[derive(Clone, Debug, Default, Deserialize, Serialize)]
34 changes: 10 additions & 24 deletions rust/worker/src/lib.rs
@@ -61,31 +61,17 @@ pub async fn query_service_entrypoint() {
};
worker_server.set_dispatcher(dispatcher_handle.clone());

let server_join_handle = tokio::spawn(async move {
// Server task will run until it receives a shutdown signal
let _ = tokio::spawn(async move {
let _ = crate::server::WorkerServer::run(worker_server).await;
});

let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sigterm) => sigterm,
Err(e) => {
println!("Failed to create signal handler: {:?}", e);
return;
}
};

println!("Waiting for SIGTERM to stop the server");
select! {
// Kubernetes will send SIGTERM to stop the pod gracefully
// TODO: add more signal handling
_ = sigterm.recv() => {
dispatcher_handle.stop();
let _ = dispatcher_handle.join().await;
system.stop().await;
system.join().await;
let _ = server_join_handle.await;
},
};
println!("Server stopped");
})
.await;

println!("Shutting down the query service...");
dispatcher_handle.stop();
let _ = dispatcher_handle.join().await;
system.stop().await;
system.join().await;
}

pub async fn compaction_service_entrypoint() {
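
Because the diff above interleaves the removed select!-based shutdown with the new code, the resulting shape of query_service_entrypoint is easier to see in isolation. A simplified sketch of the post-change flow (setup and error handling omitted; not a literal copy of the file):

// Per the comment added in this PR, the server task runs until it receives a
// shutdown signal; SIGTERM handling and the grace-period sleep live in
// WorkerServer::run (see rust/worker/src/server.rs below).
let _ = tokio::spawn(async move {
    let _ = crate::server::WorkerServer::run(worker_server).await;
})
.await;

// Only after the server task has exited is the rest of the runtime torn down.
println!("Shutting down the query service...");
dispatcher_handle.stop();
let _ = dispatcher_handle.join().await;
system.stop().await;
system.join().await;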
11 changes: 10 additions & 1 deletion rust/worker/src/server.rs
@@ -1,3 +1,5 @@
use std::time::Duration;

use async_trait::async_trait;
use chroma_blockstore::provider::BlockfileProvider;
use chroma_config::{registry::Registry, Configurable};
@@ -50,6 +52,7 @@ pub struct WorkerServer {
jemalloc_pprof_server_port: Option<u16>,
// config
fetch_log_batch_size: u32,
shutdown_grace_period: Duration,
}

#[async_trait]
@@ -92,6 +95,7 @@ impl Configurable<(QueryServiceConfig, System)> for WorkerServer {
port: config.my_port,
jemalloc_pprof_server_port: config.jemalloc_pprof_server_port,
fetch_log_batch_size: config.fetch_log_batch_size,
shutdown_grace_period: config.grpc_shutdown_grace_period,
})
}
}
@@ -121,6 +125,7 @@ impl WorkerServer {
chroma_types::chroma_proto::debug_server::DebugServer::new(worker.clone()),
);

let shutdown_grace_period = worker.shutdown_grace_period;
let server = server.serve_with_shutdown(addr, async {
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(sigterm) => sigterm,
@@ -130,7 +135,10 @@
}
};
sigterm.recv().await;
tracing::info!("Received SIGTERM, shutting down");
tracing::info!("Received SIGTERM, waiting for grace period...");
// Note: gRPC calls can still be successfully made during this period. We rely on the memberlist updating to stop clients from sending new requests. Ideally there would be a Tower layer that rejected new requests during this period with UNAVAILABLE or similar.
tokio::time::sleep(shutdown_grace_period).await;
tracing::info!("Grace period ended, shutting down server...");
});

tokio::spawn(async move {
@@ -516,6 +524,7 @@ mod tests {
port,
jemalloc_pprof_server_port: None,
fetch_log_batch_size: 100,
shutdown_grace_period: Duration::from_secs(1),
};

let dispatcher = Dispatcher::new(DispatcherConfig {