diff --git a/fastdeploy/config.py b/fastdeploy/config.py
index 4ebfd4584b5..5fe8588282e 100644
--- a/fastdeploy/config.py
+++ b/fastdeploy/config.py
@@ -378,9 +378,9 @@ def reset_config_value(key, value):
if not hasattr(self, key.lower()):
if os.getenv(key, None):
value = eval(os.getenv(key))
- logger.info(f"Get parameter `{key}` = {value} from environment.")
+ logger.info("Get parameter `%s` = %s from environment.", key, value)
else:
- logger.info(f"Parameter `{key}` will use default value {value}.")
+ logger.info("Parameter `%s` will use default value %s.", key, value)
setattr(self, key.lower(), value)
reset_config_value("COMPRESSION_RATIO", 1.0)
@@ -597,10 +597,10 @@ def print(self):
"""
Print all configuration information.
"""
- logger.info("Model Configuration Information :")
+ logger.debug("Model Configuration Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
class ParallelConfig:
@@ -679,7 +679,7 @@ def __init__(
and self.expert_parallel_size > 1
and self.tensor_parallel_size > 1
)
- logger.info(f"use_sequence_parallel_moe: {self.use_sequence_parallel_moe}")
+ logger.debug("use_sequence_parallel_moe: %s", self.use_sequence_parallel_moe)
def set_communicate_group(self):
# different tp group id
@@ -700,7 +700,15 @@ def set_communicate_group(self):
self.ep_group = dist.new_group(range(self.expert_parallel_size))
dist.collective._set_custom_gid(None)
logger.info(
- f"data_parallel_size: {self.data_parallel_size}, tensor_parallel_size: {self.tensor_parallel_size}, expert_parallel_size: {self.expert_parallel_size}, data_parallel_rank: {self.data_parallel_rank}, tensor_parallel_rank: {self.tensor_parallel_rank}, expert_parallel_rank: {self.expert_parallel_rank}, tp_group: {self.tp_group}."
+ "data_parallel_size: %d, tensor_parallel_size: %d, expert_parallel_size: %d, "
+ "data_parallel_rank: %d, tensor_parallel_rank: %d, expert_parallel_rank: %d, tp_group: %s",
+ self.data_parallel_size,
+ self.tensor_parallel_size,
+ self.expert_parallel_size,
+ self.data_parallel_rank,
+ self.tensor_parallel_rank,
+ self.expert_parallel_rank,
+ self.tp_group,
)
def print(self):
@@ -708,10 +716,10 @@ def print(self):
print all config
"""
- logger.info("Parallel Configuration Information :")
+ logger.debug("Parallel Configuration Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
class SpeculativeConfig:
@@ -836,10 +844,10 @@ def print(self):
print all config
"""
- logger.info("Speculative Decoding Configuration Information :")
+ logger.debug("Speculative Decoding Configuration Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
def check_legality_parameters(
self,
@@ -1339,10 +1347,10 @@ def print(self):
"""
Print all configuration information.
"""
- logger.info("EPLB Configuration Information :")
+ logger.debug("EPLB Configuration Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
class CacheConfig:
@@ -1490,7 +1498,7 @@ def postprocess(self, num_total_tokens, number_of_tasks):
block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size
self.total_block_num = block_num * number_of_tasks
self.prefill_kvcache_block_num = self.total_block_num
- logger.info(f"Doing profile, the total_block_num:{self.total_block_num}")
+ logger.info("Doing profile, the total_block_num: %d", self.total_block_num)
def reset(self, num_gpu_blocks):
"""
@@ -1516,10 +1524,10 @@ def print(self):
print all config
"""
- logger.info("Cache Configuration Information :")
+ logger.debug("Cache Configuration Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
class RouterConfig:
@@ -1586,19 +1594,19 @@ def _load_from_version_file(self, file_path: str = None):
elif line.startswith("CXX compiler version:"):
self.compiler_version = line.split(":")[1].strip()
except FileNotFoundError:
- logger.info(f"Warning: Version file not found at {file_path}")
+ logger.warning("Version file not found at %s", file_path)
except Exception as e:
- logger.info(f"Warning: Could not read version file - {e!s}")
+ logger.warning("Could not read version file: %s", e)
def print(self):
"""
print all config
"""
- logger.info("Fasedeploy Commit Information :")
+ logger.debug("FastDeploy Commit Information:")
for k, v in self.__dict__.items():
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
class StructuredOutputsConfig:
@@ -2121,11 +2129,20 @@ def print(self):
"""
print all config
"""
- logger.info("=================== Configuration Information ===============")
+ logger.info(
+ "Configuration: model=%s, tp=%d, max_batch=%d, max_seq_len=%d, dtype=%s, device=%s",
+ self.model_config.model,
+ self.parallel_config.tensor_parallel_size,
+ self.scheduler_config.max_num_seqs,
+ self.model_config.max_model_len,
+ self.model_config.dtype,
+ self.parallel_config.device_ids,
+ )
+ logger.debug("=================== Configuration Information ===============")
for k, v in self.__dict__.items():
if k == "generation_config" and v is not None:
for gck, gcv in v.to_dict().items():
- logger.info("{:<20}:{:<6}{}".format(gck, "", gcv))
+ logger.debug("{:<20}:{:<6}{}".format(gck, "", gcv))
elif (
k == "cache_config"
or k == "model_config"
@@ -2136,8 +2153,8 @@ def print(self):
if v is not None:
v.print()
else:
- logger.info("{:<20}:{:<6}{}".format(k, "", v))
- logger.info("=============================================================")
+ logger.debug("{:<20}:{:<6}{}".format(k, "", v))
+ logger.debug("=============================================================")
def init_cache_info(self):
"""
@@ -2171,7 +2188,7 @@ def init_cache_info(self):
"transfer_protocol": transfer_protocol,
"tp_size": self.parallel_config.tensor_parallel_size,
}
- logger.info(f"register_info: {self.register_info}")
+ logger.debug("register_info: %s", self.register_info)
def read_from_config(self):
"""
@@ -2182,7 +2199,7 @@ def reset_value(cls, value_name, key):
if hasattr(cls, key):
value = getattr(cls, key)
setattr(cls, value_name, value)
- logger.info(f"Reset parameter {value_name} = {value} from configuration.")
+ logger.info("Reset parameter %s = %s from configuration.", value_name, value)
reset_value(self.cache_config, "block_size", "infer_model_block_size")
reset_value(
diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 5723a239378..be084972dfb 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -72,7 +72,7 @@
try:
TokenProcessor = load_token_processor_plugins()
- llm_logger.info(f"TokenProcessor plugin {TokenProcessor} loaded")
+ llm_logger.debug("TokenProcessor plugin %s loaded", TokenProcessor)
except:
from fastdeploy.output.token_processor import TokenProcessor
@@ -108,7 +108,7 @@ def __init__(self, cfg, start_queue=True, use_async_llm=False):
for rank in range(tp_size):
engine_worker_queue_port = self.cfg.parallel_config.local_engine_worker_queue_port
name = f"ctrl_w2e_rank{rank+tp_size*dp_index}_{engine_worker_queue_port}"
- self.llm_logger.info(f"Init Worker Control Output Queue: {name}(consumer)")
+ self.llm_logger.debug("Init Worker Control Output Queue: %s (consumer)", name)
self._ctrl_worker_output_queues.append(FMQ().queue(name, "consumer"))
self.scheduler = cfg.scheduler_config.scheduler()
@@ -136,7 +136,7 @@ def __init__(self, cfg, start_queue=True, use_async_llm=False):
self.start_worker_queue_service(start_queue)
os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.parallel_config.local_engine_worker_queue_port)
- self.llm_logger.info(f"INFERENCE_MSG_QUEUE_ID: {str(self.cfg.parallel_config.local_engine_worker_queue_port)}")
+ self.llm_logger.debug("INFERENCE_MSG_QUEUE_ID: %s", self.cfg.parallel_config.local_engine_worker_queue_port)
self.split_connector = SplitwiseConnector(cfg, self.engine_worker_queue, self.resource_manager)
self.token_processor = TokenProcessor(
@@ -283,7 +283,7 @@ def create_data_processor(self):
def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进程感知是否有新Task需要处理
current_suffix = self.cfg.parallel_config.local_engine_worker_queue_port
- self.llm_logger.info(f"current_suffix: {current_suffix}")
+ self.llm_logger.debug("current_suffix: %s", current_suffix)
exist_task_signal_data = np.zeros([1], dtype=np.int32)
self.exist_task_signal = IPCSignal(
name="exist_task_signal",
@@ -392,7 +392,7 @@ def start_worker_queue_service(self, start_queue):
if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0":
if start_queue:
- self.llm_logger.info(f"Starting engine worker queue server service at {address}")
+ self.llm_logger.debug("Starting engine worker queue server service at %s", address)
self.engine_worker_queue_server = EngineWorkerQueue(
address=address,
is_server=True,
@@ -410,8 +410,8 @@ def start_worker_queue_service(self, start_queue):
)
if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
- self.llm_logger.info(
- f"Starting engine cache queue server service at {self.cfg.cache_config.local_cache_queue_port}"
+ self.llm_logger.debug(
+ "Starting engine cache queue server service at %s", self.cfg.cache_config.local_cache_queue_port
)
self.cache_task_queue = EngineCacheQueue(
address=(self.cfg.master_ip, self.cfg.cache_config.local_cache_queue_port),
@@ -507,7 +507,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
self.split_connector.send_cache_info_to_prefill(tasks)
if not is_decode:
- self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}")
+ self.llm_logger.debug("Tasks are sent to engine, req_ids=%s", req_ids)
for task in tasks:
if not getattr(task, "has_been_preempted_before", False):
task.metrics.inference_start_time = time.time()
@@ -965,8 +965,8 @@ def _fetch_request():
)
if self.cfg.scheduler_config.splitwise_role == "prefill":
self.resource_manager.add_request_in_p(tasks)
- self.llm_logger.info(
- f"P add requests into running queue: {[task.request_id for task in tasks]}"
+ self.llm_logger.debug(
+ "P add requests into running queue: %s", [task.request_id for task in tasks]
)
else:
for task in tasks:
@@ -1164,18 +1164,18 @@ def _insert_zmq_task_to_scheduler(self):
status_value = data.get("status", None)
if status_value is not None and status_value == RequestStatus.ABORT.value:
req_id = data["request_id"]
- self.llm_logger.info(f"Receive abort request, req_id: {req_id}")
+ self.llm_logger.info("Receive abort request, req_id: %s", req_id)
self.resource_manager.abort_req_ids_set.add(req_id)
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if req_id in self.resource_manager.requests:
req = self.resource_manager.requests[req_id]
task = self.resource_manager._prepare_preempt_task(req)
self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz))
- self.llm_logger.info(f"put abort task in engine worker queue, req_id: {req_id}")
+ self.llm_logger.debug("put abort task in engine worker queue, req_id: %s", req_id)
else:
self.scheduler._recycle(req_id)
- self.llm_logger.info(
- f"req_id:{req_id} has not been allocated any resources, recycled it in scheduler"
+ self.llm_logger.debug(
+ "req_id:%s has not been allocated any resources, recycled it in scheduler", req_id
)
self.resource_manager.abort_req_ids_set.remove(req_id)
continue
@@ -1259,7 +1259,7 @@ def run_control_method(self, control_req: ControlRequest):
request_id = control_req.request_id
try:
- self.llm_logger.info(f"START run control method {request_id}: {method}")
+ self.llm_logger.debug("START run control method %s: %s", request_id, method)
handler_name = f"_control_{method}"
handler = getattr(self, handler_name, None)
@@ -1270,7 +1270,7 @@ def run_control_method(self, control_req: ControlRequest):
return
result = handler(control_req)
- self.llm_logger.info(f"SUCCESS run control method {method}.")
+ self.llm_logger.debug("SUCCESS run control method %s", method)
succ_result = ControlResponse(request_id, 200, "Success", result)
self.send_response_server.send_response(request_id, [succ_result])
@@ -1319,7 +1319,7 @@ def _control_pause(self, control_request: ControlRequest):
raise Exception(error_msg)
running_reqs = self.resource_manager.preempted_all()
if len(running_reqs) > 0:
- self.llm_logger.info(f"Total {len(running_reqs)} requests need to be aborted.")
+ self.llm_logger.info("Total %d requests need to be aborted.", len(running_reqs))
self.resource_manager.get_real_bsz()
self.engine_worker_queue.put_tasks((running_reqs, self.resource_manager.real_bsz))
self.resource_manager.wait_worker_inflight_requests_finish(timeout=60)
@@ -1329,7 +1329,7 @@ def _control_pause(self, control_request: ControlRequest):
# abort inflight requests to user
inflight_requests = self.scheduler.get_inflight_requests()
- self.llm_logger.info(f"Start Abort Inflight Requests, total {len(inflight_requests)} waiting requests")
+ self.llm_logger.info("Start Abort Inflight Requests, total %d waiting requests", len(inflight_requests))
for req in inflight_requests:
self._send_error_response(req.request_id, "Request is aborted since LLM Engine is paused.")
self.scheduler.reset()
@@ -1366,7 +1366,7 @@ def _control_is_paused(self, control_request: ControlRequest) -> bool:
Returns:
dict: Dictionary containing pause status information, {'is_paused': bool}
"""
- self.llm_logger.info(f"LLM Engine request generation is paused: {self.is_paused}")
+ self.llm_logger.debug("LLM Engine request generation is paused: %s", self.is_paused)
with self._pause_cond:
return {"is_paused": self.is_paused}
@@ -1418,12 +1418,12 @@ async def _wait_all_control_responses(self, request_id: str, timeout: int):
raise Exception("Worker Update Weights Timeouted after 600s")
response: ControlResponse = msg.payload
if response.request_id != request_id:
- self.llm_logger.info(f"ignore old control response from worker:{output_queue.name} {response}")
+ self.llm_logger.debug("ignore old control response from worker:%s %s", output_queue.name, response)
continue
if response.error_code != 200:
- self.llm_logger.info(f"Call Worker Failed: {output_queue.name} {response.error_message}")
+ self.llm_logger.debug("Call Worker Failed: %s %s", output_queue.name, response.error_message)
raise Exception(f"Call Worker error: {response.error_message}")
- self.llm_logger.info(f"Call Worker Succeed: {output_queue.name} {response.result}")
+ self.llm_logger.debug("Call Worker Succeed: %s %s", output_queue.name, response.result)
responses.append(response.result)
return responses
@@ -1579,7 +1579,7 @@ def _process_allocate_resource_requests():
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.resource_manager.preallocate_resource_in_d(task):
task.metrics.decode_preallocate_req_time = time.time()
- self.llm_logger.info(f"Resource available, processing task {task.request_id}")
+ self.llm_logger.debug("Resource available, processing task %s", task.request_id)
self.split_connector.send_cache_info_to_prefill([task])
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
processed_indices.append(idx)
@@ -1652,7 +1652,7 @@ def _process_prefilled_requests():
if envs.FD_ENABLE_INTERNAL_ADAPTER: # first token sent by D instance
self.scheduler.put_results([req_output])
self.resource_manager.add_prefilled_request(req_output)
- self.llm_logger.info(f"D has successfully added prefilled request, {request_id}")
+ self.llm_logger.debug("D has successfully added prefilled request, %s", request_id)
def decode_loop():
while self.running:
@@ -1771,7 +1771,7 @@ def _exit_sub_services(self):
self.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear()
self.resource_manager.cache_manager.cache_ready_signal.clear()
for p in self.cache_manager_processes:
- self.llm_logger.info(f"Killing cache manager process {p.pid}")
+ self.llm_logger.info("Killing cache manager process %s", p.pid)
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
@@ -1801,7 +1801,7 @@ def _exit_sub_services(self):
# Clean up other services
if hasattr(self, "dp_processed"):
for p in self.dp_processed:
- self.llm_logger.info(f"Waiting for worker {p.pid} to exit")
+ self.llm_logger.info("Waiting for worker %s to exit", p.pid)
p.join()
for p in self.dp_engine_worker_queue_server:
p.cleanup()
@@ -1970,14 +1970,14 @@ def _start_worker_service(self):
think_start_id = self.data_processor.tokenizer.get_vocab().get("<think>", -1)
if think_start_id >= 0:
- self.llm_logger.info(f"Get think_start_id {think_start_id} from vocab.")
+ self.llm_logger.debug("think_start_id=%s", think_start_id)
else:
- self.llm_logger.info("No token found in vocabulary, the model can not do reasoning.")
+ self.llm_logger.debug("No token found in vocabulary, the model can not do reasoning.")
think_end_id = self.data_processor.tokenizer.get_vocab().get("</think>", -1)
if think_end_id >= 0:
- self.llm_logger.info(f"Get think_end_id {think_end_id} from vocab.")
+ self.llm_logger.debug("think_end_id=%s", think_end_id)
else:
- self.llm_logger.info("No token found in vocabulary, the model can not do reasoning.")
+ self.llm_logger.debug("No token found in vocabulary, the model can not do reasoning.")
image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1)
line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1)
if line_break_id < 0:
@@ -1996,7 +1996,7 @@ def _start_worker_service(self):
else:
line_break_id = int(line_break_ids)
if line_break_id >= 0:
- self.llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.")
+ self.llm_logger.debug("line_break_id=%s", line_break_id)
ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port))
ips = None
@@ -2083,7 +2083,7 @@ def _start_worker_service(self):
if self.cfg.nnode > 1:
pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}"
pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
- self.llm_logger.info(f"Launch worker service command: {pd_cmd}")
+ self.llm_logger.info("Launch worker service command: %s", pd_cmd)
p = subprocess.Popen(
pd_cmd,
stdout=subprocess.PIPE,
@@ -2154,7 +2154,7 @@ def launch_components(self):
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
- self.llm_logger.info(f"dp start queue service {address}")
+ self.llm_logger.debug("dp start queue service %s", address)
self.dp_engine_worker_queue_server.append(
EngineWorkerQueue(
address=address,
diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py
index 790365482cb..9213047620f 100644
--- a/fastdeploy/engine/engine.py
+++ b/fastdeploy/engine/engine.py
@@ -192,12 +192,14 @@ def check_worker_initialize_status_func(res: dict):
envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0]
if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None:
envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0]
- llm_logger.info(
- f"envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}"
+ llm_logger.debug(
+ "ZMQ ports: recv=%s, send=%s",
+ envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT,
+ envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT,
)
if api_server_pid is not None:
- llm_logger.info(f"Start zmq server, api_server_pid: {api_server_pid}")
+ llm_logger.debug("Start zmq server, api_server_pid: %s", api_server_pid)
self.engine.start_zmq_service(api_server_pid)
# Worker launched
@@ -206,7 +208,7 @@ def check_worker_initialize_status_func(res: dict):
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
return False
- console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.")
+ console_logger.info("Worker processes launched in %.1f seconds.", time.time() - start_time)
# Print blocks number & max running requests to console
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
@@ -269,7 +271,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
task.update(asdict(sampling_params))
request = Request.from_dict(task)
request.metrics.scheduler_recv_req_time = time.time()
- llm_logger.info(f"Receive request {request}")
+ llm_logger.debug("Receive request: %s", request)
if sampling_params is not None:
if sampling_params.temperature is not None and abs(sampling_params.temperature) < 1e-06:
sampling_params.temperature = 1e-06
@@ -339,7 +341,7 @@ def add_requests(self, task, sampling_params=None, **kwargs):
request.metrics.preprocess_end_time = time.time()
request.metrics.scheduler_recv_req_time = time.time()
self.engine.scheduler.put_requests([request])
- llm_logger.info(f"Cache task with request_id ({request.get('request_id')})")
+ llm_logger.debug("Cache task with request_id (%s)", request.get("request_id"))
llm_logger.debug(f"cache task: {request}")
def _worker_processes_ready(self):
@@ -425,7 +427,7 @@ def _exit_sub_services(self):
if hasattr(self.engine.resource_manager.cache_manager, "cache_ready_signal"):
self.engine.resource_manager.cache_manager.cache_ready_signal.clear()
for p in self.cache_manager_processes:
- llm_logger.info(f"Killing cache manager process {p.pid}")
+ llm_logger.debug("Killing cache manager process %s", p.pid)
try:
pgid = os.getpgid(p.pid)
os.killpg(pgid, signal.SIGTERM)
@@ -451,7 +453,7 @@ def _exit_sub_services(self):
if hasattr(self, "dp_processed"):
for p in self.dp_processed:
- console_logger.info(f"Waiting for worker {p.pid} to exit")
+ console_logger.debug("Waiting for worker %s to exit", p.pid)
p.join()
for p in self.dp_engine_worker_queue_server:
p.cleanup()
@@ -528,14 +530,14 @@ def _start_worker_service(self):
think_start_id = self.data_processor.tokenizer.get_vocab().get("<think>", -1)
if think_start_id >= 0:
- llm_logger.info(f"Get think_start_id {think_start_id} from vocab.")
+ llm_logger.debug("think_start_id=%s", think_start_id)
else:
- llm_logger.info("No token found in vocabulary, the model can not do reasoning.")
+ llm_logger.debug("No token found in vocabulary, the model can not do reasoning.")
think_end_id = self.data_processor.tokenizer.get_vocab().get("</think>", -1)
if think_end_id >= 0:
- llm_logger.info(f"Get think_end_id {think_end_id} from vocab.")
+ llm_logger.debug("think_end_id=%s", think_end_id)
else:
- llm_logger.info("No token found in vocabulary, the model can not do reasoning.")
+ llm_logger.debug("No token found in vocabulary, the model can not do reasoning.")
image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1)
line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1)
if line_break_id < 0:
@@ -554,7 +556,7 @@ def _start_worker_service(self):
else:
line_break_id = int(line_break_ids)
if line_break_id >= 0:
- llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.")
+ llm_logger.debug("line_break_id=%s", line_break_id)
try:
think_truncate_prompt_ids = self.data_processor.tokenizer.convert_tokens_to_ids(
self.data_processor.tokenizer.tokenize(self.data_processor.tokenizer.think_truncate_prompt)
@@ -563,7 +565,7 @@ def _start_worker_service(self):
think_truncate_prompt_ids = self.data_processor.tokenizer.convert_tokens_to_ids(
self.data_processor.tokenizer.tokenize(envs.FD_LIMIT_THINKING_CONTENT_TRUNCATE_STR)
)
- llm_logger.info(f"Get think_truncate_prompt_ids {think_truncate_prompt_ids} from tokenizer.")
+ llm_logger.debug("think_truncate_prompt_ids=%s", think_truncate_prompt_ids)
ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port))
ips = None
@@ -660,7 +662,7 @@ def _start_worker_service(self):
if self.cfg.nnode > 1:
pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}"
pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
- llm_logger.info(f"Launch worker service command: {pd_cmd}")
+ llm_logger.info("Launch worker service command: %s", pd_cmd)
p = subprocess.Popen(
pd_cmd,
stdout=subprocess.PIPE,
@@ -704,7 +706,7 @@ def generate(self, prompts, stream):
Yields:
dict: The generated response.
"""
- llm_logger.info(f"Starting generation for prompt: {prompts}")
+ llm_logger.debug("Starting generation for prompt: %s", prompts)
try:
req_id = self._format_and_add_data(prompts)
except Exception as e:
@@ -799,7 +801,7 @@ def launch_components(self):
else:
address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock"
- llm_logger.info(f"dp start queue service {address}")
+ llm_logger.info("dp start queue service %s", address)
self.dp_engine_worker_queue_server.append(
EngineWorkerQueue(
address=address,
diff --git a/fastdeploy/entrypoints/api_server.py b/fastdeploy/entrypoints/api_server.py
index 4f4d7f2250c..cf90a4c73e5 100644
--- a/fastdeploy/entrypoints/api_server.py
+++ b/fastdeploy/entrypoints/api_server.py
@@ -61,7 +61,7 @@ async def generate(request: dict):
"""
generate stream api
"""
- api_server_logger.info(f"Receive request: {request}")
+ api_server_logger.debug("Receive request: %s", request)
stream = request.get("stream", 0)
if not stream:
@@ -99,8 +99,8 @@ def launch_api_server(args) -> None:
if not is_port_available(args.host, args.port):
raise Exception(f"The parameter `port`:{args.port} is already in use.")
- api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}")
- api_server_logger.info(f"args: {args.__dict__}")
+ api_server_logger.info("Launching FastDeploy API server on port %s", args.port)
+ api_server_logger.debug("Server args: %s", args.__dict__)
if not init_app(args):
api_server_logger.error("API Server launch failed.")
diff --git a/fastdeploy/entrypoints/chat_utils.py b/fastdeploy/entrypoints/chat_utils.py
index 08abc8ed391..a486bcb8e00 100644
--- a/fastdeploy/entrypoints/chat_utils.py
+++ b/fastdeploy/entrypoints/chat_utils.py
@@ -122,7 +122,7 @@ def http_get_with_retry(self, url, max_retries=3, retry_delay=1, backoff_factor=
if retry_cnt >= max_retries:
api_server_logger.error(f"HTTP GET failed: {e}. Max retries reached")
raise
- api_server_logger.info(f"HTTP GET failed: {e}. Start retry {retry_cnt}")
+ api_server_logger.info("HTTP GET failed: %s. Start retry %s", e, retry_cnt)
time.sleep(delay)
delay *= backoff_factor
diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py
index 53c71351820..40ba6da9b62 100644
--- a/fastdeploy/entrypoints/engine_client.py
+++ b/fastdeploy/entrypoints/engine_client.py
@@ -312,7 +312,7 @@ async def add_requests(self, task):
)
else:
request_id = task.get("request_id", "unknown")
- obj_logger.info(f"\n{'='*60} OBJGRAPH DEBUG [request_id={request_id}] {'='*60}")
+ obj_logger.debug("\n%s OBJGRAPH DEBUG [request_id=%s] %s", "=" * 60, request_id, "=" * 60)
# 打印内存占用
if not _has_psutil:
obj_logger.warning(
@@ -321,18 +321,18 @@ async def add_requests(self, task):
else:
process = psutil.Process()
rss_memory = process.memory_info().rss / 1024**3
- obj_logger.info(f"Process Memory (RSS): {rss_memory:.2f} GB")
+ obj_logger.debug("Process Memory (RSS): %.2f GB", rss_memory)
obj_logger.info("Object growth statistics:")
growth_data = objgraph.growth(limit=20)
for item in growth_data:
if len(item) == 3:
obj_type, current_count, growth = item
- obj_logger.info(f" {obj_type:30s} {current_count:8d} +{growth}")
+ obj_logger.debug(" %-30s %8d +%s", obj_type, current_count, growth)
elif len(item) == 2:
obj_type, count = item
- obj_logger.info(f" {obj_type:30s} +{count}")
+ obj_logger.debug(" %-30s +%s", obj_type, count)
else:
- obj_logger.info(f" {item}")
+ obj_logger.debug(" %s", item)
task["metrics"]["preprocess_start_time"] = time.time()
request_id = task.get("request_id").split("_")[0]
@@ -359,7 +359,7 @@ async def add_requests(self, task):
min_tokens = task.get("min_tokens", 1)
if "messages" in task:
task["messages"] = None
- api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
+ api_server_logger.debug("task['max_tokens']:%s", task["max_tokens"])
main_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
main_process_metrics.prompt_tokens_total.inc(input_ids_len)
main_process_metrics.request_prompt_tokens.observe(input_ids_len)
@@ -592,7 +592,7 @@ def check_health(self, time_interval_threashold=30):
return True, ""
async def run_control_method(self, request: ControlRequest):
- api_server_logger.info(f"Start Run Control Method: {request}")
+ api_server_logger.debug("Start Run Control Method: %s", request)
self.zmq_client.send_json(request.to_dict())
request_id = request.request_id
dealer, response_queue = await self.connection_manager.get_connection(request_id)
@@ -601,7 +601,7 @@ async def run_control_method(self, request: ControlRequest):
# todo: support user specified timeout. default 600s is enough for most control cases
response = await asyncio.wait_for(response_queue.get(), timeout=600)
response = ControlResponse.from_dict(response[0])
- api_server_logger.info(f"End Run Control Method: {response}")
+ api_server_logger.debug("End Run Control Method: %s", response)
return response
except asyncio.TimeoutError:
error_response = ControlResponse(request_id, 500, "Timeout waiting for control method response")
@@ -827,7 +827,7 @@ async def rearrange_experts(self, request_dict: dict):
return content, status_code
action = request_dict.get("action", "")
- api_server_logger.info(f"redundant_expert: rearrange_experts recv request, action {action}")
+ api_server_logger.debug("redundant_expert: rearrange_experts recv request, action %s", action)
if action == "":
# action: start rearrange experts
# params: {'user': 'xxx', 'passwd': 'xxx', 'ips': ['10.54.99.77:8000', '10.54.99.77:8300']}
@@ -995,7 +995,7 @@ async def check_redundant(self, request_dict: dict):
async def abort(self, request_id, n=1) -> None:
if envs.FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE:
- api_server_logger.info(f"abort request_id:{request_id}")
+ api_server_logger.info("abort request_id:%s", request_id)
if n <= 0:
api_server_logger.warning("Abort function called with non-positive n: %d. No requests aborted.", n)
return
diff --git a/fastdeploy/entrypoints/llm.py b/fastdeploy/entrypoints/llm.py
index 101c8fbb101..7fa61fd47bc 100644
--- a/fastdeploy/entrypoints/llm.py
+++ b/fastdeploy/entrypoints/llm.py
@@ -408,7 +408,7 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i
list[dict[int, Logprob]]: One dict per request, mapping token ID to Logprob.
"""
try:
- llm_logger.info(f"filter logprobs, topk_logprobs: {topk_logprobs}")
+ llm_logger.debug("filter logprobs, topk_logprobs: %s", topk_logprobs)
if not logprobs_lists.logprob_token_ids:
llm_logger.warning("Empty logprob_token_ids in LogprobsLists")
return None
diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py
index ca63e586cc6..87e8cfc5c91 100644
--- a/fastdeploy/entrypoints/openai/api_server.py
+++ b/fastdeploy/entrypoints/openai/api_server.py
@@ -89,7 +89,7 @@
parser = make_arg_parser(FlexibleArgumentParser())
args = parser.parse_args()
-console_logger.info(f"Number of api-server workers: {args.workers}.")
+console_logger.info("Number of API server workers: %d", args.workers)
args.model = retrive_model_from_server(args.model, args.revision)
chat_template = load_chat_template(args.chat_template, args.model)
@@ -124,7 +124,7 @@ def load_engine():
if llm_engine is not None:
return llm_engine
- api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}")
+ api_server_logger.info("FastDeploy LLM API server starting (pid=%s, port=%s)", os.getpid(), args.port)
engine_args = EngineArgs.from_cli_args(args)
if envs.FD_ENABLE_ASYNC_LLM:
engine = AsyncLLM.from_engine_args(engine_args, pid=args.port)
@@ -152,11 +152,11 @@ def load_data_service():
global llm_engine
if llm_engine is not None:
return llm_engine
- api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}")
+ api_server_logger.info("FastDeploy LLM API server starting (pid=%s, port=%s)", os.getpid(), args.port)
engine_args = EngineArgs.from_cli_args(args)
config = engine_args.create_engine_config()
- api_server_logger.info(f"local_data_parallel_id: {config.parallel_config}")
- api_server_logger.info(f"local_data_parallel_id: {config.parallel_config.local_data_parallel_id}")
+ api_server_logger.debug("parallel_config: %s", config.parallel_config)
+ api_server_logger.debug("local_data_parallel_id: %s", config.parallel_config.local_data_parallel_id)
expert_service = ExpertService(config, config.parallel_config.local_data_parallel_id)
if not expert_service.start(args.port, config.parallel_config.local_data_parallel_id):
api_server_logger.error("Failed to initialize FastDeploy LLM expert service, service exit now!")
@@ -188,7 +188,7 @@ async def lifespan(app: FastAPI):
if args.tokenizer is None:
args.tokenizer = args.model
pid = args.port
- api_server_logger.info(f"{pid}")
+ api_server_logger.debug("PID: %s", pid)
if args.served_model_name is not None:
served_model_names = args.served_model_name
@@ -290,7 +290,7 @@ async def lifespan(app: FastAPI):
from prometheus_client import multiprocess
multiprocess.mark_process_dead(os.getpid())
- api_server_logger.info(f"Closing metrics client pid: {pid}")
+ api_server_logger.debug("Closing metrics client pid: %s", pid)
except Exception as e:
api_server_logger.warning(f"exit error: {e}, {str(traceback.format_exc())}")
@@ -315,7 +315,7 @@ async def connection_manager():
await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001)
yield
except asyncio.TimeoutError:
- api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}")
+ api_server_logger.debug("Reach max request concurrency, semaphore status: %s", connection_semaphore.status())
raise HTTPException(
status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}"
)
@@ -520,7 +520,7 @@ async def create_completion(request: CompletionRequest, req: Request):
"""
Create a completion for the provided prompt and parameters.
"""
- api_server_logger.info(f"Completion Received request: {request.model_dump_json()}")
+ api_server_logger.debug("Completion received request: %s", request.model_dump_json())
if envs.TRACES_ENABLE:
if req.headers:
headers = dict(req.headers)
@@ -658,8 +658,8 @@ def launch_api_server() -> None:
if not is_port_available(args.host, args.port):
raise Exception(f"The parameter `port`:{args.port} is already in use.")
- api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}")
- api_server_logger.info(f"args: {args.__dict__}")
+ api_server_logger.info("Launching FastDeploy API server on port %s", args.port)
+ api_server_logger.debug("Server args: %s", args.__dict__)
# fd_start_span("FD_START")
# set control_socket_disable=True to avoid conflicts when running multiple instances
@@ -851,11 +851,11 @@ def main():
api_server_logger.info("FastDeploy LLM engine initialized!\n")
if args.metrics_port is not None and args.metrics_port != args.port:
launch_metrics_server()
- console_logger.info(f"Launching metrics service at http://{args.host}:{args.metrics_port}/metrics")
+ console_logger.info("Launching metrics service at http://%s:%s/metrics", args.host, args.metrics_port)
else:
- console_logger.info(f"Launching metrics service at http://{args.host}:{args.port}/metrics")
- console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions")
- console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions")
+ console_logger.info("Launching metrics service at http://%s:%s/metrics", args.host, args.port)
+ console_logger.info("Launching chat completion service at http://%s:%s/v1/chat/completions", args.host, args.port)
+ console_logger.info("Launching completion service at http://%s:%s/v1/completions", args.host, args.port)
launch_worker_monitor()
launch_controller_server()
diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py
index 32b3f75ae88..b55480219e8 100644
--- a/fastdeploy/entrypoints/openai/multi_api_server.py
+++ b/fastdeploy/entrypoints/openai/multi_api_server.py
@@ -50,7 +50,7 @@ def start_servers(
else:
controller_ports = [-1] * server_count
- logger.info(f"Starting servers on ports: {ports} with args: {server_args} and metrics ports: {metrics_ports}")
+ logger.info("Starting servers on ports: %s with args: %s and metrics ports: %s", ports, server_args, metrics_ports)
port_idx = {}
for i in range(len(server_args)):
if server_args[i] == "--engine-worker-queue-port":
@@ -66,7 +66,7 @@ def start_servers(
port = find_free_ports(num_ports=server_count)
server_args += ["--engine-worker-queue-port", ",".join(map(str, port))]
port_idx["engine_worker_queue_port"] = len(server_args) - 1
- logger.info(f"No --engine-worker-queue-port specified, using random ports: {port}")
+ logger.debug("No --engine-worker-queue-port specified, using random ports: %s", port)
engine_worker_queue_port = server_args[port_idx["engine_worker_queue_port"]].split(",")
if not check_param(engine_worker_queue_port, server_count):
return
@@ -75,7 +75,7 @@ def start_servers(
port = find_free_ports(num_ports=server_count)
server_args += ["--cache-queue-port", ",".join(map(str, port))]
port_idx["cache_queue_port"] = len(server_args) - 1
- logger.info(f"No --cache-queue-port specified, using random ports: {port}")
+ logger.debug("No --cache-queue-port specified, using random ports: %s", port)
cache_queue_port = server_args[port_idx["cache_queue_port"]].split(",")
if not check_param(cache_queue_port, server_count):
return
@@ -84,7 +84,7 @@ def start_servers(
port = find_free_ports(num_ports=server_count)
server_args += ["--pd-comm-port", ",".join(map(str, port))]
port_idx["pd_comm_port"] = len(server_args) - 1
- logger.info(f"No --pd-comm-port specified, using random ports: {port}")
+ logger.debug("No --pd-comm-port specified, using random ports: %s", port)
pd_comm_port = server_args[port_idx["pd_comm_port"]].split(",")
if not check_param(pd_comm_port, server_count):
return
@@ -93,12 +93,12 @@ def start_servers(
port = find_free_ports(num_ports=device_count)
server_args += ["--rdma-comm-ports", ",".join(map(str, port))]
port_idx["rdma_comm_ports"] = len(server_args) - 1
- logger.info(f"No --rdma-comm-ports specified, using random ports: {port}")
+ logger.debug("No --rdma-comm-ports specified, using random ports: %s", port)
rdma_comm_ports = server_args[port_idx["rdma_comm_ports"]].split(",")
if not check_param(rdma_comm_ports, device_count):
return
- logger.info(f"Modified server_args: {server_args}")
+ logger.debug("Modified server_args: %s", server_args)
processes = []
for i in range(server_count):
port = int(ports[i])
@@ -125,16 +125,16 @@ def start_servers(
# 启动子进程
proc = subprocess.Popen(cmd, env=env)
processes.append(proc)
- logger.info(f"Starting servers #{i+1} (PID: {proc.pid}) port: {port} | command: {' '.join(cmd)}")
+ logger.info("Starting server #%d (PID: %s) port: %s", i + 1, proc.pid, port)
return processes
def check_param(ports, num_servers):
- logger.info(f"check param {ports}, {num_servers}")
+ logger.debug("check param %s, %s", ports, num_servers)
assert len(ports) == num_servers, "Number of ports must match num-servers"
for port in ports:
- logger.info(f"check port {port}")
+ logger.debug("check port %s", port)
if not is_port_available("0.0.0.0", int(port)):
raise RuntimeError(f"Port {port} is not available.")
return True
@@ -149,7 +149,7 @@ def main():
parser.add_argument("--args", nargs=argparse.REMAINDER, help="remaining arguments are passed to api_server.py")
args = parser.parse_args()
- logger.info(f"Launching MultiAPIServer with command: {' '.join(sys.argv)}")
+ logger.info("Launching MultiAPIServer with command: %s", " ".join(sys.argv))
device_count = 0
if current_platform.is_cuda():
diff --git a/fastdeploy/entrypoints/openai/run_batch.py b/fastdeploy/entrypoints/openai/run_batch.py
index 67766ff67e8..2daeb8bb659 100644
--- a/fastdeploy/entrypoints/openai/run_batch.py
+++ b/fastdeploy/entrypoints/openai/run_batch.py
@@ -114,7 +114,7 @@ def init_engine(args: argparse.Namespace):
if llm_engine is not None:
return llm_engine
- api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}")
+ api_server_logger.info("FastDeploy LLM API server starting... %s", os.getpid())
engine_args = EngineArgs.from_cli_args(args)
engine = LLMEngine.from_engine_args(engine_args)
if not engine.start(api_server_pid=os.getpid()):
@@ -144,7 +144,7 @@ def completed(self):
if self._total > 0:
log_interval = min(100, max(self._total // 10, 1))
if self._completed - self._last_log_count >= log_interval:
- console_logger.info(f"Progress: {self._completed}/{self._total} requests completed")
+ console_logger.info("Progress: %s/%s requests completed", self._completed, self._total)
self._last_log_count = self._completed
def pbar(self) -> tqdm:
@@ -398,7 +398,7 @@ async def setup_engine_and_handlers(args: Namespace) -> Tuple[EngineClient, Open
args.tokenizer = args.model
pid = determine_process_id()
- console_logger.info(f"Process ID: {pid}")
+ console_logger.info("Process ID: %s", pid)
model_paths = create_model_paths(args)
chat_template = load_chat_template(args.chat_template, args.model)
@@ -429,7 +429,7 @@ async def run_batch(
max_concurrency = (concurrency + workers - 1) // workers
semaphore = asyncio.Semaphore(max_concurrency)
- console_logger.info(f"concurrency: {concurrency}, workers: {workers}, max_concurrency: {max_concurrency}")
+ console_logger.info("concurrency: %s, workers: %s, max_concurrency: %s", concurrency, workers, max_concurrency)
tracker = BatchProgressTracker()
console_logger.info("Reading batch from %s...", args.input_file)
@@ -474,7 +474,7 @@ async def run_batch(
success_count = sum(1 for r in responses if r.error is None)
error_count = len(responses) - success_count
- console_logger.info(f"Batch processing completed: {success_count} success, {error_count} errors")
+ console_logger.info("Batch processing completed: %s success, %s errors", success_count, error_count)
await write_file(args.output_file, responses, args.output_tmp_dir)
console_logger.info("Results written to output file")
@@ -485,7 +485,7 @@ async def main(args: argparse.Namespace):
try:
if args.workers is None:
args.workers = max(min(int(args.max_num_seqs // 32), 8), 1)
- console_logger.info(f"Workers: {args.workers}")
+ console_logger.info("Workers: %s", args.workers)
args.model = retrive_model_from_server(args.model, args.revision)
if args.tool_parser_plugin:
diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py
index f56d572615c..f58f2389d35 100644
--- a/fastdeploy/entrypoints/openai/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/serving_chat.py
@@ -98,7 +98,7 @@ def __init__(
else:
self.master_ip = "0.0.0.0"
self.is_master_ip = True
- api_server_logger.info(f"master ip: {self.master_ip}")
+ api_server_logger.info("master ip: %s", self.master_ip)
def _check_master(self):
return self.engine_client.is_master or self.is_master_ip
@@ -129,7 +129,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
await self.engine_client.semaphore.acquire()
else:
await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time)
- api_server_logger.info(f"current {self.engine_client.semaphore.status()}")
+ api_server_logger.debug("current %s", self.engine_client.semaphore.status())
if request.request_id is not None:
request_id = request.request_id
@@ -141,7 +141,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
request_id = f"chatcmpl-{uuid.uuid4()}"
tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy")
del request.trace_context
- api_server_logger.info(f"create chat completion request: {request_id}")
+ api_server_logger.debug("create chat completion request: %s", request_id)
prompt_tokens = None
max_tokens = None
try:
@@ -252,7 +252,7 @@ async def chat_completion_stream_generator(
choices=[],
model=model_name,
)
- api_server_logger.info(f"create chat completion request: {request_id}")
+ api_server_logger.debug("create chat completion request: %s", request_id)
try:
dealer, response_queue = await self.engine_client.connection_manager.get_connection(
@@ -370,7 +370,7 @@ async def chat_completion_stream_generator(
completion_tokens_details=CompletionTokenUsageInfo(reasoning_tokens=0),
)
yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n"
- api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}")
+ api_server_logger.debug("Chat Streaming response send_idx 0: %s", chunk.model_dump_json())
first_iteration = False
output = res["outputs"]
@@ -489,7 +489,7 @@ async def chat_completion_stream_generator(
chunk.choices = choices
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
if res["finished"]:
- api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
+ api_server_logger.debug("Chat Streaming response last send: %s", chunk.model_dump_json())
choices = []
if include_usage:
@@ -528,7 +528,7 @@ async def chat_completion_stream_generator(
tracing.trace_req_finish(request_id)
await self.engine_client.connection_manager.cleanup_request(request_id)
self.engine_client.semaphore.release()
- api_server_logger.info(f"release {request_id} {self.engine_client.semaphore.status()}")
+ api_server_logger.debug("release %s %s", request_id, self.engine_client.semaphore.status())
yield "data: [DONE]\n\n"
async def chat_completion_full_generator(
@@ -695,7 +695,7 @@ async def chat_completion_full_generator(
tracing.trace_req_finish(request_id)
await self.engine_client.connection_manager.cleanup_request(request_id)
self.engine_client.semaphore.release()
- api_server_logger.info(f"release {self.engine_client.semaphore.status()}")
+ api_server_logger.debug("release %s", self.engine_client.semaphore.status())
num_prompt_tokens = len(prompt_token_ids)
num_generated_tokens = sum(previous_num_tokens)
@@ -722,7 +722,7 @@ async def chat_completion_full_generator(
choices=choices,
usage=usage,
)
- api_server_logger.info(f"Chat response: {res.model_dump_json()}")
+ api_server_logger.debug("Chat response: %s", res.model_dump_json())
return res
async def _create_chat_completion_choice(
diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py
index 3dafb270905..6a6b9ab8465 100644
--- a/fastdeploy/entrypoints/openai/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/serving_completion.py
@@ -77,7 +77,7 @@ def __init__(self, engine_client, models, pid, ips, max_waiting_time):
self.master_ip = "0.0.0.0"
self.is_master_ip = True
self._is_process_response_dict_async = None
- api_server_logger.info(f"master ip: {self.master_ip}")
+ api_server_logger.info("master ip: %s", self.master_ip)
def _check_master(self):
return self.engine_client.is_master or self.is_master_ip
@@ -110,7 +110,7 @@ async def create_completion(self, request: CompletionRequest):
request_id = f"cmpl-{request.user}-{uuid.uuid4()}"
else:
request_id = f"cmpl-{uuid.uuid4()}"
- api_server_logger.info(f"Initialize request {request_id}: {request}")
+ api_server_logger.debug("Initialize request %s: %s", request_id, request)
tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy")
del request.trace_context
request_prompt_ids = None
@@ -155,7 +155,7 @@ async def create_completion(self, request: CompletionRequest):
request_prompts = request_prompt_ids
num_choices = len(request_prompts) * (1 if request.n is None else request.n)
- api_server_logger.info(f"Start preprocessing request: req_id={request_id}), num_choices={num_choices}")
+ api_server_logger.debug("Start preprocessing request: req_id=%s, num_choices=%s", request_id, num_choices)
prompt_batched_token_ids = []
prompt_tokens_list = []
max_tokens_list = []
@@ -370,7 +370,7 @@ async def completion_full_generator(
prompt_tokens_list=prompt_tokens_list,
max_tokens_list=max_tokens_list,
)
- api_server_logger.info(f"Completion response: {res.model_dump_json()}")
+ api_server_logger.debug("Completion response: %s", res.model_dump_json())
return res
except Exception as e:
api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
@@ -642,7 +642,7 @@ async def completion_stream_generator(
metrics=res["metrics"] if request.collect_metrics else None,
)
yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n"
- api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}")
+ api_server_logger.debug("Completion Streaming response last send: %s", chunk.model_dump_json())
except asyncio.CancelledError as e:
await self.engine_client.abort(f"{request_id}_0", num_choices)
diff --git a/fastdeploy/entrypoints/openai/serving_embedding.py b/fastdeploy/entrypoints/openai/serving_embedding.py
index ec3223b3576..d619f01eb37 100644
--- a/fastdeploy/entrypoints/openai/serving_embedding.py
+++ b/fastdeploy/entrypoints/openai/serving_embedding.py
@@ -155,7 +155,7 @@ async def create_embedding(self, request: EmbeddingRequest):
@override
def _build_response(self, ctx: ServeContext, request_output: dict):
"""Generate final embedding response"""
- api_server_logger.info(f"[{ctx.request_id}] Embedding RequestOutput received:{request_output}")
+ api_server_logger.debug("[%s] Embedding RequestOutput received: %s", ctx.request_id, request_output)
base = PoolingRequestOutput.from_dict(request_output)
embedding_res = EmbeddingRequestOutput.from_base(base)
diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py
index a357d3fc9df..6371ad8c6a3 100644
--- a/fastdeploy/entrypoints/openai/serving_engine.py
+++ b/fastdeploy/entrypoints/openai/serving_engine.py
@@ -76,7 +76,7 @@ def __init__(self, models, cfg, pid, ips, max_waiting_time):
else:
self.master_ip = "0.0.0.0"
self.__semaphore = None
- api_server_logger.info(f"master ip: {self.master_ip}")
+ api_server_logger.debug("master ip: %s", self.master_ip)
def _get_semaphore(self) -> StatefulSemaphore:
if self.__semaphore is None:
@@ -101,7 +101,7 @@ def _check_supported_model(self, model_name: str) -> tuple[bool, str]:
async def _acquire_semaphore(self, request_id: str) -> bool:
"""Acquire engine client semaphore with timeout"""
try:
- api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}")
+ api_server_logger.debug("Acquire request:%s status:%s", request_id, self._get_semaphore().status())
if self.max_waiting_time < 0:
await self._get_semaphore().acquire()
else:
@@ -116,7 +116,7 @@ async def _acquire_semaphore(self, request_id: str) -> bool:
def _release_semaphore(self, request_id: str) -> None:
"""Release engine client semaphore"""
self._get_semaphore().release()
- api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}")
+ api_server_logger.debug("Release request:%s status:%s", request_id, self._get_semaphore().status())
def _create_error_response(
self,
@@ -192,7 +192,7 @@ async def _pipeline(self, ctx: ServeContext) -> Union[Any, ErrorResponse]:
request_id = self._generate_request_id(request)
ctx.request_id = request_id
- api_server_logger.info(f"Initialize request {request_id}: {request}")
+ api_server_logger.debug("Initialize request %s: %s", request_id, request)
# Step 2: Semaphore acquisition
if not await self._acquire_semaphore(request_id):
@@ -251,7 +251,7 @@ async def _preprocess(self, ctx: ServeContext):
request_dicts = self._request_to_batch_dicts(ctx)
ctx.preprocess_requests = request_dicts
for request_dict in request_dicts:
- api_server_logger.info(f"batch add request_id: {request_dict['request_id']}, request: {request_dict}")
+ api_server_logger.debug("batch add request_id: %s, request: %s", request_dict["request_id"], request_dict)
await self.engine_client.format_and_add_data(request_dict)
def _process_chat_template_kwargs(self, request_dict):
@@ -301,7 +301,7 @@ def _get_semaphore(self):
async def _acquire_semaphore(self, request_id: str) -> bool:
"""Acquire engine client semaphore with timeout"""
try:
- api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}")
+ api_server_logger.debug("Acquire request:%s status:%s", request_id, self._get_semaphore().status())
if self.max_waiting_time < 0:
await self._get_semaphore().acquire()
else:
@@ -317,7 +317,7 @@ async def _acquire_semaphore(self, request_id: str) -> bool:
def _release_semaphore(self, request_id: str) -> None:
"""Release engine client semaphore"""
self._get_semaphore().release()
- api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}")
+ api_server_logger.debug("Release request:%s status:%s", request_id, self._get_semaphore().status())
@override
def _check_master(self) -> bool:
diff --git a/fastdeploy/entrypoints/openai/serving_reward.py b/fastdeploy/entrypoints/openai/serving_reward.py
index cbde62deea5..3220d550b8c 100644
--- a/fastdeploy/entrypoints/openai/serving_reward.py
+++ b/fastdeploy/entrypoints/openai/serving_reward.py
@@ -107,7 +107,7 @@ async def create_reward(self, request: ChatRewardRequest):
@override
def _build_response(self, ctx: ServeContext, request_output: dict):
"""Generate final reward response"""
- api_server_logger.info(f"[{ctx.request_id}] Reward RequestOutput received:{request_output}")
+ api_server_logger.debug("[%s] Reward RequestOutput received:%s", ctx.request_id, request_output)
base = PoolingRequestOutput.from_dict(request_output)
reward_res = RewardRequestOutput.from_base(base)
diff --git a/fastdeploy/entrypoints/openai/utils.py b/fastdeploy/entrypoints/openai/utils.py
index 80701ac6d5e..ba8fa9508f3 100644
--- a/fastdeploy/entrypoints/openai/utils.py
+++ b/fastdeploy/entrypoints/openai/utils.py
@@ -97,7 +97,7 @@ async def initialize(self):
self.running = True
for index in range(self.max_connections):
await self._add_connection(index)
- api_server_logger.info(f"Started {self.max_connections} connections, pid {self.pid}")
+ api_server_logger.info("Started %s connections, pid %s", self.max_connections, self.pid)
async def _add_connection(self, index):
"""create a new connection and start listening task"""
diff --git a/fastdeploy/entrypoints/openai/v1/serving_base.py b/fastdeploy/entrypoints/openai/v1/serving_base.py
index ba9ba9dfc75..52ee9a048e0 100644
--- a/fastdeploy/entrypoints/openai/v1/serving_base.py
+++ b/fastdeploy/entrypoints/openai/v1/serving_base.py
@@ -76,7 +76,7 @@ def __init__(
self.master_ip = "0.0.0.0"
self.is_master_ip = True
self.eoi_token_id = 101032
- api_server_logger.info(f"master ip: {self.master_ip}")
+ api_server_logger.info("master ip: %s", self.master_ip)
@override
def _check_master(self) -> bool:
diff --git a/fastdeploy/entrypoints/openai/v1/serving_chat.py b/fastdeploy/entrypoints/openai/v1/serving_chat.py
index ed622c2d4dd..ceab3dd6fc1 100644
--- a/fastdeploy/entrypoints/openai/v1/serving_chat.py
+++ b/fastdeploy/entrypoints/openai/v1/serving_chat.py
@@ -302,7 +302,7 @@ async def _build_stream_response(
max_tokens = request.max_completion_tokens or request.max_tokens
choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index]
choice.finish_reason = self._calc_finish_reason(request_output, max_tokens, choice_completion_tokens)
- api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}")
+ api_server_logger.debug("Chat Streaming response last send: %s", chunk.model_dump_json())
yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n"
if request_output.finished and response_ctx.remain_choices == 0:
@@ -339,7 +339,7 @@ async def _build_full_response(
res = ChatCompletionResponse(
id=ctx.request_id, model=request.model, choices=choices, created=ctx.created_time, usage=response_ctx.usage
)
- api_server_logger.info(f"Chat response: {res.model_dump_json()}")
+ api_server_logger.debug("Chat response: %s", res.model_dump_json())
return res
async def _create_chat_completion_choice(
diff --git a/fastdeploy/entrypoints/openai/v1/serving_completion.py b/fastdeploy/entrypoints/openai/v1/serving_completion.py
index c8e82eed8a8..c101a66b5f5 100644
--- a/fastdeploy/entrypoints/openai/v1/serving_completion.py
+++ b/fastdeploy/entrypoints/openai/v1/serving_completion.py
@@ -271,9 +271,9 @@ async def _build_stream_response(
choice.finish_reason = self._calc_finish_reason(
request_output, request.max_tokens, choice_completion_tokens
)
- api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}")
+ api_server_logger.debug("Completion Streaming response last send: %s", chunk.model_dump_json())
if send_idx == 0 and not request.return_token_ids:
- api_server_logger.info(f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}")
+ api_server_logger.debug("Completion Streaming response send_idx 0: %s", chunk.model_dump_json())
yield f"data: {chunk.model_dump_json()}\n\n"
if request_output.finished and response_ctx.remain_choices == 0:
if include_usage:
@@ -321,7 +321,7 @@ async def _build_full_response(
choices=choices,
usage=response_ctx.usage,
)
- api_server_logger.info(f"Completion response: {res.model_dump_json()}")
+ api_server_logger.debug("Completion response: %s", res.model_dump_json())
return res
except Exception as e:
api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True)
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 1b2dc7af99c..c457e4e3e31 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -36,6 +36,9 @@ def _validate_split_kv_size(value: int) -> int:
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
# Whether to use debug mode, can set 0 or 1
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
+ # Log level for FastDeploy loggers. Supports: DEBUG, INFO, WARNING, ERROR.
+ # Defaults to DEBUG if FD_DEBUG=1, otherwise INFO.
+ "FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", "DEBUG" if environment_variables["FD_DEBUG"]() else "INFO"),
# Number of days to keep fastdeploy logs.
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
# Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE".
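
As a sketch of the precedence the new FD_LOG_LEVEL entry encodes (an explicit FD_LOG_LEVEL always wins; otherwise FD_DEBUG=1 implies DEBUG), with resolve_level as a hypothetical stand-in for the lambda above:

    import os

    def resolve_level() -> str:
        # Hypothetical helper mirroring the env-dict lambda.
        debug = int(os.getenv("FD_DEBUG", "0"))
        return os.getenv("FD_LOG_LEVEL", "DEBUG" if debug else "INFO")

    # FD_DEBUG=1, FD_LOG_LEVEL unset     -> "DEBUG"
    # FD_DEBUG=1, FD_LOG_LEVEL=WARNING   -> "WARNING" (explicit value wins)
    # neither set                        -> "INFO"
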
diff --git a/fastdeploy/logger/__init__.py b/fastdeploy/logger/__init__.py
index e69de29bb2d..f17d2c3212b 100644
--- a/fastdeploy/logger/__init__.py
+++ b/fastdeploy/logger/__init__.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+FastDeploy logging module.
+
+Provides a unified logging interface for all FastDeploy components.
+
+Usage::
+
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger(__name__)
+ logger.info("Hello from FastDeploy")
+"""
+
+from fastdeploy.logger.logger import FastDeployLogger
+
+
+def get_logger(name=None):
+ """Get a FastDeploy logger with the given name.
+
+ The name will be prefixed with 'fastdeploy.' automatically
+ if it is not already in the fastdeploy namespace.
+
+ Args:
+ name: Logger name. If None, returns the root fastdeploy logger.
+
+ Returns:
+ logging.Logger instance
+ """
+ return FastDeployLogger().get_logger(name)
+
+
+__all__ = ["get_logger", "FastDeployLogger"]
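
The naming contract of get_logger, as exercised by the tests added at the end of this patch, amounts to:

    from fastdeploy.logger import get_logger

    assert get_logger(None).name == "fastdeploy"
    assert get_logger("scheduler").name == "fastdeploy.scheduler"       # prefix added
    assert get_logger("fastdeploy.engine").name == "fastdeploy.engine"  # left unchanged
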
diff --git a/fastdeploy/logger/logger.py b/fastdeploy/logger/logger.py
index 63d431ce81b..160348b8c89 100644
--- a/fastdeploy/logger/logger.py
+++ b/fastdeploy/logger/logger.py
@@ -105,17 +105,17 @@ def get_trace_logger(self, name, file_name, without_formater=False, print_to_con
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
- is_debug = int(envs.FD_DEBUG)
+ # Use FD_LOG_LEVEL for unified log level control (FD_DEBUG=1 is honored via the env default)
+ log_level_str = getattr(envs, "FD_LOG_LEVEL", "INFO")
+ log_level = getattr(logging, log_level_str.upper(), logging.INFO)
+
# logger = logging.getLogger(name)
# Use namespace for isolation to avoid logger overwrite and confusion issues, for compatibility with original interface
legacy_name = f"legacy.{name}"
logger = logging.getLogger(legacy_name)
# Set log level
- if is_debug:
- logger.setLevel(level=logging.DEBUG)
- else:
- logger.setLevel(level=logging.INFO)
+ logger.setLevel(level=log_level)
# Set formatter
formatter = CustomFormatter(
@@ -170,17 +170,17 @@ def _get_legacy_logger(self, name, file_name, without_formater=False, print_to_c
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
- is_debug = envs.FD_DEBUG
+ # Use FD_LOG_LEVEL for unified log level control (FD_DEBUG=1 is honored via the env default)
+ log_level_str = getattr(envs, "FD_LOG_LEVEL", "INFO")
+ log_level = getattr(logging, log_level_str.upper(), logging.INFO)
+
# logger = logging.getLogger(name)
# Use namespace for isolation to avoid logger overwrite and confusion issues, for compatibility with original interface
legacy_name = f"legacy.{name}"
logger = logging.getLogger(legacy_name)
# Set log level
- if is_debug:
- logger.setLevel(level=logging.DEBUG)
- else:
- logger.setLevel(level=logging.INFO)
+ logger.setLevel(level=log_level)
# Set formatter - use standard format for both file and console (no color)
formatter = logging.Formatter(
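
The getattr(logging, ...) lookup used in both methods maps a level name to its numeric constant and quietly falls back to INFO for unrecognized names, so a typo in FD_LOG_LEVEL degrades gracefully instead of raising:

    import logging

    assert getattr(logging, "warning".upper(), logging.INFO) == logging.WARNING
    # An unknown name such as "VERBOSE" falls back to INFO rather than failing.
    assert getattr(logging, "VERBOSE", logging.INFO) == logging.INFO
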
diff --git a/fastdeploy/logger/setup_logging.py b/fastdeploy/logger/setup_logging.py
index 2dd24b379df..3ba66a5eb4b 100644
--- a/fastdeploy/logger/setup_logging.py
+++ b/fastdeploy/logger/setup_logging.py
@@ -47,8 +47,9 @@ def setup_logging(log_dir=None, config_file=None):
Path(log_dir).mkdir(parents=True, exist_ok=True)
# 从环境变量获取日志级别和备份数量
- is_debug = int(getattr(envs, "FD_DEBUG", 0))
- FASTDEPLOY_LOGGING_LEVEL = "DEBUG" if is_debug else "INFO"
+ log_level = getattr(envs, "FD_LOG_LEVEL", "INFO")
+ FASTDEPLOY_LOGGING_LEVEL = log_level
+ is_debug = FASTDEPLOY_LOGGING_LEVEL == "DEBUG"
backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7))
# 定义日志输出格式
@@ -122,7 +123,7 @@ def setup_logging(log_dir=None, config_file=None):
# 默认日志记录器,全局共享
"fastdeploy": {
"level": "DEBUG",
- "handlers": ["error_file", "default_file", "error_archive", "default_archive"],
+ "handlers": ["console", "error_file", "default_file", "error_archive", "default_archive"],
"propagate": False,
}
},
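
A minimal, self-contained sketch of what adding "console" to the handler list does, assuming a plain StreamHandler definition alongside the file handlers configured above:

    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "handlers": {
                "console": {"class": "logging.StreamHandler", "level": "INFO"},
            },
            "loggers": {
                "fastdeploy": {"level": "DEBUG", "handlers": ["console"], "propagate": False},
            },
        }
    )
    # INFO and above from the fastdeploy namespace now reach stderr as well.
    logging.getLogger("fastdeploy").info("console handler active")
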
diff --git a/fastdeploy/model_executor/graph_optimization/utils.py b/fastdeploy/model_executor/graph_optimization/utils.py
index 2b5241e5aed..1807b6cceeb 100644
--- a/fastdeploy/model_executor/graph_optimization/utils.py
+++ b/fastdeploy/model_executor/graph_optimization/utils.py
@@ -15,9 +15,12 @@
"""
import contextlib
+import logging
from dataclasses import dataclass
import paddle
import pynvml
from fastdeploy.platforms import current_platform
+
+logger = logging.getLogger(__name__)
@@ -66,15 +69,18 @@ def _print_memory_info(
debug_title: str = "",
):
"""Print debug info"""
- print(
- f"\n{debug_title}:",
- f"\n\tDevice Total memory: {self.gpu_memory_info.total}",
- f"\n\tDevice Used memory: {self.gpu_memory_info.used}",
- f"\n\tDevice Free memory: {self.gpu_memory_info.free}",
- f"\n\tPaddle max memory Reserved: {self.paddle_memory_info.max_reserved}",
- f"\n\tPaddle max memory Allocated: {self.paddle_memory_info.max_allocated}",
- f"\n\tPaddle memory Reserved: {self.paddle_memory_info.current_reserved}",
- f"\n\tPaddle memory Allocated: {self.paddle_memory_info.current_reserved}",
+ logger.debug(
+ "%s:\n\tDevice Total memory: %s\n\tDevice Used memory: %s\n\tDevice Free memory: %s"
+ "\n\tPaddle max memory Reserved: %s\n\tPaddle max memory Allocated: %s"
+ "\n\tPaddle memory Reserved: %s\n\tPaddle memory Allocated: %s",
+ debug_title,
+ self.gpu_memory_info.total,
+ self.gpu_memory_info.used,
+ self.gpu_memory_info.free,
+ self.paddle_memory_info.max_reserved,
+ self.paddle_memory_info.max_allocated,
+ self.paddle_memory_info.current_reserved,
+ self.paddle_memory_info.current_allocated,
)
def get_gpu_memory_info(self):
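
Note the design choice in modules like this one: they use the stdlib logging.getLogger(__name__) rather than fastdeploy.logger.get_logger. Since __name__ already lives under the fastdeploy namespace, records still reach the handlers configured on the parent logger, as this sketch shows:

    import logging

    parent = logging.getLogger("fastdeploy")
    child = logging.getLogger("fastdeploy.model_executor.graph_optimization.utils")
    # The child defines no level or handlers of its own, so it inherits the
    # parent's effective level and its records bubble up to the parent's handlers.
    assert child.getEffectiveLevel() == parent.getEffectiveLevel()
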
diff --git a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py
index 08ed2d8c061..a3d7fc465cc 100644
--- a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py
+++ b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py
@@ -16,8 +16,12 @@
from __future__ import annotations
+import logging
+
import paddle
+logger = logging.getLogger(__name__)
+
paddle.enable_compat(scope={"flash_mla"}) # Enable torch proxy before importing flash_mla
import math
import os
@@ -294,13 +298,13 @@ def __init__(
is_paddle_supported = any(num >= 90 for num in paddle.version.cuda_archs())
if is_current_sm_supported and is_paddle_supported:
self.flash_attn_func = flash_attention_v3_varlen
- print("The current platform supports Flash Attention V3.")
+ logger.info("The current platform supports Flash Attention V3.")
self.flash_attn_kwargs = {"softmax_scale": self.attn_softmax_scale}
else:
self.flash_attn_func = flash_attn_unpadded
self.flash_attn_kwargs = {"scale": self.attn_softmax_scale, "training": False}
- print(
- "The current platform does not support Flash Attention V3, so Flash Attention V2 will be used instead."
+ logger.info(
+ "The current platform does not support Flash Attention V3, using Flash Attention V2 instead."
)
def init_attention_metadata(self, forward_meta: ForwardMeta):
diff --git a/fastdeploy/model_executor/models/adapters.py b/fastdeploy/model_executor/models/adapters.py
index 306bbcd5169..b090f37fcd2 100644
--- a/fastdeploy/model_executor/models/adapters.py
+++ b/fastdeploy/model_executor/models/adapters.py
@@ -14,10 +14,13 @@
# limitations under the License.
"""
+import logging
from collections.abc import Iterable
from typing import TypeVar
import paddle
import paddle.nn as nn
from fastdeploy.config import ModelConfig
+
+logger = logging.getLogger(__name__)
@@ -72,7 +75,7 @@ def _load_dense_weights(linear: nn.Linear, folder: str, model_config: "ModelConf
bias_loader(linear.bias, state_dict[bias_key].astype(paddle.float32))
return True
except Exception as e:
- print(f"Failed to load :{e}")
+ logger.warning("Failed to load adapter weight: %s", e)
return False
return False
diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py
index f78572ccc8c..efe26d0259b 100644
--- a/fastdeploy/model_executor/models/model_base.py
+++ b/fastdeploy/model_executor/models/model_base.py
@@ -26,8 +26,11 @@
iter_architecture_defaults,
try_match_architecture_defaults,
)
+from fastdeploy.logger import get_logger
from fastdeploy.model_executor.models.interfaces_base import get_default_pooling_type
+logger = get_logger(__name__)
+
class ModelCategory(IntFlag):
TEXT_GENERATION = auto()
@@ -106,7 +109,7 @@ def _try_inspect_model_cls(
try:
return model.inspect_model_cls()
except Exception:
- print("Error in inspecting model architecture '%s'", model_arch)
+ logger.error("Error in inspecting model architecture '%s'", model_arch)
return None
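
The removed line above is the classic trap this hunk fixes: print does not interpolate %-style arguments, it just prints the extra value after the template, whereas logging does interpolate. For example:

    import logging

    logging.basicConfig(level=logging.ERROR)
    logger = logging.getLogger(__name__)

    print("Error in inspecting model architecture '%s'", "FooModel")
    # -> Error in inspecting model architecture '%s' FooModel
    logger.error("Error in inspecting model architecture '%s'", "FooModel")
    # -> Error in inspecting model architecture 'FooModel'
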
@@ -139,7 +142,7 @@ def _try_load_model_cls(self, architecture: str) -> Optional[Type[nn.Layer]]:
try:
return self.models[architecture].load_model_cls()
except Exception as e:
- print(f"Failed to load model {architecture}: {e}")
+ logger.error("Failed to load model %s: %s", architecture, e)
return None
@lru_cache(maxsize=128)
@@ -149,7 +152,7 @@ def _try_inspect_model_cls(self, model_arch: str) -> Optional[ModelInfo]:
try:
return self.models[model_arch].inspect_model_cls()
except Exception as e:
- print(f"Failed to inspect model {model_arch}: {e}")
+ logger.error("Failed to inspect model %s: %s", model_arch, e)
return None
def _normalize_arch(self, architecture: str, model_config: ModelConfig) -> str:
@@ -271,7 +274,7 @@ def inspect_model_cls(
if fallback_backend is not None:
model_info = self._try_inspect_model_cls(fallback_backend)
if model_info is not None:
- print(f"Using PaddleFormers backend as fallback for {model_info.architecture}")
+ logger.info("Using PaddleFormers backend as fallback for %s", model_info.architecture)
return (model_info, model_info.architecture)
return self._raise_for_unsupported(architectures)
diff --git a/fastdeploy/model_executor/ops/gpu/__init__.py b/fastdeploy/model_executor/ops/gpu/__init__.py
index a4faadb17a1..5b324dcae3b 100644
--- a/fastdeploy/model_executor/ops/gpu/__init__.py
+++ b/fastdeploy/model_executor/ops/gpu/__init__.py
@@ -13,10 +13,13 @@
# limitations under the License.
"""fastdeploy gpu ops"""
+import logging
import sys
from fastdeploy.import_ops import import_custom_ops
+logger = logging.getLogger(__name__)
+
PACKAGE = "fastdeploy.model_executor.ops.gpu"
@@ -29,7 +32,7 @@ def decide_module():
# to support all hardware platforms (NVIDIA, ILUVATAR, HPU, etc.)
prop = paddle.device.get_device_properties()
sm_version = prop.major * 10 + prop.minor
- print(f"current sm_version={sm_version}")
+ logger.debug("current sm_version=%s", sm_version)
curdir = os.path.dirname(os.path.abspath(__file__))
sm_version_path = os.path.join(curdir, f"fastdeploy_ops_{sm_version}")
@@ -42,7 +45,7 @@ def decide_module():
try:
module_path = decide_module()
except Exception as e:
- print(f"decide_module error, load custom_ops from .fastdeploy_ops: {e}")
+ logger.warning("decide_module failed, loading custom_ops from .fastdeploy_ops: %s", e)
pass
import_custom_ops(PACKAGE, module_path, globals())
diff --git a/fastdeploy/model_executor/ops/triton_ops/triton_utils.py b/fastdeploy/model_executor/ops/triton_ops/triton_utils.py
index f92e8d9c94f..524395a84b8 100644
--- a/fastdeploy/model_executor/ops/triton_ops/triton_utils.py
+++ b/fastdeploy/model_executor/ops/triton_ops/triton_utils.py
@@ -15,6 +15,7 @@
"""
import inspect
+import logging
import os
import re
import sys
@@ -25,6 +26,8 @@
from fastdeploy import envs
+logger = logging.getLogger(__name__)
+
compile_file = triton.__path__[0] + "/tools/compile.py"
link_file = triton.__path__[0] + "/tools/link.py"
python_path = sys.executable
@@ -660,7 +663,7 @@ def decorator(*args, **kwargs):
generated_dir = envs.FD_TRITON_KERNEL_CACHE_DIR
if generated_dir is None:
generated_dir = f"/tmp/triton_cache/rank{tp_rank}"
- print("the kernel cache dir is:", generated_dir)
+ logger.debug("the kernel cache dir is: %s", generated_dir)
assert generated_dir is not None, (
"TRITON_KERNEL_CACHE_DIR is None, please set it such as "
"export TRITON_KERNEL_CACHE_DIR=/tmp/triton_cache "
@@ -702,7 +705,7 @@ def decorator(*args, **kwargs):
so_path = find_so_path(generated_dir, python_package_name)
if so_path is None:
- print("== we do not find so_path, we need to compile it")
+ logger.debug("so_path not found, need to compile it")
with open(paddle_custom_op_file_path, "w") as f:
f.write(
SubstituteTemplate(
@@ -754,7 +757,7 @@ def decorator(*args, **kwargs):
codegen_command = aot_template.format(
**config,
)
- print(codegen_command)
+ logger.debug("codegen command: %s", codegen_command)
codegen_commands.append(codegen_command)
multi_process_do(codegen_commands)
@@ -771,7 +774,7 @@ def decorator(*args, **kwargs):
if op_name not in OpProtoHolder.instance().op_proto_map.keys():
so_path = find_so_path(generated_dir, python_package_name)
- print("== we find so_path: ", so_path)
+ logger.debug("found so_path: %s", so_path)
assert so_path is not None
paddle.utils.cpp_extension.load_op_meta_info_and_register_op(so_path)
diff --git a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py
index 98589a4c376..7b26eaddc93 100644
--- a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py
+++ b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py
@@ -16,6 +16,7 @@
import importlib
import inspect
+import logging
import os
import re
import sys
@@ -23,6 +24,8 @@
import paddle
import triton
+logger = logging.getLogger(__name__)
+
from .triton_utils import (
SubstituteTemplate,
build_package,
@@ -203,7 +206,7 @@ def decorator(*args, **kwargs):
tp_rank = paddle.distributed.get_rank()
generated_dir = os.getenv("TRITON_KERNEL_CACHE_DIR", f"/tmp/triton_cache/rank{tp_rank}")
- print("the kernel cache dir is:", generated_dir)
+ logger.debug("the kernel cache dir is: %s", generated_dir)
generated_dir = f"{generated_dir}/{op_name}"
os.makedirs(generated_dir, exist_ok=True)
@@ -240,7 +243,7 @@ def decorator(*args, **kwargs):
so_path = find_so_path(generated_dir, python_package_name)
if so_path is None:
- print("== we do not find so_path, we need to compile it")
+ logger.debug("so_path not found, need to compile it")
with open(paddle_custom_op_file_path, "w") as f:
f.write(
SubstituteTemplate(
@@ -290,7 +293,7 @@ def decorator(*args, **kwargs):
codegen_command = aot_template.format(
**config,
)
- print(codegen_command)
+ logger.debug("codegen command: %s", codegen_command)
codegen_commands.append(codegen_command)
multi_process_do(codegen_commands)
@@ -307,7 +310,7 @@ def decorator(*args, **kwargs):
# so_path have be found!
so_path = find_so_path(generated_dir, python_package_name)
- print("== we find so_path: ", so_path)
+ logger.debug("found so_path: %s", so_path)
assert so_path is not None
dir_path = os.path.dirname(so_path)
sys.path.append(dir_path)
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py
index 89b767f9ede..0ce268b9971 100644
--- a/fastdeploy/output/token_processor.py
+++ b/fastdeploy/output/token_processor.py
@@ -220,7 +220,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
for token_id in token_id_list:
recovery_stop = token_id == RECOVERY_STOP_SIGNAL
if recovery_stop:
- llm_logger.info(f"recovery stop signal found at task {task_id}")
+ llm_logger.debug("recovery stop signal found at task %s", task_id)
self.tokens_counter[task_id] += 1
if token_id != RECOVERY_STOP_SIGNAL:
result.outputs.token_ids.append(token_id)
@@ -249,15 +249,23 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result:
# Print combined log with all required information
ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
llm_logger.info(
- f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
- f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
- f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, "
- f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
- f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
+ "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, "
+ "TokenRatio=%.2f, TTFT=%.2f, E2E=%.2f, IsPrefill=%s, "
+ "RecoveryStop=%s, PreemptedCount=%s",
+ task_id,
+ task.prompt_token_ids_len,
+ cached_detail,
+ self.tokens_counter[task_id],
+ token_ratio,
+ ttft,
+ e2e_time,
+ is_prefill,
+ recovery_stop,
+ getattr(task.metrics, "preempted_count", 0),
)
main_process_metrics.request_token_ratio.observe(token_ratio)
- llm_logger.info(f"{self.resource_manager.info()}")
+ llm_logger.debug("%s", self.resource_manager.info())
if self.cfg.speculative_config.method:
self._compute_speculative_status()
if not is_prefill:
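
One detail of the summary-line rewrite above: f-string conversions such as {ttft:.2f} carry over as %-style specifiers (%.2f), so the output keeps its formatting while the call stays lazy. For instance, with hypothetical values:

    import logging

    logging.basicConfig(level=logging.INFO)
    logging.getLogger(__name__).info(
        "Request=%s, TokenRatio=%.2f, TTFT=%.2f, E2E=%.2f",
        "req-0",    # hypothetical request id
        1.0 / 3.0,  # rendered as 0.33
        0.1234,     # rendered as 0.12
        2.3456,     # rendered as 2.35
    )
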
@@ -284,10 +292,10 @@ def _process_batch_output_use_zmq(self, receive_datas):
if (
envs.ENABLE_V1_KVCACHE_SCHEDULER and token_ids[-1] == PREEMPTED_TOKEN_ID
) or not envs.ENABLE_V1_KVCACHE_SCHEDULER:
- llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.")
+ llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id)
self.resource_manager.abort_req_ids_set.remove(task_id)
self._recycle_resources(task_id, i, task)
- llm_logger.info(f"{task_id} received negative token. Recycle end.")
+ llm_logger.debug("%s received negative token. Recycle end.", task_id)
abort_res = RequestOutput(
request_id=task_id,
finished=True,
@@ -301,7 +309,7 @@ def _process_batch_output_use_zmq(self, receive_datas):
task_id in self.resource_manager.to_be_rescheduled_request_id_set
and token_ids[-1] == PREEMPTED_TOKEN_ID
):
- llm_logger.info(f"sync preemption for request_id {task_id} done.")
+ llm_logger.debug("sync preemption for request_id %s done.", task_id)
self.resource_manager.reschedule_preempt_task(task_id)
continue
@@ -477,7 +485,7 @@ def process_sampling_results(self):
self.timestamp_for_alive_after_handle_batch = time.time()
except Exception as e:
- llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}")
+ llm_logger.error("Error while getting input data: %s %s", e, traceback.format_exc())
def postprocess(self, batch_result: List[RequestOutput], mtype=3):
"""
@@ -523,14 +531,14 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
finished_task_ids = self.engine_worker_queue.get_finished_req()
if len(finished_task_ids) > 0:
for finished_task_id in finished_task_ids:
- llm_logger.info(f"finished_task_id: {finished_task_id}")
+ llm_logger.debug("finished_task_id: %s", finished_task_id)
self.prefill_result_status[finished_task_id[0]] = finished_task_id[1]
if task_id in self.prefill_result_status:
if self.prefill_result_status[task_id] != "finished":
result.error_code = 400
result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}"
- llm_logger.info(
- f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}"
+ llm_logger.debug(
+ "waited for cache to be sent, request_id: %s, cost seconds: %.5f", task_id, time.time() - start_time
)
result.metrics.send_request_output_to_decode_time = time.time()
self.split_connector.send_first_token(task.disaggregate_info, [result])
@@ -591,7 +599,7 @@ def _compute_speculative_status(self, result: RequestOutput):
single_head_acceptance_rates.append(
self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1]
)
- spec_logger.info(f" Single head accept ratio: {single_head_acceptance_rates}")
+ spec_logger.debug("Single head accept ratio: %s", single_head_acceptance_rates)
if self.number_of_output_tokens > 1000000:
self.number_of_output_tokens = 0
@@ -754,13 +762,13 @@ def _process_batch_output(self):
if self.cfg.speculative_config.method:
self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i])
if accept_num[i] == PREEMPTED_TOKEN_ID: # in MTP, means preemption has happened in worker
- llm_logger.info(f"sync preemption for request_id {task_id} done.")
+ llm_logger.debug("sync preemption for request_id %s done.", task_id)
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
if task_id in self.resource_manager.abort_req_ids_set:
- llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.")
+ llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id)
self.resource_manager.abort_req_ids_set.remove(task_id)
self._recycle_resources(task_id, i, task)
- llm_logger.info(f"{task_id} received negative token. Recycle end.")
+ llm_logger.debug("%s received negative token. Recycle end.", task_id)
abort_res = RequestOutput(
request_id=task_id,
finished=True,
@@ -775,7 +783,7 @@ def _process_batch_output(self):
if accept_num[i] == -3:
recovery_stop = True
if recovery_stop:
- llm_logger.info(f"recovery stop signal found at task {task_id}")
+ llm_logger.debug("recovery stop signal found at task %s", task_id)
token_ids = [RECOVERY_STOP_SIGNAL]
elif self.use_logprobs:
token_ids = tokens[i][:, 0].tolist()[: accept_num[i]]
@@ -795,16 +803,16 @@ def _process_batch_output(self):
token_ids = [token_id]
recovery_stop = token_id == RECOVERY_STOP_SIGNAL
if recovery_stop:
- llm_logger.info(f"recovery stop signal found at task {task_id}")
+ llm_logger.debug("recovery stop signal found at task %s", task_id)
if not recovery_stop and token_id < 0:
if task_id in self.resource_manager.abort_req_ids_set:
if (
envs.ENABLE_V1_KVCACHE_SCHEDULER and token_id == PREEMPTED_TOKEN_ID
) or not envs.ENABLE_V1_KVCACHE_SCHEDULER:
- llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.")
+ llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id)
self.resource_manager.abort_req_ids_set.remove(task_id)
self._recycle_resources(task_id, i, task)
- llm_logger.info(f"{task_id} received negative token. Recycle end.")
+ llm_logger.debug("%s received negative token. Recycle end.", task_id)
abort_res = RequestOutput(
request_id=task_id,
finished=True,
@@ -818,7 +826,7 @@ def _process_batch_output(self):
task_id in self.resource_manager.to_be_rescheduled_request_id_set
and token_id == PREEMPTED_TOKEN_ID
):
- llm_logger.info(f"sync preemption for request_id {task_id} done.")
+ llm_logger.debug("sync preemption for request_id %s done.", task_id)
self.resource_manager.reschedule_preempt_task(task_id)
continue
@@ -839,7 +847,7 @@ def _process_batch_output(self):
task.metrics.record_recv_first_token()
task.metrics.cal_cost_time()
metrics = copy.copy(task.metrics)
- llm_logger.info(f"task:{task.request_id} start recode first token")
+ llm_logger.debug("task:%s starts recording first token", task.request_id)
self._record_first_token_metrics(task, current_time)
tracing.trace_report_span(
@@ -946,20 +954,29 @@ def _process_batch_output(self):
ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0
ttft_s = ttft + task.metrics.time_in_queue
llm_logger.info(
- f"Request={task_id}, InputToken={task.prompt_token_ids_len}, "
- f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, "
- f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, "
- f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, "
- f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}"
+ "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, "
+ "TokenRatio=%.2f, TTFT=%.2f, TTFT_S=%.2f, E2E=%.2f, "
+ "IsPrefill=%s, RecoveryStop=%s, PreemptedCount=%s",
+ task_id,
+ task.prompt_token_ids_len,
+ cached_detail,
+ self.tokens_counter[task_id],
+ token_ratio,
+ ttft,
+ ttft_s,
+ e2e_time,
+ is_prefill,
+ recovery_stop,
+ getattr(task.metrics, "preempted_count", 0),
)
main_process_metrics.request_token_ratio.observe(token_ratio)
- llm_logger.info(f"{self.resource_manager.info()}")
+ llm_logger.debug("%s", self.resource_manager.info())
if self.cfg.speculative_config.method:
self._compute_speculative_status(result)
if not is_prefill:
self._record_completion_metrics(task, current_time)
- llm_logger.info(f"task {task_id} received eos token. Recycling.")
+ llm_logger.debug("task %s received eos token. Recycling.", task_id)
if (
envs.ENABLE_V1_KVCACHE_SCHEDULER
and self.cfg.cache_config.enable_prefix_caching
@@ -969,7 +986,7 @@ def _process_batch_output(self):
task
) # when enable prefix caching, cache kv cache for output tokens
self._recycle_resources(task_id, i, task, result, is_prefill)
- llm_logger.info(f"eos token {task_id} Recycle end.")
+ llm_logger.debug("eos token %s Recycle end.", task_id)
break
llm_logger.debug(f"get response from infer: {result}")
@@ -1135,7 +1152,7 @@ def process_sampling_results(self):
continue
self._process_batch_output()
except Exception as e:
- llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}")
+ llm_logger.error("Error while getting input data: %s %s", e, traceback.format_exc())
def stop(self):
"""
@@ -1143,5 +1160,5 @@ def stop(self):
"""
self._is_running = False
self.worker.join()
- llm_logger.info("warm up thread stop")
+ llm_logger.debug("warm-up thread stopped")
del self.worker
diff --git a/fastdeploy/scheduler/dp_scheduler.py b/fastdeploy/scheduler/dp_scheduler.py
index f5b03eba30f..1f2a7c92b4f 100644
--- a/fastdeploy/scheduler/dp_scheduler.py
+++ b/fastdeploy/scheduler/dp_scheduler.py
@@ -58,7 +58,7 @@ def put_results(self, results: List[RequestOutput]):
finished_responses = [response.request_id for response in responses if response.finished]
if len(finished_responses) > 0:
- self.scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}")
+ self.scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_responses)
with self.mutex:
self.batch_responses_per_step.append([response.raw for response in responses])
@@ -179,8 +179,8 @@ def get_requests(
)
if len(requests) > 0:
- self.scheduler_logger.info(
- f"Scheduler has pulled some request: {[request.request_id for request in requests]}"
+ self.scheduler_logger.debug(
+ "Scheduler has pulled some requests: %s", [request.request_id for request in requests]
)
return requests
@@ -228,7 +228,7 @@ def put_requests(self, requests: List[Dict]):
def _put_requests_to_local(self):
while True:
request = self.request_queues.get()
- self.scheduler_logger.info(f"Receive request from puller, request_id: {request.request_id}")
+ self.scheduler_logger.debug("Receive request from puller, request_id: %s", request.request_id)
self._scheduler.put_requests([request])
def _get_response_from_local(self):
diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py
index 9f6d0644613..501d58b3d8a 100644
--- a/fastdeploy/scheduler/global_scheduler.py
+++ b/fastdeploy/scheduler/global_scheduler.py
@@ -125,7 +125,7 @@ def __init__(
self.get_response_workers = threading.Thread(target=self._get_results_worker, daemon=True)
self.get_response_workers.start()
- scheduler_logger.info(f"Scheduler: name={self.name} redis_version={self.client.version}")
+ scheduler_logger.info("Scheduler: name=%s redis_version=%s", self.name, self.client.version)
def _get_hash_slot(self, data: str) -> int:
"""
@@ -370,7 +370,7 @@ def _put_requests_worker(self, tasks: List[Task]) -> List[Task]:
rem_amount=0,
ttl=self.ttl,
)
- scheduler_logger.info(f"Scheduler has enqueued some requests: {requests}")
+ scheduler_logger.debug("Scheduler has enqueued some requests: %s", requests)
if duplicate:
scheduler_logger.warning(
@@ -491,15 +491,17 @@ def get_requests(
ttl=self.ttl,
)
serialized_requests += [(lucky_request_queue_name, element) for element in elements]
- scheduler_logger.info(
- f"Scheduler {self.name} has stolen some requests from another lucky one. "
- f"(name={lucky} num={len(serialized_requests)})"
+ scheduler_logger.debug(
+ "Scheduler %s has stolen some requests from another lucky one. (name=%s num=%d)",
+ self.name,
+ lucky,
+ len(serialized_requests),
)
else:
exist_num = self.client.exists(self._instance_name(lucky))
if exist_num == 0:
if self.client.zrem(extend_scheduler_load_table_name, lucky):
- scheduler_logger.info(f"Scheduler {lucky} has been removed")
+ scheduler_logger.debug("Scheduler %s has been removed", lucky)
# blocked read
if len(serialized_requests) == 0:
@@ -517,8 +519,10 @@ def get_requests(
self.client.zincrby(load_table_name, -1, scheduler_name, rem_amount=0, ttl=self.ttl)
serialized_requests.append((request_queue_name, element[1]))
if scheduler_name != self.name:
- scheduler_logger.info(
- f"Scheduler {self.name} has stolen a request from another scheduler. (name={scheduler_name})"
+ scheduler_logger.debug(
+ "Scheduler %s has stolen a request from another scheduler. (name=%s)",
+ self.name,
+ scheduler_name,
)
long_partial_requests = 0
@@ -596,14 +600,16 @@ def get_requests(
ttl=self.ttl,
)
- scheduler_logger.info(f"Scheduler has put remaining request into the queue: {len(remaining_request)}")
+ scheduler_logger.debug("Scheduler has put remaining requests into the queue: %d", len(remaining_request))
if len(requests) == 0:
scheduler_logger.debug(
f"Scheduler has put all just-pulled request into the queue: {len(remaining_request)}"
)
if len(requests) > 0:
- scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}")
+ scheduler_logger.debug(
+ "Scheduler has pulled some requests: %s", [request.request_id for request in requests]
+ )
return requests
def _put_results_worker(self, tasks: List[Task]):
@@ -664,7 +670,7 @@ def _put_results_worker(self, tasks: List[Task]):
self.local_response_not_empty.notify_all()
if len(finished_request_ids) > 0:
- scheduler_logger.info(f"Scheduler has received some finished responses: {finished_request_ids}")
+ scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_request_ids)
for response_queue_name, responses in stolen_responses.items():
self.client.rpush(response_queue_name, *responses, ttl=self.ttl)
@@ -793,7 +799,7 @@ def _get_results() -> Dict[str, List[ScheduledResponse]]:
if finished:
del self.local_responses[request_id]
- scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}")
+ scheduler_logger.debug("Scheduler has pulled a finished response: %s", [request_id])
return results
def reset(self):
@@ -859,7 +865,9 @@ def update_config(self, load_shards_num: Optional[int], reallocate: Optional[boo
self.shard = self._get_hash_slot(self.name) % self.load_shards_num
scheduler_logger.info(
- "Scheduler has reload config, "
- f"load_shards_num({old_load_shards_num} => {self.load_shards_num}) "
- f"shard({old_shard} => {self.shard})"
+ "Scheduler has reloaded config, load_shards_num(%s => %s) shard(%s => %s)",
+ old_load_shards_num,
+ self.load_shards_num,
+ old_shard,
+ self.shard,
)
diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py
index fc4a64686b5..f36f4f10946 100644
--- a/fastdeploy/scheduler/local_scheduler.py
+++ b/fastdeploy/scheduler/local_scheduler.py
@@ -191,7 +191,7 @@ def put_requests(self, requests: List[Request]) -> List[Tuple[str, Optional[str]
self.ids += valid_ids
self.requests_not_empty.notify_all()
- scheduler_logger.info(f"Scheduler has enqueued some requests: {valid_ids}")
+ scheduler_logger.debug("Scheduler has enqueued some requests: %s", valid_ids)
if len(duplicated_ids) > 0:
scheduler_logger.warning(f"Scheduler has received some duplicated requests: {duplicated_ids}")
@@ -300,7 +300,9 @@ def get_requests(
scheduler_logger.debug(f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}")
if len(requests) > 0:
- scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}")
+ scheduler_logger.debug(
+ "Scheduler has pulled some requests: %s", [request.request_id for request in requests]
+ )
return requests
@@ -316,7 +318,7 @@ def put_results(self, results: List[RequestOutput]):
finished_responses = [response.request_id for response in responses if response.finished]
if len(finished_responses) > 0:
- scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}")
+ scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_responses)
with self.mutex:
self.batch_responses_per_step.append([response.raw for response in responses])
@@ -381,7 +383,7 @@ def _get_results():
if finished:
self._recycle(request_id)
- scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}")
+ scheduler_logger.debug("Scheduler has pulled a finished response: %s", [request_id])
if results:
scheduler_logger.debug(f"get responses, {results}")
diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py
index cd5e0736436..a66ae8db461 100644
--- a/fastdeploy/scheduler/splitwise_scheduler.py
+++ b/fastdeploy/scheduler/splitwise_scheduler.py
@@ -119,7 +119,7 @@ def start(self, role, host, disaggregated):
"""
Start APIScheduler and InferScheduler backup threads
"""
- logger.info(f"Scheduler Start With: role:{role}, host:{host}, disaggregated:{disaggregated}")
+ logger.info("Scheduler Start With: role:%s, host:%s, disaggregated:%s", role, host, disaggregated)
self.infer.start(role, host, disaggregated)
self.scheduler.start()
@@ -611,7 +611,7 @@ def select(req, nodes, blur_step):
break
blur_idx = idx
node = random.choice(nodes[: blur_idx + 1])
- logger.info(f"Schedule Req {req.request_id}(len:{req.prompt_token_ids_len}) to {node}")
+ logger.debug("Schedule Req %s(len:%s) to %s", req.request_id, req.prompt_token_ids_len, node)
return node
if role == "prefill" or role == "mixed":
@@ -727,7 +727,7 @@ def check_redis_version(self):
version_parts[1] >= 2 if version_parts[0] == 6 else True
), f"Redis version {redis_version} too low. Please upgrade to Redis 6.2+ to support batch RPOP operations."
- logger.info(f"Redis version {redis_version} detected. Using native batch RPOP.")
+ logger.info("Redis version %s detected. Using native batch RPOP.", redis_version)
def start(self, role, host, disaggregated):
"""
@@ -805,7 +805,7 @@ def select_writer(req):
req = pickle.loads(req_str)
group = req.get("group", "")
writer_idx = select_writer(req)
- logger.info(f"Infer Scheduler Get Req: {req.request_id} writer idx {writer_idx}")
+ logger.debug("Infer Scheduler Get Req: %s writer idx %s", req.request_id, writer_idx)
req.request_id = f"{req.request_id}#{writer_idx}#{group}"
if self.role == "prefill" or self.role == "mixed":
self.reqs_queue.append(req)
@@ -884,7 +884,7 @@ def put_results(self, results):
for result in results:
if result.error_code != 200 or result.finished:
self.node.finish_req(result.request_id)
- logger.info(f"{result.request_id} finished, node load is {self.node.load}")
+ logger.debug("%s finished, node load is %s", result.request_id, self.node.load)
req_ids.add(result.request_id)
diff --git a/tests/test_logging.py b/tests/test_logging.py
new file mode 100644
index 00000000000..188e4e95043
--- /dev/null
+++ b/tests/test_logging.py
@@ -0,0 +1,180 @@
+# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Unit tests for FastDeploy logging infrastructure.
+
+Tests cover:
+- get_logger() returns loggers with correct naming
+- FD_LOG_LEVEL env var controls log level
+- FastDeployLogger singleton behavior
+- Console handler presence in unified logger
+- Legacy get_logger(name, file) backward compatibility
+"""
+
+import logging
+import os
+import unittest
+from unittest.mock import patch
+
+try:
+ import paddle # noqa: F401
+
+ HAS_PADDLE = True
+except ImportError:
+ HAS_PADDLE = False
+
+SKIP_MSG = "PaddlePaddle is not installed"
+
+
+@unittest.skipUnless(HAS_PADDLE, SKIP_MSG)
+class TestGetLogger(unittest.TestCase):
+ """Tests for the fastdeploy.logger.get_logger convenience function."""
+
+ def test_get_logger_returns_logger_instance(self):
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger("test_module")
+ self.assertIsInstance(logger, logging.Logger)
+
+ def test_get_logger_prefixes_with_fastdeploy(self):
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger("test_module")
+ self.assertTrue(
+ logger.name.startswith("fastdeploy"),
+ f"Expected logger name to start with 'fastdeploy', got '{logger.name}'",
+ )
+
+ def test_get_logger_none_returns_root_fastdeploy(self):
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger(None)
+ self.assertEqual(logger.name, "fastdeploy")
+
+ def test_get_logger_already_namespaced(self):
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger("fastdeploy.engine")
+ self.assertEqual(logger.name, "fastdeploy.engine")
+
+ def test_get_logger_adds_prefix_for_plain_name(self):
+ from fastdeploy.logger import get_logger
+
+ logger = get_logger("scheduler")
+ self.assertEqual(logger.name, "fastdeploy.scheduler")
+
+
+@unittest.skipUnless(HAS_PADDLE, SKIP_MSG)
+class TestFDLogLevel(unittest.TestCase):
+ """Tests for FD_LOG_LEVEL environment variable."""
+
+ def test_fd_log_level_default_is_info(self):
+ with patch.dict(os.environ, {}, clear=False):
+ # Remove FD_LOG_LEVEL and FD_DEBUG if set
+ os.environ.pop("FD_LOG_LEVEL", None)
+ os.environ.pop("FD_DEBUG", None)
+ # envs uses lazy lambdas, so reading the attribute re-evaluates os.getenv
+ from fastdeploy import envs
+
+ level = envs.FD_LOG_LEVEL
+ self.assertEqual(level, "INFO")
+
+ def test_fd_log_level_debug_when_fd_debug_set(self):
+ with patch.dict(os.environ, {"FD_DEBUG": "1"}, clear=False):
+ os.environ.pop("FD_LOG_LEVEL", None)
+ from fastdeploy import envs
+
+ level = envs.FD_LOG_LEVEL
+ self.assertEqual(level, "DEBUG")
+
+ def test_fd_log_level_explicit_overrides_fd_debug(self):
+ with patch.dict(os.environ, {"FD_DEBUG": "1", "FD_LOG_LEVEL": "WARNING"}, clear=False):
+ from fastdeploy import envs
+
+ level = envs.FD_LOG_LEVEL
+ self.assertEqual(level, "WARNING")
+
+ def test_fd_log_level_accepts_error(self):
+ with patch.dict(os.environ, {"FD_LOG_LEVEL": "ERROR"}, clear=False):
+ from fastdeploy import envs
+
+ level = envs.FD_LOG_LEVEL
+ self.assertEqual(level, "ERROR")
+
+
+@unittest.skipUnless(HAS_PADDLE, SKIP_MSG)
+class TestFastDeployLoggerSingleton(unittest.TestCase):
+ """Tests for FastDeployLogger singleton pattern."""
+
+ def test_singleton_returns_same_instance(self):
+ from fastdeploy.logger import FastDeployLogger
+
+ instance1 = FastDeployLogger()
+ instance2 = FastDeployLogger()
+ self.assertIs(instance1, instance2)
+
+
+@unittest.skipUnless(HAS_PADDLE, SKIP_MSG)
+class TestConsoleHandler(unittest.TestCase):
+ """Tests for console handler presence in unified logger setup."""
+
+ def setUp(self):
+ """Reset setup_logging state for clean test isolation."""
+ from fastdeploy.logger.setup_logging import setup_logging
+
+ setup_logging._configured = False
+
+ def test_unified_logger_has_console_handler(self):
+ from fastdeploy.logger.setup_logging import setup_logging
+
+ fd_logger = setup_logging()
+
+ handler_classes = [type(h).__name__ for h in fd_logger.handlers]
+ self.assertTrue(
+ any("StreamHandler" in cls for cls in handler_classes),
+ f"Expected a StreamHandler (console) among handlers, got: {handler_classes}",
+ )
+
+ def tearDown(self):
+ """Reset setup_logging state after test."""
+ from fastdeploy.logger.setup_logging import setup_logging
+
+ setup_logging._configured = False
+
+
+@unittest.skipUnless(HAS_PADDLE, SKIP_MSG)
+class TestLegacyGetLogger(unittest.TestCase):
+ """Tests for backward compatibility with legacy get_logger(name, file)."""
+
+ def test_legacy_get_logger_still_works(self):
+ from fastdeploy.utils import get_logger
+
+ logger = get_logger("test_legacy", "test_legacy.log")
+ self.assertIsInstance(logger, logging.Logger)
+ # Legacy loggers use "legacy." prefix namespace
+ self.assertTrue(
+ logger.name.startswith("legacy."),
+ f"Expected legacy logger to have 'legacy.' prefix, got '{logger.name}'",
+ )
+
+ def test_legacy_prebuilt_loggers_accessible(self):
+ from fastdeploy.utils import llm_logger, scheduler_logger
+
+ self.assertIsInstance(llm_logger, logging.Logger)
+ self.assertIsInstance(scheduler_logger, logging.Logger)
+
+
+if __name__ == "__main__":
+ unittest.main()