Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions xllm_service/common/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,43 @@ struct LoadMetrics {
bool empty() const { return false; }
};

// Record the latency monitoring metrics of the instance over the recent period
struct LatencyMetrics {
LatencyMetrics(const int64_t& recent_max_ttft, const int64_t& recent_max_tbt)
: recent_max_ttft(recent_max_ttft), recent_max_tbt(recent_max_tbt) {}

// The unit is milliseconds.
int64_t recent_max_ttft;
int64_t recent_max_tbt;
};

enum class RequestAction : int32_t {
SCHEDULE = 0,
FINISH_PREFILL = 1,
FINISH_DECODE = 2,
CANCEL = 3,
};

// Record the request metrics of the instance
struct RequestMetrics {
RequestMetrics()
: prefill_request_num(0),
prefill_token_num(0),
decode_request_num(0),
decode_token_num(0),
estimated_prefill_time(0) {}

int64_t prefill_request_num;
int64_t prefill_token_num;

int64_t decode_request_num;
int64_t decode_token_num;

// Estimated execution time for all prefill requests on the instance.
// The unit is milliseconds.
int64_t estimated_prefill_time;
};

struct InstanceMetaInfo {
public:
InstanceMetaInfo() { set_init_timestamp(); }
Expand Down
17 changes: 13 additions & 4 deletions xllm_service/http_service/service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ void handle_non_stream_response(brpc::Controller* cntl,
template <typename T>
void handle_first_response(brpc::Controller* cntl,
std::shared_ptr<T> call_data,
Scheduler* scheduler,
std::string service_request_id,
bool stream) {
// update request metrics for prefill finished request
scheduler->update_request_metrics_for_prefill(service_request_id);

std::unique_ptr<brpc::Controller> cntl_guard(cntl);
if (cntl->Failed()) {
LOG(WARNING) << "Fail to send stream generation, " << cntl->ErrorText();
Expand Down Expand Up @@ -150,7 +155,6 @@ void XllmHttpServiceImpl::handle(std::shared_ptr<T> call_data,
LOG(ERROR) << "rpc service add new request error: "
<< request->service_request_id;
call_data->finish_with_error("Internal runtime error.");
scheduler_->finish_request(request->service_request_id);
return;
}
}
Expand All @@ -177,14 +181,19 @@ void XllmHttpServiceImpl::handle(std::shared_ptr<T> call_data,
// 1. tokens will be received via rpc channel.
//
if (enable_decode_response_to_service_) {
google::protobuf::Closure* done = brpc::NewCallback(
&handle_first_response<T>, redirect_cntl, call_data, request->stream);
google::protobuf::Closure* done =
brpc::NewCallback(&handle_first_response<T>,
redirect_cntl,
call_data,
scheduler_,
request->service_request_id,
request->stream);
channel_ptr->CallMethod(NULL, redirect_cntl, NULL, NULL, done);
if (redirect_cntl->Failed()) {
LOG(ERROR) << "Redirect to instance error: "
<< redirect_cntl->ErrorText();
call_data->finish_with_error(redirect_cntl->ErrorText());
scheduler_->finish_request(request->service_request_id);
scheduler_->finish_request(request->service_request_id, /*error=*/true);
delete done;
delete redirect_cntl;
return;
Expand Down
6 changes: 6 additions & 0 deletions xllm_service/proto/xllm_rpc_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,16 @@ message LoadMetrics {
float gpu_cache_usage_perc = 2;
}

message LatencyMetrics {
int64 recent_max_ttft = 1;
int64 recent_max_tbt = 2;
}

message HeartbeatRequest {
string name = 1;
KvCacheEvent cache_event = 2;
LoadMetrics load_metrics = 3;
LatencyMetrics latency_metrics = 4;
}

message InstanceID {
Expand Down
3 changes: 3 additions & 0 deletions xllm_service/request/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ struct Request {
// instance routing
Routing routing;

// the estimated TTFT obtained from the TTFT predictor
int64_t estimated_ttft = 0;

// output callback
OutputCallback output_callback;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ limitations under the License.

#include "common/macros.h"
#include "loadbalance_policy.h"
#include "scheduler/managers/global_kvcache_mgr.h"

namespace xllm_service {

class CacheAwareRouting final : public LoadBalancePolicy {
public:
CacheAwareRouting(std::shared_ptr<InstanceMgr> instance_mgr,
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
: LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {};
: global_kvcache_mgr_(global_kvcache_mgr),
LoadBalancePolicy(instance_mgr) {};

virtual ~CacheAwareRouting() = default;

bool select_instances_pair(std::shared_ptr<Request> request) override;

protected:
private:
DISALLOW_COPY_AND_ASSIGN(CacheAwareRouting);

void cost_function(
Expand All @@ -39,6 +41,8 @@ class CacheAwareRouting final : public LoadBalancePolicy {
const std::unordered_map<std::string, LoadMetrics>& load_metrics,
const int64_t& max_waiting_requests_num,
std::string* best_choice);

std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr_;
};

} // namespace xllm_service
10 changes: 3 additions & 7 deletions xllm_service/scheduler/loadbalance_policy/loadbalance_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,23 @@ limitations under the License.

#pragma once

#include "../managers/global_kvcache_mgr.h"
#include "../managers/instance_mgr.h"
#include "common/types.h"
#include "request/request.h"
#include "scheduler/managers/instance_mgr.h"

namespace xllm_service {

class LoadBalancePolicy {
public:
LoadBalancePolicy(std::shared_ptr<InstanceMgr> instance_mgr,
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
: instance_mgr_(instance_mgr), global_kvcache_mgr_(global_kvcache_mgr) {}
LoadBalancePolicy(std::shared_ptr<InstanceMgr> instance_mgr)
: instance_mgr_(instance_mgr) {}

virtual ~LoadBalancePolicy() = default;

virtual bool select_instances_pair(std::shared_ptr<Request> request) = 0;

protected:
std::shared_ptr<InstanceMgr> instance_mgr_;

std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr_;
};

} // namespace xllm_service
7 changes: 3 additions & 4 deletions xllm_service/scheduler/loadbalance_policy/round_robin.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ namespace xllm_service {

class RoundRobin final : public LoadBalancePolicy {
public:
RoundRobin(std::shared_ptr<InstanceMgr> instance_mgr,
std::shared_ptr<GlobalKVCacheMgr> global_kvcache_mgr)
: LoadBalancePolicy(instance_mgr, global_kvcache_mgr) {};
RoundRobin(std::shared_ptr<InstanceMgr> instance_mgr)
: LoadBalancePolicy(instance_mgr) {};

virtual ~RoundRobin() = default;

bool select_instances_pair(std::shared_ptr<Request> request) override;

protected:
private:
DISALLOW_COPY_AND_ASSIGN(RoundRobin);
};

Expand Down
2 changes: 2 additions & 0 deletions xllm_service/scheduler/managers/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ cc_library(
instance_mgr.cpp
global_kvcache_mgr.cpp
DEPS
:chat_template
:common
:etcd_client
:request
absl::random_random
absl::strings
glog::glog
Expand Down
115 changes: 106 additions & 9 deletions xllm_service/scheduler/managers/instance_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ void InstanceMgr::init() {
for (auto& it : ETCD_KEYS_PREFIX_MAP) {
etcd_client_->get_prefix(it.second, &instances_);
}
// create ttft predictor for each instance
for (auto& pair : instances_) {
ttft_predictors_.insert_or_assign(
pair.first, TtftPredictor(pair.second.ttft_profiling_data));
// create ttft predictor and request metrics for each instance
{
std::lock_guard<std::mutex> ttft_predictor_lock(ttft_predictor_mutex_);
std::lock_guard<std::mutex> request_metrics_lock(request_metrics_mutex_);
for (auto& pair : instances_) {
ttft_predictors_.insert_or_assign(
pair.first, TtftPredictor(pair.second.ttft_profiling_data));
request_metrics_.insert_or_assign(pair.first, RequestMetrics());
}
}
LOG(INFO) << "Load instance info from etcd:" << instances_.size();
std::vector<std::string> channel_creat_fail_insts;
Expand Down Expand Up @@ -99,7 +104,13 @@ void InstanceMgr::init() {
}
for (auto& name : channel_creat_fail_insts) {
instances_.erase(name);
ttft_predictors_.erase(name);
{
std::lock_guard<std::mutex> ttft_predictor_lock(ttft_predictor_mutex_);
std::lock_guard<std::mutex> request_metrics_lock(
request_metrics_mutex_);
ttft_predictors_.erase(name);
request_metrics_.erase(name);
}
}
}
{
Expand Down Expand Up @@ -340,9 +351,18 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response,
continue;
}

// create ttft predictor for instance
ttft_predictors_.emplace(
iter.first, TtftPredictor(iter.second.ttft_profiling_data));
{
std::lock_guard<std::mutex> ttft_predictor_lock(
ttft_predictor_mutex_);
std::lock_guard<std::mutex> request_metrics_lock(
request_metrics_mutex_);
// create ttft predictor for instance
ttft_predictors_.emplace(
iter.first, TtftPredictor(iter.second.ttft_profiling_data));

// create request metrics for instance
request_metrics_.emplace(iter.first, RequestMetrics());
}

instances_.insert(std::make_pair(iter.first, std::move(iter.second)));

Expand Down Expand Up @@ -395,8 +415,15 @@ void InstanceMgr::update_instance_metainfo(const etcd::Response& response,
}

instances_.erase(iter);
ttft_predictors_.erase(iter);
cached_channels_.erase(iter);
{
std::lock_guard<std::mutex> ttft_predictor_lock(
ttft_predictor_mutex_);
std::lock_guard<std::mutex> request_metrics_lock(
request_metrics_mutex_);
ttft_predictors_.erase(iter);
request_metrics_.erase(iter);
}
{
std::lock_guard<std::mutex> lock(update_mutex_);
updated_metrics_.erase(iter);
Expand Down Expand Up @@ -450,4 +477,74 @@ void InstanceMgr::update_load_metrics(const etcd::Response& response,
});
}

void InstanceMgr::update_latency_metrics(
const std::string& instance_name,
const proto::LatencyMetrics& latency_metrics) {
std::lock_guard<std::mutex> lock(latency_metrics_mutex_);

latency_metrics_.insert_or_assign(
instance_name,
LatencyMetrics(latency_metrics.recent_max_ttft(),
latency_metrics.recent_max_tbt()));
}

void InstanceMgr::update_request_metrics(std::shared_ptr<Request> request,
RequestAction action) {
std::lock_guard<std::mutex> lock(request_metrics_mutex_);

auto prefill_it = request_metrics_.find(request->routing.prefill_name);
if (prefill_it == request_metrics_.end()) {
LOG(ERROR) << "Failed to find instance request metrics, instance name : "
<< request->routing.prefill_name;
return;
}

auto decode_it = request_metrics_.find(request->routing.decode_name);
if (decode_it == request_metrics_.end()) {
LOG(ERROR) << "Failed to find instance request metrics, instance name : "
<< request->routing.decode_name;
return;
}

int64_t token_length = request->token_ids.size();
switch (action) {
case RequestAction::SCHEDULE:
// update the request metrics for prefill and decode instances when
// request is scheduled
prefill_it->second.prefill_request_num += 1;
prefill_it->second.prefill_token_num += token_length;
prefill_it->second.estimated_prefill_time += request->estimated_ttft;

decode_it->second.decode_request_num += 1;
decode_it->second.decode_token_num += token_length;
break;
case RequestAction::FINISH_PREFILL:
// only update the request metrics for prefill instance when request
// finishes the prefill phase
prefill_it->second.prefill_request_num -= 1;
prefill_it->second.prefill_token_num -= token_length;
prefill_it->second.estimated_prefill_time -= request->estimated_ttft;
break;
case RequestAction::FINISH_DECODE:
// update the request metrics for decode instance when request finishes
// the decode phase
decode_it->second.decode_request_num -= 1;
decode_it->second.decode_token_num -= token_length;
break;
case RequestAction::CANCEL:
// update the request metrics for prefill and decode instances when
// request is cancelled
prefill_it->second.prefill_request_num -= 1;
prefill_it->second.prefill_token_num -= token_length;
prefill_it->second.estimated_prefill_time -= request->estimated_ttft;

decode_it->second.decode_request_num -= 1;
decode_it->second.decode_token_num -= token_length;
break;
default:
LOG(ERROR) << "Unknown RequestAction: " << static_cast<int32_t>(action);
break;
}
}

} // namespace xllm_service
Loading