diff --git a/xllm_service/common/types.h b/xllm_service/common/types.h index 89c681c..7accee0 100644 --- a/xllm_service/common/types.h +++ b/xllm_service/common/types.h @@ -112,6 +112,43 @@ struct LoadMetrics { bool empty() const { return false; } }; +// Record the latency monitoring metrics of the instance over the recent period +struct LatencyMetrics { + LatencyMetrics(const int64_t& recent_max_ttft, const int64_t& recent_max_tbt) + : recent_max_ttft(recent_max_ttft), recent_max_tbt(recent_max_tbt) {} + + // The unit is milliseconds. + int64_t recent_max_ttft; + int64_t recent_max_tbt; +}; + +enum class RequestAction : int32_t { + SCHEDULE = 0, + FINISH_PREFILL = 1, + FINISH_DECODE = 2, + CANCEL = 3, +}; + +// Record the request metrics of the instance +struct RequestMetrics { + RequestMetrics() + : prefill_request_num(0), + prefill_token_num(0), + decode_request_num(0), + decode_token_num(0), + estimated_prefill_time(0) {} + + int64_t prefill_request_num; + int64_t prefill_token_num; + + int64_t decode_request_num; + int64_t decode_token_num; + + // Estimated execution time for all prefill requests on the instance. + // The unit is milliseconds. + int64_t estimated_prefill_time; +}; + struct InstanceMetaInfo { public: InstanceMetaInfo() { set_init_timestamp(); } diff --git a/xllm_service/http_service/service.cpp b/xllm_service/http_service/service.cpp index 70934a6..8e80637 100644 --- a/xllm_service/http_service/service.cpp +++ b/xllm_service/http_service/service.cpp @@ -92,7 +92,12 @@ void handle_non_stream_response(brpc::Controller* cntl, template void handle_first_response(brpc::Controller* cntl, std::shared_ptr call_data, + Scheduler* scheduler, + std::string service_request_id, bool stream) { + // update request metrics for prefill finished request + scheduler->update_request_metrics_for_prefill(service_request_id); + std::unique_ptr cntl_guard(cntl); if (cntl->Failed()) { LOG(WARNING) << "Fail to send stream generation, " << cntl->ErrorText(); @@ -150,7 +155,6 @@ void XllmHttpServiceImpl::handle(std::shared_ptr call_data, LOG(ERROR) << "rpc service add new request error: " << request->service_request_id; call_data->finish_with_error("Internal runtime error."); - scheduler_->finish_request(request->service_request_id); return; } } @@ -177,14 +181,19 @@ void XllmHttpServiceImpl::handle(std::shared_ptr call_data, // 1. tokens will be received via rpc channel. // if (enable_decode_response_to_service_) { - google::protobuf::Closure* done = brpc::NewCallback( - &handle_first_response, redirect_cntl, call_data, request->stream); + google::protobuf::Closure* done = + brpc::NewCallback(&handle_first_response, + redirect_cntl, + call_data, + scheduler_, + request->service_request_id, + request->stream); channel_ptr->CallMethod(NULL, redirect_cntl, NULL, NULL, done); if (redirect_cntl->Failed()) { LOG(ERROR) << "Redirect to instance error: " << redirect_cntl->ErrorText(); call_data->finish_with_error(redirect_cntl->ErrorText()); - scheduler_->finish_request(request->service_request_id); + scheduler_->finish_request(request->service_request_id, /*error=*/true); delete done; delete redirect_cntl; return; diff --git a/xllm_service/proto/xllm_rpc_service.proto b/xllm_service/proto/xllm_rpc_service.proto index 023f7f9..c61389d 100644 --- a/xllm_service/proto/xllm_rpc_service.proto +++ b/xllm_service/proto/xllm_rpc_service.proto @@ -51,10 +51,16 @@ message LoadMetrics { float gpu_cache_usage_perc = 2; } +message LatencyMetrics { + int64 recent_max_ttft = 1; + int64 recent_max_tbt = 2; +} + message HeartbeatRequest { string name = 1; KvCacheEvent cache_event = 2; LoadMetrics load_metrics = 3; + LatencyMetrics latency_metrics = 4; } message InstanceID { diff --git a/xllm_service/request/request.h b/xllm_service/request/request.h index 2a925ca..3637059 100644 --- a/xllm_service/request/request.h +++ b/xllm_service/request/request.h @@ -47,6 +47,9 @@ struct Request { // instance routing Routing routing; + // the estimated TTFT obtained from the TTFT predictor + int64_t estimated_ttft = 0; + // output callback OutputCallback output_callback; diff --git a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h index 98c1731..5eb9999 100644 --- a/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h +++ b/xllm_service/scheduler/loadbalance_policy/cache_aware_routing.h @@ -17,6 +17,7 @@ limitations under the License. #include "common/macros.h" #include "loadbalance_policy.h" +#include "scheduler/managers/global_kvcache_mgr.h" namespace xllm_service { @@ -24,13 +25,14 @@ class CacheAwareRouting final : public LoadBalancePolicy { public: CacheAwareRouting(std::shared_ptr instance_mgr, std::shared_ptr global_kvcache_mgr) - : LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {}; + : global_kvcache_mgr_(global_kvcache_mgr), + LoadBalancePolicy(instance_mgr) {}; virtual ~CacheAwareRouting() = default; bool select_instances_pair(std::shared_ptr request) override; - protected: + private: DISALLOW_COPY_AND_ASSIGN(CacheAwareRouting); void cost_function( @@ -39,6 +41,8 @@ class CacheAwareRouting final : public LoadBalancePolicy { const std::unordered_map& load_metrics, const int64_t& max_waiting_requests_num, std::string* best_choice); + + std::shared_ptr global_kvcache_mgr_; }; } // namespace xllm_service diff --git a/xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h b/xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h index 3abab09..dc8d10f 100644 --- a/xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h +++ b/xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h @@ -15,18 +15,16 @@ limitations under the License. #pragma once -#include "../managers/global_kvcache_mgr.h" -#include "../managers/instance_mgr.h" #include "common/types.h" #include "request/request.h" +#include "scheduler/managers/instance_mgr.h" namespace xllm_service { class LoadBalancePolicy { public: - LoadBalancePolicy(std::shared_ptr instance_mgr, - std::shared_ptr global_kvcache_mgr) - : instance_mgr_(instance_mgr), global_kvcache_mgr_(global_kvcache_mgr) {} + LoadBalancePolicy(std::shared_ptr instance_mgr) + : instance_mgr_(instance_mgr) {} virtual ~LoadBalancePolicy() = default; @@ -34,8 +32,6 @@ class LoadBalancePolicy { protected: std::shared_ptr instance_mgr_; - - std::shared_ptr global_kvcache_mgr_; }; } // namespace xllm_service diff --git a/xllm_service/scheduler/loadbalance_policy/round_robin.h b/xllm_service/scheduler/loadbalance_policy/round_robin.h index 46571cd..bcbcc43 100644 --- a/xllm_service/scheduler/loadbalance_policy/round_robin.h +++ b/xllm_service/scheduler/loadbalance_policy/round_robin.h @@ -22,15 +22,14 @@ namespace xllm_service { class RoundRobin final : public LoadBalancePolicy { public: - RoundRobin(std::shared_ptr instance_mgr, - std::shared_ptr global_kvcache_mgr) - : LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {}; + RoundRobin(std::shared_ptr instance_mgr) + : LoadBalancePolicy(instance_mgr) {}; virtual ~RoundRobin() = default; bool select_instances_pair(std::shared_ptr request) override; - protected: + private: DISALLOW_COPY_AND_ASSIGN(RoundRobin); }; diff --git a/xllm_service/scheduler/managers/CMakeLists.txt b/xllm_service/scheduler/managers/CMakeLists.txt index 79f5b49..e6a77f7 100644 --- a/xllm_service/scheduler/managers/CMakeLists.txt +++ b/xllm_service/scheduler/managers/CMakeLists.txt @@ -11,8 +11,10 @@ cc_library( instance_mgr.cpp global_kvcache_mgr.cpp DEPS + :chat_template :common :etcd_client + :request absl::random_random absl::strings glog::glog diff --git a/xllm_service/scheduler/managers/instance_mgr.cpp b/xllm_service/scheduler/managers/instance_mgr.cpp index 2b04c41..33c6194 100644 --- a/xllm_service/scheduler/managers/instance_mgr.cpp +++ b/xllm_service/scheduler/managers/instance_mgr.cpp @@ -66,10 +66,15 @@ void InstanceMgr::init() { for (auto& it : ETCD_KEYS_PREFIX_MAP) { etcd_client_->get_prefix(it.second, &instances_); } - // create ttft predictor for each instance - for (auto& pair : instances_) { - ttft_predictors_.insert_or_assign( - pair.first, TtftPredictor(pair.second.ttft_profiling_data)); + // create ttft predictor and request metrics for each instance + { + std::lock_guard ttft_predictor_lock(ttft_predictor_mutex_); + std::lock_guard request_metrics_lock(request_metrics_mutex_); + for (auto& pair : instances_) { + ttft_predictors_.insert_or_assign( + pair.first, TtftPredictor(pair.second.ttft_profiling_data)); + request_metrics_.insert_or_assign(pair.first, RequestMetrics()); + } } LOG(INFO) << "Load instance info from etcd:" << instances_.size(); std::vector channel_creat_fail_insts; @@ -99,7 +104,13 @@ void InstanceMgr::init() { } for (auto& name : channel_creat_fail_insts) { instances_.erase(name); - ttft_predictors_.erase(name); + { + std::lock_guard ttft_predictor_lock(ttft_predictor_mutex_); + std::lock_guard request_metrics_lock( + request_metrics_mutex_); + ttft_predictors_.erase(name); + request_metrics_.erase(name); + } } } { @@ -340,9 +351,18 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response, continue; } - // create ttft predictor for instance - ttft_predictors_.emplace( - iter.first, TtftPredictor(iter.second.ttft_profiling_data)); + { + std::lock_guard ttft_predictor_lock( + ttft_predictor_mutex_); + std::lock_guard request_metrics_lock( + request_metrics_mutex_); + // create ttft predictor for instance + ttft_predictors_.emplace( + iter.first, TtftPredictor(iter.second.ttft_profiling_data)); + + // create request metrics for instance + request_metrics_.emplace(iter.first, RequestMetrics()); + } instances_.insert(std::make_pair(iter.first, std::move(iter.second))); @@ -395,8 +415,15 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response, } instances_.erase(iter); - ttft_predictors_.erase(iter); cached_channels_.erase(iter); + { + std::lock_guard ttft_predictor_lock( + ttft_predictor_mutex_); + std::lock_guard request_metrics_lock( + request_metrics_mutex_); + ttft_predictors_.erase(iter); + request_metrics_.erase(iter); + } { std::lock_guard lock(update_mutex_); updated_metrics_.erase(iter); @@ -450,4 +477,74 @@ void InstanceMgr::update_load_metrics(const etcd::Response& response, }); } +void InstanceMgr::update_latency_metrics( + const std::string& instance_name, + const proto::LatencyMetrics& latency_metrics) { + std::lock_guard lock(latency_metrics_mutex_); + + latency_metrics_.insert_or_assign( + instance_name, + LatencyMetrics(latency_metrics.recent_max_ttft(), + latency_metrics.recent_max_tbt())); +} + +void InstanceMgr::update_request_metrics(std::shared_ptr request, + RequestAction action) { + std::lock_guard lock(request_metrics_mutex_); + + auto prefill_it = request_metrics_.find(request->routing.prefill_name); + if (prefill_it == request_metrics_.end()) { + LOG(ERROR) << "Failed to find instance request metrics, instance name : " + << request->routing.prefill_name; + return; + } + + auto decode_it = request_metrics_.find(request->routing.decode_name); + if (decode_it == request_metrics_.end()) { + LOG(ERROR) << "Failed to find instance request metrics, instance name : " + << request->routing.decode_name; + return; + } + + int64_t token_length = request->token_ids.size(); + switch (action) { + case RequestAction::SCHEDULE: + // update the request metrics for prefill and decode instances when + // request is scheduled + prefill_it->second.prefill_request_num += 1; + prefill_it->second.prefill_token_num += token_length; + prefill_it->second.estimated_prefill_time += request->estimated_ttft; + + decode_it->second.decode_request_num += 1; + decode_it->second.decode_token_num += token_length; + break; + case RequestAction::FINISH_PREFILL: + // only update the request metrics for prefill instance when request + // finishes the prefill phase + prefill_it->second.prefill_request_num -= 1; + prefill_it->second.prefill_token_num -= token_length; + prefill_it->second.estimated_prefill_time -= request->estimated_ttft; + break; + case RequestAction::FINISH_DECODE: + // update the request metrics for decode instance when request finishes + // the decode phase + decode_it->second.decode_request_num -= 1; + decode_it->second.decode_token_num -= token_length; + break; + case RequestAction::CANCEL: + // update the request metrics for prefill and decode instances when + // request is cancelled + prefill_it->second.prefill_request_num -= 1; + prefill_it->second.prefill_token_num -= token_length; + prefill_it->second.estimated_prefill_time -= request->estimated_ttft; + + decode_it->second.decode_request_num -= 1; + decode_it->second.decode_token_num -= token_length; + break; + default: + LOG(ERROR) << "Unknown RequestAction: " << static_cast(action); + break; + } +} + } // namespace xllm_service diff --git a/xllm_service/scheduler/managers/instance_mgr.h b/xllm_service/scheduler/managers/instance_mgr.h index 4915aac..9598cdc 100644 --- a/xllm_service/scheduler/managers/instance_mgr.h +++ b/xllm_service/scheduler/managers/instance_mgr.h @@ -27,6 +27,7 @@ limitations under the License. #include "common/threadpool.h" #include "common/ttft_predictor.h" #include "common/types.h" +#include "request/request.h" #include "scheduler/etcd_client/etcd_client.h" #include "xllm_rpc_service.pb.h" @@ -55,6 +56,14 @@ class InstanceMgr final { const proto::LoadMetrics& load_metrics); bool upload_load_metrics(); + // update the recent token latency metrics for the corresponding instance + void update_latency_metrics(const std::string& instance_name, + const proto::LatencyMetrics& latency_metrics); + + // update request metrics under different actions + void update_request_metrics(std::shared_ptr request, + RequestAction action); + void set_as_master(); private: @@ -81,7 +90,6 @@ class InstanceMgr final { std::shared_mutex inst_mutex_; std::unordered_map instances_; - std::unordered_map ttft_predictors_; std::vector prefill_index_; std::vector decode_index_; uint64_t next_prefill_index_ = 0; @@ -96,6 +104,21 @@ class InstanceMgr final { std::unordered_map updated_metrics_; std::unordered_set removed_instance_; + // "instance name" -> "TtftPredictor" map + std::mutex ttft_predictor_mutex_; + std::unordered_map ttft_predictors_; + + // Record the latest token latency metrics for each instance, including TTFT + // and TBT. + std::mutex latency_metrics_mutex_; + std::unordered_map latency_metrics_; + + // Record the request metrics for each instance, including prefill token + // count, prefill request count, estimated prefill execution time, decode + // token count, and decode request count. + std::mutex request_metrics_mutex_; + std::unordered_map request_metrics_; + ThreadPool threadpool_; }; diff --git a/xllm_service/scheduler/scheduler.cpp b/xllm_service/scheduler/scheduler.cpp index 59113a8..3f3a57c 100644 --- a/xllm_service/scheduler/scheduler.cpp +++ b/xllm_service/scheduler/scheduler.cpp @@ -47,8 +47,7 @@ Scheduler::Scheduler(const Options& options) : options_(options) { lb_policy_ = std::make_unique(instance_mgr_, global_kvcache_mgr_); } else { - lb_policy_ = - std::make_unique(instance_mgr_, global_kvcache_mgr_); + lb_policy_ = std::make_unique(instance_mgr_); } if (is_master_service_) { @@ -92,6 +91,11 @@ bool Scheduler::schedule(std::shared_ptr request) { auto ret = lb_policy_->select_instances_pair(request); DLOG(INFO) << request->routing.debug_string(); + // update request metrics + if (request->prompt.size() != 0) { + instance_mgr_->update_request_metrics(request, RequestAction::SCHEDULE); + } + return ret; } @@ -116,6 +120,7 @@ void Scheduler::handle_instance_heartbeat(const proto::HeartbeatRequest* req) { } global_kvcache_mgr_->record_updated_kvcaches(req->name(), req->cache_event()); instance_mgr_->record_load_metrics_update(req->name(), req->load_metrics()); + instance_mgr_->update_latency_metrics(req->name(), req->latency_metrics()); } void Scheduler::handle_master_service_watch(const etcd::Response& response, @@ -259,10 +264,23 @@ bool Scheduler::record_new_request( return true; } -void Scheduler::finish_request(const std::string& service_request_id) { +void Scheduler::finish_request(const std::string& service_request_id, + bool error) { { std::lock_guard guard(request_mutex_); - requests_.erase(service_request_id); + auto it = requests_.find(service_request_id); + if (it != requests_.end()) { + // update instance request metrics for finished request + if (error) { + instance_mgr_->update_request_metrics(it->second, + RequestAction::CANCEL); + } else { + instance_mgr_->update_request_metrics(it->second, + RequestAction::FINISH_DECODE); + } + + requests_.erase(it); + } } { @@ -312,4 +330,15 @@ bool Scheduler::handle_generation(const llm::RequestOutput& request_output) { return true; } +void Scheduler::update_request_metrics_for_prefill( + const std::string& service_request_id) { + std::lock_guard guard(request_mutex_); + auto it = requests_.find(service_request_id); + if (it != requests_.end()) { + // update instance request metrics for prefill finished request + instance_mgr_->update_request_metrics(it->second, + RequestAction::FINISH_PREFILL); + } +} + } // namespace xllm_service \ No newline at end of file diff --git a/xllm_service/scheduler/scheduler.h b/xllm_service/scheduler/scheduler.h index fe8b5e8..b999fc8 100644 --- a/xllm_service/scheduler/scheduler.h +++ b/xllm_service/scheduler/scheduler.h @@ -57,11 +57,16 @@ class Scheduler final { std::shared_ptr request); bool record_new_request(std::shared_ptr call_data, std::shared_ptr request); - void finish_request(const std::string& service_request_id); + void finish_request(const std::string& service_request_id, + bool error = false); // handle generations from prefill/decode instance bool handle_generation(const llm::RequestOutput& request_output); + // update request metrics for prefill finished request + void update_request_metrics_for_prefill( + const std::string& service_request_id); + private: DISALLOW_COPY_AND_ASSIGN(Scheduler);