From d502f31aa9d44e0b9d249273c1b590aaf29b53b5 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Thu, 5 Mar 2026 22:19:20 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E3=80=90Hackathon=209th=20No.88=E3=80=91Re?= =?UTF-8?q?factor=20log=20printing:=20unified=20FD=5FLOG=5FLEVEL,=20consol?= =?UTF-8?q?e=20handler,=20logger=20exports?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add FD_LOG_LEVEL env var to envs.py (defaults based on FD_DEBUG) - Populate fastdeploy/logger/__init__.py with get_logger() + exports - Add console StreamHandler to unified fastdeploy logger - Unify legacy loggers (get_trace_logger, _get_legacy_logger) to respect FD_LOG_LEVEL - Migrate print()/f-string logger calls in model_base.py to lazy %s formatting - Add 13 unit tests in tests/test_logging.py covering logger naming, FD_LOG_LEVEL env var, singleton pattern, console handler, legacy compat --- fastdeploy/envs.py | 3 + fastdeploy/logger/__init__.py | 46 +++++ fastdeploy/logger/logger.py | 20 +- fastdeploy/logger/setup_logging.py | 7 +- .../model_executor/models/model_base.py | 11 +- tests/test_logging.py | 180 ++++++++++++++++++ 6 files changed, 250 insertions(+), 17 deletions(-) create mode 100644 tests/test_logging.py diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 1b2dc7af99c..c457e4e3e31 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -36,6 +36,9 @@ def _validate_split_kv_size(value: int) -> int: "FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"), # Whether to use debug mode, can set 0 or 1 "FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")), + # Log level for FastDeploy loggers. Supports: DEBUG, INFO, WARNING, ERROR. + # Defaults to DEBUG if FD_DEBUG=1, otherwise INFO. + "FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", "DEBUG" if environment_variables["FD_DEBUG"]() else "INFO"), # Number of days to keep fastdeploy logs. "FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"), # Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE". diff --git a/fastdeploy/logger/__init__.py b/fastdeploy/logger/__init__.py index e69de29bb2d..f17d2c3212b 100644 --- a/fastdeploy/logger/__init__.py +++ b/fastdeploy/logger/__init__.py @@ -0,0 +1,46 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +FastDeploy logging module. + +Provides a unified logging interface for all FastDeploy components. + +Usage:: + + from fastdeploy.logger import get_logger + + logger = get_logger(__name__) + logger.info("Hello from FastDeploy") +""" + +from fastdeploy.logger.logger import FastDeployLogger + + +def get_logger(name=None): + """Get a FastDeploy logger with the given name. + + The name will be prefixed with 'fastdeploy.' automatically + if it is not already in the fastdeploy namespace. + + Args: + name: Logger name. If None, returns the root fastdeploy logger. + + Returns: + logging.Logger instance + """ + return FastDeployLogger().get_logger(name) + + +__all__ = ["get_logger", "FastDeployLogger"] diff --git a/fastdeploy/logger/logger.py b/fastdeploy/logger/logger.py index 63d431ce81b..160348b8c89 100644 --- a/fastdeploy/logger/logger.py +++ b/fastdeploy/logger/logger.py @@ -105,17 +105,17 @@ def get_trace_logger(self, name, file_name, without_formater=False, print_to_con if not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) - is_debug = int(envs.FD_DEBUG) + # Use FD_LOG_LEVEL for unified log level control (falls back to FD_DEBUG) + log_level_str = getattr(envs, "FD_LOG_LEVEL", "INFO") + log_level = getattr(logging, log_level_str.upper(), logging.INFO) + # logger = logging.getLogger(name) # Use namespace for isolation to avoid logger overwrite and confusion issues, for compatibility with original interface legacy_name = f"legacy.{name}" logger = logging.getLogger(legacy_name) # Set log level - if is_debug: - logger.setLevel(level=logging.DEBUG) - else: - logger.setLevel(level=logging.INFO) + logger.setLevel(level=log_level) # Set formatter formatter = CustomFormatter( @@ -170,17 +170,17 @@ def _get_legacy_logger(self, name, file_name, without_formater=False, print_to_c if not os.path.exists(log_dir): os.makedirs(log_dir, exist_ok=True) - is_debug = envs.FD_DEBUG + # Use FD_LOG_LEVEL for unified log level control (falls back to FD_DEBUG) + log_level_str = getattr(envs, "FD_LOG_LEVEL", "INFO") + log_level = getattr(logging, log_level_str.upper(), logging.INFO) + # logger = logging.getLogger(name) # Use namespace for isolation to avoid logger overwrite and confusion issues, for compatibility with original interface legacy_name = f"legacy.{name}" logger = logging.getLogger(legacy_name) # Set log level - if is_debug: - logger.setLevel(level=logging.DEBUG) - else: - logger.setLevel(level=logging.INFO) + logger.setLevel(level=log_level) # Set formatter - use standard format for both file and console (no color) formatter = logging.Formatter( diff --git a/fastdeploy/logger/setup_logging.py b/fastdeploy/logger/setup_logging.py index 2dd24b379df..3ba66a5eb4b 100644 --- a/fastdeploy/logger/setup_logging.py +++ b/fastdeploy/logger/setup_logging.py @@ -47,8 +47,9 @@ def setup_logging(log_dir=None, config_file=None): Path(log_dir).mkdir(parents=True, exist_ok=True) # 从环境变量获取日志级别和备份数量 - is_debug = int(getattr(envs, "FD_DEBUG", 0)) - FASTDEPLOY_LOGGING_LEVEL = "DEBUG" if is_debug else "INFO" + log_level = getattr(envs, "FD_LOG_LEVEL", "INFO") + FASTDEPLOY_LOGGING_LEVEL = log_level + is_debug = FASTDEPLOY_LOGGING_LEVEL == "DEBUG" backup_count = int(getattr(envs, "FD_LOG_BACKUP_COUNT", 7)) # 定义日志输出格式 @@ -122,7 +123,7 @@ def setup_logging(log_dir=None, config_file=None): # 默认日志记录器,全局共享 "fastdeploy": { "level": "DEBUG", - "handlers": ["error_file", "default_file", "error_archive", "default_archive"], + "handlers": ["console", "error_file", "default_file", "error_archive", "default_archive"], "propagate": False, } }, diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index f78572ccc8c..efe26d0259b 100644 --- a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -26,8 +26,11 @@ iter_architecture_defaults, try_match_architecture_defaults, ) +from fastdeploy.logger import get_logger from fastdeploy.model_executor.models.interfaces_base import get_default_pooling_type +logger = get_logger(__name__) + class ModelCategory(IntFlag): TEXT_GENERATION = auto() @@ -106,7 +109,7 @@ def _try_inspect_model_cls( try: return model.inspect_model_cls() except Exception: - print("Error in inspecting model architecture '%s'", model_arch) + logger.error("Error in inspecting model architecture '%s'", model_arch) return None @@ -139,7 +142,7 @@ def _try_load_model_cls(self, architecture: str) -> Optional[Type[nn.Layer]]: try: return self.models[architecture].load_model_cls() except Exception as e: - print(f"Failed to load model {architecture}: {e}") + logger.error("Failed to load model %s: %s", architecture, e) return None @lru_cache(maxsize=128) @@ -149,7 +152,7 @@ def _try_inspect_model_cls(self, model_arch: str) -> Optional[ModelInfo]: try: return self.models[model_arch].inspect_model_cls() except Exception as e: - print(f"Failed to inspect model {model_arch}: {e}") + logger.error("Failed to inspect model %s: %s", model_arch, e) return None def _normalize_arch(self, architecture: str, model_config: ModelConfig) -> str: @@ -271,7 +274,7 @@ def inspect_model_cls( if fallback_backend is not None: model_info = self._try_inspect_model_cls(fallback_backend) if model_info is not None: - print(f"Using PaddleFormers backend as fallback for {model_info.architecture}") + logger.info("Using PaddleFormers backend as fallback for %s", model_info.architecture) return (model_info, model_info.architecture) return self._raise_for_unsupported(architectures) diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 00000000000..188e4e95043 --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,180 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unit tests for FastDeploy logging infrastructure. + +Tests cover: +- get_logger() returns loggers with correct naming +- FD_LOG_LEVEL env var controls log level +- FastDeployLogger singleton behavior +- Console handler presence in unified logger +- Legacy get_logger(name, file) backward compatibility +""" + +import logging +import os +import unittest +from unittest.mock import patch + +try: + import paddle # noqa: F401 + + HAS_PADDLE = True +except ImportError: + HAS_PADDLE = False + +SKIP_MSG = "PaddlePaddle is not installed" + + +@unittest.skipUnless(HAS_PADDLE, SKIP_MSG) +class TestGetLogger(unittest.TestCase): + """Tests for the fastdeploy.logger.get_logger convenience function.""" + + def test_get_logger_returns_logger_instance(self): + from fastdeploy.logger import get_logger + + logger = get_logger("test_module") + self.assertIsInstance(logger, logging.Logger) + + def test_get_logger_prefixes_with_fastdeploy(self): + from fastdeploy.logger import get_logger + + logger = get_logger("test_module") + self.assertTrue( + logger.name.startswith("fastdeploy"), + f"Expected logger name to start with 'fastdeploy', got '{logger.name}'", + ) + + def test_get_logger_none_returns_root_fastdeploy(self): + from fastdeploy.logger import get_logger + + logger = get_logger(None) + self.assertEqual(logger.name, "fastdeploy") + + def test_get_logger_already_namespaced(self): + from fastdeploy.logger import get_logger + + logger = get_logger("fastdeploy.engine") + self.assertEqual(logger.name, "fastdeploy.engine") + + def test_get_logger_adds_prefix_for_plain_name(self): + from fastdeploy.logger import get_logger + + logger = get_logger("scheduler") + self.assertEqual(logger.name, "fastdeploy.scheduler") + + +@unittest.skipUnless(HAS_PADDLE, SKIP_MSG) +class TestFDLogLevel(unittest.TestCase): + """Tests for FD_LOG_LEVEL environment variable.""" + + def test_fd_log_level_default_is_info(self): + with patch.dict(os.environ, {}, clear=False): + # Remove FD_LOG_LEVEL and FD_DEBUG if set + os.environ.pop("FD_LOG_LEVEL", None) + os.environ.pop("FD_DEBUG", None) + # envs uses lazy lambdas, so reading the attribute re-evaluates os.getenv + from fastdeploy import envs + + level = envs.FD_LOG_LEVEL + self.assertEqual(level, "INFO") + + def test_fd_log_level_debug_when_fd_debug_set(self): + with patch.dict(os.environ, {"FD_DEBUG": "1"}, clear=False): + os.environ.pop("FD_LOG_LEVEL", None) + from fastdeploy import envs + + level = envs.FD_LOG_LEVEL + self.assertEqual(level, "DEBUG") + + def test_fd_log_level_explicit_overrides_fd_debug(self): + with patch.dict(os.environ, {"FD_DEBUG": "1", "FD_LOG_LEVEL": "WARNING"}, clear=False): + from fastdeploy import envs + + level = envs.FD_LOG_LEVEL + self.assertEqual(level, "WARNING") + + def test_fd_log_level_accepts_error(self): + with patch.dict(os.environ, {"FD_LOG_LEVEL": "ERROR"}, clear=False): + from fastdeploy import envs + + level = envs.FD_LOG_LEVEL + self.assertEqual(level, "ERROR") + + +@unittest.skipUnless(HAS_PADDLE, SKIP_MSG) +class TestFastDeployLoggerSingleton(unittest.TestCase): + """Tests for FastDeployLogger singleton pattern.""" + + def test_singleton_returns_same_instance(self): + from fastdeploy.logger import FastDeployLogger + + instance1 = FastDeployLogger() + instance2 = FastDeployLogger() + self.assertIs(instance1, instance2) + + +@unittest.skipUnless(HAS_PADDLE, SKIP_MSG) +class TestConsoleHandler(unittest.TestCase): + """Tests for console handler presence in unified logger setup.""" + + def setUp(self): + """Reset setup_logging state for clean test isolation.""" + from fastdeploy.logger.setup_logging import setup_logging + + setup_logging._configured = False + + def test_unified_logger_has_console_handler(self): + from fastdeploy.logger.setup_logging import setup_logging + + fd_logger = setup_logging() + + handler_classes = [type(h).__name__ for h in fd_logger.handlers] + self.assertTrue( + any("StreamHandler" in cls for cls in handler_classes), + f"Expected a StreamHandler (console) among handlers, got: {handler_classes}", + ) + + def tearDown(self): + """Reset setup_logging state after test.""" + from fastdeploy.logger.setup_logging import setup_logging + + setup_logging._configured = False + + +@unittest.skipUnless(HAS_PADDLE, SKIP_MSG) +class TestLegacyGetLogger(unittest.TestCase): + """Tests for backward compatibility with legacy get_logger(name, file).""" + + def test_legacy_get_logger_still_works(self): + from fastdeploy.utils import get_logger + + logger = get_logger("test_legacy", "test_legacy.log") + self.assertIsInstance(logger, logging.Logger) + # Legacy loggers use "legacy." prefix namespace + self.assertTrue( + logger.name.startswith("legacy."), + f"Expected legacy logger to have 'legacy.' prefix, got '{logger.name}'", + ) + + def test_legacy_prebuilt_loggers_accessible(self): + from fastdeploy.utils import llm_logger, scheduler_logger + + self.assertIsInstance(llm_logger, logging.Logger) + self.assertIsInstance(scheduler_logger, logging.Logger) + + +if __name__ == "__main__": + unittest.main() From 8a2f2ff02b2f2be3e1354ac74f9c892cd0fdc461 Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 6 Mar 2026 01:09:24 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E3=80=90Hackathon=209th=20No.88=E3=80=91Mi?= =?UTF-8?q?grate=20f-string=20logger=20calls=20to=20lazy=20%s=20formatting?= =?UTF-8?q?=20in=20config,=20engine,=20entrypoints,=20scheduler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config.py: demote verbose config dumps to logger.debug, fix f-string → %s - engine.py: demote internal debug messages, migrate f-strings to lazy formatting - common_engine.py: migrate all f-string logger calls to lazy %s formatting - api_server.py, multi_api_server.py: standardize log formatting - serving_engine.py, serving_embedding.py: lazy formatting migration - global_scheduler.py: migrate f-string logger calls to lazy %s formatting --- fastdeploy/config.py | 64 ++++++++++-------- fastdeploy/engine/common_engine.py | 66 +++++++++---------- fastdeploy/engine/engine.py | 31 ++++----- fastdeploy/entrypoints/api_server.py | 6 +- fastdeploy/entrypoints/openai/api_server.py | 30 ++++----- .../entrypoints/openai/multi_api_server.py | 20 +++--- .../entrypoints/openai/serving_embedding.py | 2 +- .../entrypoints/openai/serving_engine.py | 10 +-- fastdeploy/scheduler/global_scheduler.py | 18 ++--- 9 files changed, 131 insertions(+), 116 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 4ebfd4584b5..15f049bde8b 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -597,10 +597,10 @@ def print(self): """ Print all configuration information. """ - logger.info("Model Configuration Information :") + logger.debug("Model Configuration Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") class ParallelConfig: @@ -700,7 +700,10 @@ def set_communicate_group(self): self.ep_group = dist.new_group(range(self.expert_parallel_size)) dist.collective._set_custom_gid(None) logger.info( - f"data_parallel_size: {self.data_parallel_size}, tensor_parallel_size: {self.tensor_parallel_size}, expert_parallel_size: {self.expert_parallel_size}, data_parallel_rank: {self.data_parallel_rank}, tensor_parallel_rank: {self.tensor_parallel_rank}, expert_parallel_rank: {self.expert_parallel_rank}, tp_group: {self.tp_group}." + "data_parallel_size: %d, tensor_parallel_size: %d, expert_parallel_size: %d, " + "data_parallel_rank: %d, tensor_parallel_rank: %d, expert_parallel_rank: %d, tp_group: %s", + self.data_parallel_size, self.tensor_parallel_size, self.expert_parallel_size, + self.data_parallel_rank, self.tensor_parallel_rank, self.expert_parallel_rank, self.tp_group, ) def print(self): @@ -708,10 +711,10 @@ def print(self): print all config """ - logger.info("Parallel Configuration Information :") + logger.debug("Parallel Configuration Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") class SpeculativeConfig: @@ -836,10 +839,10 @@ def print(self): print all config """ - logger.info("Speculative Decoding Configuration Information :") + logger.debug("Speculative Decoding Configuration Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") def check_legality_parameters( self, @@ -1339,10 +1342,10 @@ def print(self): """ Print all configuration information. """ - logger.info("EPLB Configuration Information :") + logger.debug("EPLB Configuration Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") class CacheConfig: @@ -1516,10 +1519,10 @@ def print(self): print all config """ - logger.info("Cache Configuration Information :") + logger.debug("Cache Configuration Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") class RouterConfig: @@ -1586,19 +1589,19 @@ def _load_from_version_file(self, file_path: str = None): elif line.startswith("CXX compiler version:"): self.compiler_version = line.split(":")[1].strip() except FileNotFoundError: - logger.info(f"Warning: Version file not found at {file_path}") + logger.warning("Version file not found at %s", file_path) except Exception as e: - logger.info(f"Warning: Could not read version file - {e!s}") + logger.warning("Could not read version file: %s", e) def print(self): """ print all config """ - logger.info("Fasedeploy Commit Information :") + logger.debug("FastDeploy Commit Information:") for k, v in self.__dict__.items(): - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") class StructuredOutputsConfig: @@ -2121,11 +2124,20 @@ def print(self): """ print all config """ - logger.info("=================== Configuration Information ===============") + logger.info( + "Configuration: model=%s, tp=%d, max_batch=%d, max_seq_len=%d, dtype=%s, device=%s", + self.model_config.model, + self.parallel_config.tensor_parallel_size, + self.scheduler_config.max_num_seqs, + self.model_config.max_model_len, + self.model_config.dtype, + self.parallel_config.device_ids, + ) + logger.debug("=================== Configuration Information ===============") for k, v in self.__dict__.items(): if k == "generation_config" and v is not None: for gck, gcv in v.to_dict().items(): - logger.info("{:<20}:{:<6}{}".format(gck, "", gcv)) + logger.debug("{:<20}:{:<6}{}".format(gck, "", gcv)) elif ( k == "cache_config" or k == "model_config" @@ -2136,8 +2148,8 @@ def print(self): if v is not None: v.print() else: - logger.info("{:<20}:{:<6}{}".format(k, "", v)) - logger.info("=============================================================") + logger.debug("{:<20}:{:<6}{}".format(k, "", v)) + logger.debug("=============================================================") def init_cache_info(self): """ @@ -2171,7 +2183,7 @@ def init_cache_info(self): "transfer_protocol": transfer_protocol, "tp_size": self.parallel_config.tensor_parallel_size, } - logger.info(f"register_info: {self.register_info}") + logger.debug("register_info: %s", self.register_info) def read_from_config(self): """ diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 5723a239378..b5c012dacff 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -72,7 +72,7 @@ try: TokenProcessor = load_token_processor_plugins() - llm_logger.info(f"TokenProcessor plugin {TokenProcessor} loaded") + llm_logger.debug("TokenProcessor plugin %s loaded", TokenProcessor) except: from fastdeploy.output.token_processor import TokenProcessor @@ -108,7 +108,7 @@ def __init__(self, cfg, start_queue=True, use_async_llm=False): for rank in range(tp_size): engine_worker_queue_port = self.cfg.parallel_config.local_engine_worker_queue_port name = f"ctrl_w2e_rank{rank+tp_size*dp_index}_{engine_worker_queue_port}" - self.llm_logger.info(f"Init Worker Control Output Queue: {name}(consumer)") + self.llm_logger.debug("Init Worker Control Output Queue: %s (consumer)", name) self._ctrl_worker_output_queues.append(FMQ().queue(name, "consumer")) self.scheduler = cfg.scheduler_config.scheduler() @@ -136,7 +136,7 @@ def __init__(self, cfg, start_queue=True, use_async_llm=False): self.start_worker_queue_service(start_queue) os.environ["INFERENCE_MSG_QUEUE_ID"] = str(self.cfg.parallel_config.local_engine_worker_queue_port) - self.llm_logger.info(f"INFERENCE_MSG_QUEUE_ID: {str(self.cfg.parallel_config.local_engine_worker_queue_port)}") + self.llm_logger.debug("INFERENCE_MSG_QUEUE_ID: %s", self.cfg.parallel_config.local_engine_worker_queue_port) self.split_connector = SplitwiseConnector(cfg, self.engine_worker_queue, self.resource_manager) self.token_processor = TokenProcessor( @@ -283,7 +283,7 @@ def create_data_processor(self): def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进程感知是否有新Task需要处理 current_suffix = self.cfg.parallel_config.local_engine_worker_queue_port - self.llm_logger.info(f"current_suffix: {current_suffix}") + self.llm_logger.debug("current_suffix: %s", current_suffix) exist_task_signal_data = np.zeros([1], dtype=np.int32) self.exist_task_signal = IPCSignal( name="exist_task_signal", @@ -392,7 +392,7 @@ def start_worker_queue_service(self, start_queue): if self.cfg.host_ip == self.cfg.master_ip or self.cfg.master_ip == "0.0.0.0": if start_queue: - self.llm_logger.info(f"Starting engine worker queue server service at {address}") + self.llm_logger.debug("Starting engine worker queue server service at %s", address) self.engine_worker_queue_server = EngineWorkerQueue( address=address, is_server=True, @@ -410,8 +410,8 @@ def start_worker_queue_service(self, start_queue): ) if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed": - self.llm_logger.info( - f"Starting engine cache queue server service at {self.cfg.cache_config.local_cache_queue_port}" + self.llm_logger.debug( + "Starting engine cache queue server service at %s", self.cfg.cache_config.local_cache_queue_port ) self.cache_task_queue = EngineCacheQueue( address=(self.cfg.master_ip, self.cfg.cache_config.local_cache_queue_port), @@ -507,7 +507,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1): self.split_connector.send_cache_info_to_prefill(tasks) if not is_decode: - self.llm_logger.info(f"Tasks are sent to engine, req_ids={req_ids}") + self.llm_logger.debug("Tasks are sent to engine, req_ids=%s", req_ids) for task in tasks: if not getattr(task, "has_been_preempted_before", False): task.metrics.inference_start_time = time.time() @@ -965,8 +965,8 @@ def _fetch_request(): ) if self.cfg.scheduler_config.splitwise_role == "prefill": self.resource_manager.add_request_in_p(tasks) - self.llm_logger.info( - f"P add requests into running queue: {[task.request_id for task in tasks]}" + self.llm_logger.debug( + "P add requests into running queue: %s", [task.request_id for task in tasks] ) else: for task in tasks: @@ -1164,18 +1164,18 @@ def _insert_zmq_task_to_scheduler(self): status_value = data.get("status", None) if status_value is not None and status_value == RequestStatus.ABORT.value: req_id = data["request_id"] - self.llm_logger.info(f"Receive abort request, req_id: {req_id}") + self.llm_logger.info(\"Receive abort request, req_id: %s\", req_id) self.resource_manager.abort_req_ids_set.add(req_id) if envs.ENABLE_V1_KVCACHE_SCHEDULER: if req_id in self.resource_manager.requests: req = self.resource_manager.requests[req_id] task = self.resource_manager._prepare_preempt_task(req) self.engine_worker_queue.put_tasks(([task], self.resource_manager.real_bsz)) - self.llm_logger.info(f"put abort task in engine worker queue, req_id: {req_id}") + self.llm_logger.debug("put abort task in engine worker queue, req_id: %s", req_id) else: self.scheduler._recycle(req_id) - self.llm_logger.info( - f"req_id:{req_id} has not been allocated any resources, recycled it in scheduler" + self.llm_logger.debug( + "req_id:%s has not been allocated any resources, recycled it in scheduler", req_id ) self.resource_manager.abort_req_ids_set.remove(req_id) continue @@ -1259,7 +1259,7 @@ def run_control_method(self, control_req: ControlRequest): request_id = control_req.request_id try: - self.llm_logger.info(f"START run control method {request_id}: {method}") + self.llm_logger.debug("START run control method %s: %s", request_id, method) handler_name = f"_control_{method}" handler = getattr(self, handler_name, None) @@ -1270,7 +1270,7 @@ def run_control_method(self, control_req: ControlRequest): return result = handler(control_req) - self.llm_logger.info(f"SUCCESS run control method {method}.") + self.llm_logger.debug("SUCCESS run control method %s", method) succ_result = ControlResponse(request_id, 200, "Success", result) self.send_response_server.send_response(request_id, [succ_result]) @@ -1319,7 +1319,7 @@ def _control_pause(self, control_request: ControlRequest): raise Exception(error_msg) running_reqs = self.resource_manager.preempted_all() if len(running_reqs) > 0: - self.llm_logger.info(f"Total {len(running_reqs)} requests need to be aborted.") + self.llm_logger.info("Total %d requests need to be aborted.", len(running_reqs)) self.resource_manager.get_real_bsz() self.engine_worker_queue.put_tasks((running_reqs, self.resource_manager.real_bsz)) self.resource_manager.wait_worker_inflight_requests_finish(timeout=60) @@ -1329,7 +1329,7 @@ def _control_pause(self, control_request: ControlRequest): # abort inflight requests to user inflight_requests = self.scheduler.get_inflight_requests() - self.llm_logger.info(f"Start Abort Inflight Requests, total {len(inflight_requests)} waiting requests") + self.llm_logger.info("Start Abort Inflight Requests, total %d waiting requests", len(inflight_requests)) for req in inflight_requests: self._send_error_response(req.request_id, "Request is aborted since LLM Engine is paused.") self.scheduler.reset() @@ -1366,7 +1366,7 @@ def _control_is_paused(self, control_request: ControlRequest) -> bool: Returns: dict: Dictionary containing pause status information, {'is_paused': bool} """ - self.llm_logger.info(f"LLM Engine request generation is paused: {self.is_paused}") + self.llm_logger.debug("LLM Engine request generation is paused: %s", self.is_paused) with self._pause_cond: return {"is_paused": self.is_paused} @@ -1418,12 +1418,12 @@ async def _wait_all_control_responses(self, request_id: str, timeout: int): raise Exception("Worker Update Weights Timeouted after 600s") response: ControlResponse = msg.payload if response.request_id != request_id: - self.llm_logger.info(f"ignore old control response from worker:{output_queue.name} {response}") + self.llm_logger.debug("ignore old control response from worker:%s %s", output_queue.name, response) continue if response.error_code != 200: - self.llm_logger.info(f"Call Worker Failed: {output_queue.name} {response.error_message}") + self.llm_logger.debug("Call Worker Failed: %s %s", output_queue.name, response.error_message) raise Exception(f"Call Worker error: {response.error_message}") - self.llm_logger.info(f"Call Worker Succeed: {output_queue.name} {response.result}") + self.llm_logger.debug("Call Worker Succeed: %s %s", output_queue.name, response.result) responses.append(response.result) return responses @@ -1579,7 +1579,7 @@ def _process_allocate_resource_requests(): if envs.ENABLE_V1_KVCACHE_SCHEDULER: if self.resource_manager.preallocate_resource_in_d(task): task.metrics.decode_preallocate_req_time = time.time() - self.llm_logger.info(f"Resource available, processing task {task.request_id}") + self.llm_logger.debug("Resource available, processing task %s", task.request_id) self.split_connector.send_cache_info_to_prefill([task]) self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}") processed_indices.append(idx) @@ -1652,7 +1652,7 @@ def _process_prefilled_requests(): if envs.FD_ENABLE_INTERNAL_ADAPTER: # first token sent by D instance self.scheduler.put_results([req_output]) self.resource_manager.add_prefilled_request(req_output) - self.llm_logger.info(f"D has successfully added prefilled request, {request_id}") + self.llm_logger.debug("D has successfully added prefilled request, %s", request_id) def decode_loop(): while self.running: @@ -1771,7 +1771,7 @@ def _exit_sub_services(self): self.resource_manager.cache_manager.shm_cache_task_flag_broadcast.clear() self.resource_manager.cache_manager.cache_ready_signal.clear() for p in self.cache_manager_processes: - self.llm_logger.info(f"Killing cache manager process {p.pid}") + self.llm_logger.info("Killing cache manager process %s", p.pid) try: pgid = os.getpgid(p.pid) os.killpg(pgid, signal.SIGTERM) @@ -1801,7 +1801,7 @@ def _exit_sub_services(self): # Clean up other services if hasattr(self, "dp_processed"): for p in self.dp_processed: - self.llm_logger.info(f"Waiting for worker {p.pid} to exit") + self.llm_logger.info("Waiting for worker %s to exit", p.pid) p.join() for p in self.dp_engine_worker_queue_server: p.cleanup() @@ -1970,14 +1970,14 @@ def _start_worker_service(self): think_start_id = self.data_processor.tokenizer.get_vocab().get("", -1) if think_start_id >= 0: - self.llm_logger.info(f"Get think_start_id {think_start_id} from vocab.") + self.llm_logger.debug("think_start_id=%s", think_start_id) else: - self.llm_logger.info("No token found in vocabulary, the model can not do reasoning.") + self.llm_logger.debug("No token found in vocabulary, the model can not do reasoning.") think_end_id = self.data_processor.tokenizer.get_vocab().get("", -1) if think_end_id >= 0: - self.llm_logger.info(f"Get think_end_id {think_end_id} from vocab.") + self.llm_logger.debug("think_end_id=%s", think_end_id) else: - self.llm_logger.info("No token found in vocabulary, the model can not do reasoning.") + self.llm_logger.debug("No token found in vocabulary, the model can not do reasoning.") image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1) line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1) if line_break_id < 0: @@ -1996,7 +1996,7 @@ def _start_worker_service(self): else: line_break_id = int(line_break_ids) if line_break_id >= 0: - self.llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.") + self.llm_logger.debug("line_break_id=%s", line_break_id) ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port)) ips = None @@ -2083,7 +2083,7 @@ def _start_worker_service(self): if self.cfg.nnode > 1: pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}" pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log" - self.llm_logger.info(f"Launch worker service command: {pd_cmd}") + self.llm_logger.info("Launch worker service command: %s", pd_cmd) p = subprocess.Popen( pd_cmd, stdout=subprocess.PIPE, @@ -2154,7 +2154,7 @@ def launch_components(self): else: address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" - self.llm_logger.info(f"dp start queue service {address}") + self.llm_logger.debug("dp start queue service %s", address) self.dp_engine_worker_queue_server.append( EngineWorkerQueue( address=address, diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 790365482cb..7139d8337c2 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -192,12 +192,13 @@ def check_worker_initialize_status_func(res: dict): envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT = envs.FD_ZMQ_RECV_REQUEST_SERVER_PORTS.split(",")[0] if envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS is not None: envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0] - llm_logger.info( - f"envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT:{envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT},envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT:{envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT}" + llm_logger.debug( + "ZMQ ports: recv=%s, send=%s", + envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, ) if api_server_pid is not None: - llm_logger.info(f"Start zmq server, api_server_pid: {api_server_pid}") + llm_logger.debug("Start zmq server, api_server_pid: %s", api_server_pid) self.engine.start_zmq_service(api_server_pid) # Worker launched @@ -206,7 +207,7 @@ def check_worker_initialize_status_func(res: dict): console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.") return False - console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.") + console_logger.info("Worker processes launched in %.1f seconds.", time.time() - start_time) # Print blocks number & max running requests to console if envs.ENABLE_V1_KVCACHE_SCHEDULER: @@ -269,7 +270,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): task.update(asdict(sampling_params)) request = Request.from_dict(task) request.metrics.scheduler_recv_req_time = time.time() - llm_logger.info(f"Receive request {request}") + llm_logger.debug("Receive request: %s", request) if sampling_params is not None: if sampling_params.temperature is not None and abs(sampling_params.temperature) < 1e-06: sampling_params.temperature = 1e-06 @@ -425,7 +426,7 @@ def _exit_sub_services(self): if hasattr(self.engine.resource_manager.cache_manager, "cache_ready_signal"): self.engine.resource_manager.cache_manager.cache_ready_signal.clear() for p in self.cache_manager_processes: - llm_logger.info(f"Killing cache manager process {p.pid}") + llm_logger.debug("Killing cache manager process %s", p.pid) try: pgid = os.getpgid(p.pid) os.killpg(pgid, signal.SIGTERM) @@ -451,7 +452,7 @@ def _exit_sub_services(self): if hasattr(self, "dp_processed"): for p in self.dp_processed: - console_logger.info(f"Waiting for worker {p.pid} to exit") + console_logger.debug("Waiting for worker %s to exit", p.pid) p.join() for p in self.dp_engine_worker_queue_server: p.cleanup() @@ -528,14 +529,14 @@ def _start_worker_service(self): think_start_id = self.data_processor.tokenizer.get_vocab().get("", -1) if think_start_id >= 0: - llm_logger.info(f"Get think_start_id {think_start_id} from vocab.") + llm_logger.debug("think_start_id=%s", think_start_id) else: - llm_logger.info("No token found in vocabulary, the model can not do reasoning.") + llm_logger.debug("No token found in vocabulary, the model can not do reasoning.") think_end_id = self.data_processor.tokenizer.get_vocab().get("", -1) if think_end_id >= 0: - llm_logger.info(f"Get think_end_id {think_end_id} from vocab.") + llm_logger.debug("think_end_id=%s", think_end_id) else: - llm_logger.info("No token found in vocabulary, the model can not do reasoning.") + llm_logger.debug("No token found in vocabulary, the model can not do reasoning.") image_patch_id = self.data_processor.tokenizer.get_vocab().get("<|IMAGE_PLACEHOLDER|>", -1) line_break_id = self.data_processor.tokenizer.get_vocab().get("\n", -1) if line_break_id < 0: @@ -554,7 +555,7 @@ def _start_worker_service(self): else: line_break_id = int(line_break_ids) if line_break_id >= 0: - llm_logger.info(f"Get line_break_id {line_break_id} from tokenizer.") + llm_logger.debug("line_break_id=%s", line_break_id) try: think_truncate_prompt_ids = self.data_processor.tokenizer.convert_tokens_to_ids( self.data_processor.tokenizer.tokenize(self.data_processor.tokenizer.think_truncate_prompt) @@ -563,7 +564,7 @@ def _start_worker_service(self): think_truncate_prompt_ids = self.data_processor.tokenizer.convert_tokens_to_ids( self.data_processor.tokenizer.tokenize(envs.FD_LIMIT_THINKING_CONTENT_TRUNCATE_STR) ) - llm_logger.info(f"Get think_truncate_prompt_ids {think_truncate_prompt_ids} from tokenizer.") + llm_logger.debug("think_truncate_prompt_ids=%s", think_truncate_prompt_ids) ports = ",".join(map(str, self.cfg.parallel_config.engine_worker_queue_port)) ips = None @@ -660,7 +661,7 @@ def _start_worker_service(self): if self.cfg.nnode > 1: pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}" pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log" - llm_logger.info(f"Launch worker service command: {pd_cmd}") + llm_logger.info("Launch worker service command: %s", pd_cmd) p = subprocess.Popen( pd_cmd, stdout=subprocess.PIPE, @@ -704,7 +705,7 @@ def generate(self, prompts, stream): Yields: dict: The generated response. """ - llm_logger.info(f"Starting generation for prompt: {prompts}") + llm_logger.debug("Starting generation for prompt: %s", prompts) try: req_id = self._format_and_add_data(prompts) except Exception as e: diff --git a/fastdeploy/entrypoints/api_server.py b/fastdeploy/entrypoints/api_server.py index 4f4d7f2250c..cf90a4c73e5 100644 --- a/fastdeploy/entrypoints/api_server.py +++ b/fastdeploy/entrypoints/api_server.py @@ -61,7 +61,7 @@ async def generate(request: dict): """ generate stream api """ - api_server_logger.info(f"Receive request: {request}") + api_server_logger.debug("Receive request: %s", request) stream = request.get("stream", 0) if not stream: @@ -99,8 +99,8 @@ def launch_api_server(args) -> None: if not is_port_available(args.host, args.port): raise Exception(f"The parameter `port`:{args.port} is already in use.") - api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}") - api_server_logger.info(f"args: {args.__dict__}") + api_server_logger.info("Launching FastDeploy API server on port %s", args.port) + api_server_logger.debug("Server args: %s", args.__dict__) if not init_app(args): api_server_logger.error("API Server launch failed.") diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index ca63e586cc6..87e8cfc5c91 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -89,7 +89,7 @@ parser = make_arg_parser(FlexibleArgumentParser()) args = parser.parse_args() -console_logger.info(f"Number of api-server workers: {args.workers}.") +console_logger.info("Number of API server workers: %d", args.workers) args.model = retrive_model_from_server(args.model, args.revision) chat_template = load_chat_template(args.chat_template, args.model) @@ -124,7 +124,7 @@ def load_engine(): if llm_engine is not None: return llm_engine - api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}") + api_server_logger.info("FastDeploy LLM API server starting (pid=%s, port=%s)", os.getpid(), args.port) engine_args = EngineArgs.from_cli_args(args) if envs.FD_ENABLE_ASYNC_LLM: engine = AsyncLLM.from_engine_args(engine_args, pid=args.port) @@ -152,11 +152,11 @@ def load_data_service(): global llm_engine if llm_engine is not None: return llm_engine - api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}, port: {args.port}") + api_server_logger.info("FastDeploy LLM API server starting (pid=%s, port=%s)", os.getpid(), args.port) engine_args = EngineArgs.from_cli_args(args) config = engine_args.create_engine_config() - api_server_logger.info(f"local_data_parallel_id: {config.parallel_config}") - api_server_logger.info(f"local_data_parallel_id: {config.parallel_config.local_data_parallel_id}") + api_server_logger.debug("parallel_config: %s", config.parallel_config) + api_server_logger.debug("local_data_parallel_id: %s", config.parallel_config.local_data_parallel_id) expert_service = ExpertService(config, config.parallel_config.local_data_parallel_id) if not expert_service.start(args.port, config.parallel_config.local_data_parallel_id): api_server_logger.error("Failed to initialize FastDeploy LLM expert service, service exit now!") @@ -188,7 +188,7 @@ async def lifespan(app: FastAPI): if args.tokenizer is None: args.tokenizer = args.model pid = args.port - api_server_logger.info(f"{pid}") + api_server_logger.debug("PID: %s", pid) if args.served_model_name is not None: served_model_names = args.served_model_name @@ -290,7 +290,7 @@ async def lifespan(app: FastAPI): from prometheus_client import multiprocess multiprocess.mark_process_dead(os.getpid()) - api_server_logger.info(f"Closing metrics client pid: {pid}") + api_server_logger.debug("Closing metrics client pid: %s", pid) except Exception as e: api_server_logger.warning(f"exit error: {e}, {str(traceback.format_exc())}") @@ -315,7 +315,7 @@ async def connection_manager(): await asyncio.wait_for(connection_semaphore.acquire(), timeout=0.001) yield except asyncio.TimeoutError: - api_server_logger.info(f"Reach max request concurrency, semaphore status: {connection_semaphore.status()}") + api_server_logger.debug("Reach max request concurrency, semaphore status: %s", connection_semaphore.status()) raise HTTPException( status_code=429, detail=f"Too many requests,current max concurrency is {args.max_concurrency}" ) @@ -520,7 +520,7 @@ async def create_completion(request: CompletionRequest, req: Request): """ Create a completion for the provided prompt and parameters. """ - api_server_logger.info(f"Completion Received request: {request.model_dump_json()}") + api_server_logger.debug("Completion received request: %s", request.model_dump_json()) if envs.TRACES_ENABLE: if req.headers: headers = dict(req.headers) @@ -658,8 +658,8 @@ def launch_api_server() -> None: if not is_port_available(args.host, args.port): raise Exception(f"The parameter `port`:{args.port} is already in use.") - api_server_logger.info(f"launch Fastdeploy api server... port: {args.port}") - api_server_logger.info(f"args: {args.__dict__}") + api_server_logger.info("Launching FastDeploy API server on port %s", args.port) + api_server_logger.debug("Server args: %s", args.__dict__) # fd_start_span("FD_START") # set control_socket_disable=True to avoid conflicts when running multiple instances @@ -851,11 +851,11 @@ def main(): api_server_logger.info("FastDeploy LLM engine initialized!\n") if args.metrics_port is not None and args.metrics_port != args.port: launch_metrics_server() - console_logger.info(f"Launching metrics service at http://{args.host}:{args.metrics_port}/metrics") + console_logger.info("Launching metrics service at http://%s:%s/metrics", args.host, args.metrics_port) else: - console_logger.info(f"Launching metrics service at http://{args.host}:{args.port}/metrics") - console_logger.info(f"Launching chat completion service at http://{args.host}:{args.port}/v1/chat/completions") - console_logger.info(f"Launching completion service at http://{args.host}:{args.port}/v1/completions") + console_logger.info("Launching metrics service at http://%s:%s/metrics", args.host, args.port) + console_logger.info("Launching chat completion service at http://%s:%s/v1/chat/completions", args.host, args.port) + console_logger.info("Launching completion service at http://%s:%s/v1/completions", args.host, args.port) launch_worker_monitor() launch_controller_server() diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py index 32b3f75ae88..6568751d07d 100644 --- a/fastdeploy/entrypoints/openai/multi_api_server.py +++ b/fastdeploy/entrypoints/openai/multi_api_server.py @@ -50,7 +50,7 @@ def start_servers( else: controller_ports = [-1] * server_count - logger.info(f"Starting servers on ports: {ports} with args: {server_args} and metrics ports: {metrics_ports}") + logger.info("Starting servers on ports: %s with args: %s and metrics ports: %s", ports, server_args, metrics_ports) port_idx = {} for i in range(len(server_args)): if server_args[i] == "--engine-worker-queue-port": @@ -66,7 +66,7 @@ def start_servers( port = find_free_ports(num_ports=server_count) server_args += ["--engine-worker-queue-port", ",".join(map(str, port))] port_idx["engine_worker_queue_port"] = len(server_args) - 1 - logger.info(f"No --engine-worker-queue-port specified, using random ports: {port}") + logger.debug("No --engine-worker-queue-port specified, using random ports: %s", port) engine_worker_queue_port = server_args[port_idx["engine_worker_queue_port"]].split(",") if not check_param(engine_worker_queue_port, server_count): return @@ -75,7 +75,7 @@ def start_servers( port = find_free_ports(num_ports=server_count) server_args += ["--cache-queue-port", ",".join(map(str, port))] port_idx["cache_queue_port"] = len(server_args) - 1 - logger.info(f"No --cache-queue-port specified, using random ports: {port}") + logger.debug("No --cache-queue-port specified, using random ports: %s", port) cache_queue_port = server_args[port_idx["cache_queue_port"]].split(",") if not check_param(cache_queue_port, server_count): return @@ -84,7 +84,7 @@ def start_servers( port = find_free_ports(num_ports=server_count) server_args += ["--pd-comm-port", ",".join(map(str, port))] port_idx["pd_comm_port"] = len(server_args) - 1 - logger.info(f"No --pd-comm-port specified, using random ports: {port}") + logger.debug("No --pd-comm-port specified, using random ports: %s", port) pd_comm_port = server_args[port_idx["pd_comm_port"]].split(",") if not check_param(pd_comm_port, server_count): return @@ -93,12 +93,12 @@ def start_servers( port = find_free_ports(num_ports=device_count) server_args += ["--rdma-comm-ports", ",".join(map(str, port))] port_idx["rdma_comm_ports"] = len(server_args) - 1 - logger.info(f"No --rdma-comm-ports specified, using random ports: {port}") + logger.debug("No --rdma-comm-ports specified, using random ports: %s", port) rdma_comm_ports = server_args[port_idx["rdma_comm_ports"]].split(",") if not check_param(rdma_comm_ports, device_count): return - logger.info(f"Modified server_args: {server_args}") + logger.debug("Modified server_args: %s", server_args) processes = [] for i in range(server_count): port = int(ports[i]) @@ -125,16 +125,16 @@ def start_servers( # 启动子进程 proc = subprocess.Popen(cmd, env=env) processes.append(proc) - logger.info(f"Starting servers #{i+1} (PID: {proc.pid}) port: {port} | command: {' '.join(cmd)}") + logger.info("Starting server #%d (PID: %s) port: %s", i + 1, proc.pid, port) return processes def check_param(ports, num_servers): - logger.info(f"check param {ports}, {num_servers}") + logger.debug("check param %s, %s", ports, num_servers) assert len(ports) == num_servers, "Number of ports must match num-servers" for port in ports: - logger.info(f"check port {port}") + logger.debug("check port %s", port) if not is_port_available("0.0.0.0", int(port)): raise RuntimeError(f"Port {port} is not available.") return True @@ -149,7 +149,7 @@ def main(): parser.add_argument("--args", nargs=argparse.REMAINDER, help="remaining arguments are passed to api_server.py") args = parser.parse_args() - logger.info(f"Launching MultiAPIServer with command: {' '.join(sys.argv)}") + logger.info("Launching MultiAPIServer with command: %s", ' '.join(sys.argv)) device_count = 0 if current_platform.is_cuda(): diff --git a/fastdeploy/entrypoints/openai/serving_embedding.py b/fastdeploy/entrypoints/openai/serving_embedding.py index ec3223b3576..d619f01eb37 100644 --- a/fastdeploy/entrypoints/openai/serving_embedding.py +++ b/fastdeploy/entrypoints/openai/serving_embedding.py @@ -155,7 +155,7 @@ async def create_embedding(self, request: EmbeddingRequest): @override def _build_response(self, ctx: ServeContext, request_output: dict): """Generate final embedding response""" - api_server_logger.info(f"[{ctx.request_id}] Embedding RequestOutput received:{request_output}") + api_server_logger.debug("[%s] Embedding RequestOutput received: %s", ctx.request_id, request_output) base = PoolingRequestOutput.from_dict(request_output) embedding_res = EmbeddingRequestOutput.from_base(base) diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index a357d3fc9df..17c9361c65c 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -76,7 +76,7 @@ def __init__(self, models, cfg, pid, ips, max_waiting_time): else: self.master_ip = "0.0.0.0" self.__semaphore = None - api_server_logger.info(f"master ip: {self.master_ip}") + api_server_logger.debug("master ip: %s", self.master_ip) def _get_semaphore(self) -> StatefulSemaphore: if self.__semaphore is None: @@ -116,7 +116,7 @@ async def _acquire_semaphore(self, request_id: str) -> bool: def _release_semaphore(self, request_id: str) -> None: """Release engine client semaphore""" self._get_semaphore().release() - api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}") + api_server_logger.debug("Release request:%s status:%s", request_id, self._get_semaphore().status()) def _create_error_response( self, @@ -192,7 +192,7 @@ async def _pipeline(self, ctx: ServeContext) -> Union[Any, ErrorResponse]: request_id = self._generate_request_id(request) ctx.request_id = request_id - api_server_logger.info(f"Initialize request {request_id}: {request}") + api_server_logger.debug("Initialize request %s: %s", request_id, request) # Step 2: Semaphore acquisition if not await self._acquire_semaphore(request_id): @@ -251,7 +251,7 @@ async def _preprocess(self, ctx: ServeContext): request_dicts = self._request_to_batch_dicts(ctx) ctx.preprocess_requests = request_dicts for request_dict in request_dicts: - api_server_logger.info(f"batch add request_id: {request_dict['request_id']}, request: {request_dict}") + api_server_logger.debug("batch add request_id: %s, request: %s", request_dict['request_id'], request_dict) await self.engine_client.format_and_add_data(request_dict) def _process_chat_template_kwargs(self, request_dict): @@ -317,7 +317,7 @@ async def _acquire_semaphore(self, request_id: str) -> bool: def _release_semaphore(self, request_id: str) -> None: """Release engine client semaphore""" self._get_semaphore().release() - api_server_logger.info(f"Release request:{request_id} status:{self._get_semaphore().status()}") + api_server_logger.debug("Release request:%s status:%s", request_id, self._get_semaphore().status()) @override def _check_master(self) -> bool: diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py index 9f6d0644613..9d2a29f4354 100644 --- a/fastdeploy/scheduler/global_scheduler.py +++ b/fastdeploy/scheduler/global_scheduler.py @@ -125,7 +125,7 @@ def __init__( self.get_response_workers = threading.Thread(target=self._get_results_worker, daemon=True) self.get_response_workers.start() - scheduler_logger.info(f"Scheduler: name={self.name} redis_version={self.client.version}") + scheduler_logger.info("Scheduler: name=%s redis_version=%s", self.name, self.client.version) def _get_hash_slot(self, data: str) -> int: """ @@ -370,7 +370,7 @@ def _put_requests_worker(self, tasks: List[Task]) -> List[Task]: rem_amount=0, ttl=self.ttl, ) - scheduler_logger.info(f"Scheduler has enqueued some requests: {requests}") + scheduler_logger.debug("Scheduler has enqueued some requests: %s", requests) if duplicate: scheduler_logger.warning( @@ -491,15 +491,16 @@ def get_requests( ttl=self.ttl, ) serialized_requests += [(lucky_request_queue_name, element) for element in elements] - scheduler_logger.info( - f"Scheduler {self.name} has stolen some requests from another lucky one. " - f"(name={lucky} num={len(serialized_requests)})" + scheduler_logger.debug( + "Scheduler %s has stolen some requests from another lucky one. " + "(name=%s num=%d)", + self.name, lucky, len(serialized_requests), ) else: exist_num = self.client.exists(self._instance_name(lucky)) if exist_num == 0: if self.client.zrem(extend_scheduler_load_table_name, lucky): - scheduler_logger.info(f"Scheduler {lucky} has been removed") + scheduler_logger.debug("Scheduler %s has been removed", lucky) # blocked read if len(serialized_requests) == 0: @@ -517,8 +518,9 @@ def get_requests( self.client.zincrby(load_table_name, -1, scheduler_name, rem_amount=0, ttl=self.ttl) serialized_requests.append((request_queue_name, element[1])) if scheduler_name != self.name: - scheduler_logger.info( - f"Scheduler {self.name} has stolen a request from another scheduler. (name={scheduler_name})" + scheduler_logger.debug( + "Scheduler %s has stolen a request from another scheduler. (name=%s)", + self.name, scheduler_name, ) long_partial_requests = 0 From 5f2dd0296b5e18e9e3313b21e54f24266ad3c63b Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 6 Mar 2026 01:23:30 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E3=80=90Hackathon=209th=20No.88=E3=80=91Ph?= =?UTF-8?q?ase=20B=20completion:=20token=5Fprocessor,=20remaining=20config?= =?UTF-8?q?/engine/scheduler/serving=20fixes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - token_processor.py: 22 per-request/internal logs INFO→DEBUG, 2 error handlers INFO→ERROR, 2 request completion summaries f-string→%s (kept INFO) - config.py: parameter get/reset logs f-string→%s, sequence_parallel_moe→DEBUG - engine.py: cache task→DEBUG, dp queue service f-string→%s - common_engine.py: fix escaped quote syntax error in abort handler - serving_engine.py: both _acquire_semaphore calls f-string→DEBUG+%s - scheduler/: dp_scheduler typo fix (Recieve→Receive), all 3 schedulers enqueued/pulled/finished→DEBUG - spec_logger single head accept ratio→DEBUG Zero f-string .info() calls remain across all 12 target files. All 12 files pass Python AST syntax validation. --- fastdeploy/config.py | 10 +-- fastdeploy/engine/common_engine.py | 2 +- fastdeploy/engine/engine.py | 4 +- .../entrypoints/openai/serving_engine.py | 4 +- fastdeploy/output/token_processor.py | 73 ++++++++++--------- fastdeploy/scheduler/dp_scheduler.py | 8 +- fastdeploy/scheduler/global_scheduler.py | 12 +-- fastdeploy/scheduler/local_scheduler.py | 8 +- 8 files changed, 63 insertions(+), 58 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 15f049bde8b..8f8f8f4e2fa 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -378,9 +378,9 @@ def reset_config_value(key, value): if not hasattr(self, key.lower()): if os.getenv(key, None): value = eval(os.getenv(key)) - logger.info(f"Get parameter `{key}` = {value} from environment.") + logger.info("Get parameter `%s` = %s from environment.", key, value) else: - logger.info(f"Parameter `{key}` will use default value {value}.") + logger.info("Parameter `%s` will use default value %s.", key, value) setattr(self, key.lower(), value) reset_config_value("COMPRESSION_RATIO", 1.0) @@ -679,7 +679,7 @@ def __init__( and self.expert_parallel_size > 1 and self.tensor_parallel_size > 1 ) - logger.info(f"use_sequence_parallel_moe: {self.use_sequence_parallel_moe}") + logger.debug("use_sequence_parallel_moe: %s", self.use_sequence_parallel_moe) def set_communicate_group(self): # different tp group id @@ -1493,7 +1493,7 @@ def postprocess(self, num_total_tokens, number_of_tasks): block_num = (length + self.block_size - 1 + self.dec_token_num) // self.block_size self.total_block_num = block_num * number_of_tasks self.prefill_kvcache_block_num = self.total_block_num - logger.info(f"Doing profile, the total_block_num:{self.total_block_num}") + logger.info("Doing profile, the total_block_num: %d", self.total_block_num) def reset(self, num_gpu_blocks): """ @@ -2194,7 +2194,7 @@ def reset_value(cls, value_name, key): if hasattr(cls, key): value = getattr(cls, key) setattr(cls, value_name, value) - logger.info(f"Reset parameter {value_name} = {value} from configuration.") + logger.info("Reset parameter %s = %s from configuration.", value_name, value) reset_value(self.cache_config, "block_size", "infer_model_block_size") reset_value( diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index b5c012dacff..be084972dfb 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -1164,7 +1164,7 @@ def _insert_zmq_task_to_scheduler(self): status_value = data.get("status", None) if status_value is not None and status_value == RequestStatus.ABORT.value: req_id = data["request_id"] - self.llm_logger.info(\"Receive abort request, req_id: %s\", req_id) + self.llm_logger.info("Receive abort request, req_id: %s", req_id) self.resource_manager.abort_req_ids_set.add(req_id) if envs.ENABLE_V1_KVCACHE_SCHEDULER: if req_id in self.resource_manager.requests: diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 7139d8337c2..f43eaf33de9 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -340,7 +340,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): request.metrics.preprocess_end_time = time.time() request.metrics.scheduler_recv_req_time = time.time() self.engine.scheduler.put_requests([request]) - llm_logger.info(f"Cache task with request_id ({request.get('request_id')})") + llm_logger.debug("Cache task with request_id (%s)", request.get('request_id')) llm_logger.debug(f"cache task: {request}") def _worker_processes_ready(self): @@ -800,7 +800,7 @@ def launch_components(self): else: address = f"/dev/shm/fd_task_queue_{self.cfg.parallel_config.engine_worker_queue_port[i]}.sock" - llm_logger.info(f"dp start queue service {address}") + llm_logger.info("dp start queue service %s", address) self.dp_engine_worker_queue_server.append( EngineWorkerQueue( address=address, diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index 17c9361c65c..aeabbb9aea4 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -101,7 +101,7 @@ def _check_supported_model(self, model_name: str) -> tuple[bool, str]: async def _acquire_semaphore(self, request_id: str) -> bool: """Acquire engine client semaphore with timeout""" try: - api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}") + api_server_logger.debug("Acquire request:%s status:%s", request_id, self._get_semaphore().status()) if self.max_waiting_time < 0: await self._get_semaphore().acquire() else: @@ -301,7 +301,7 @@ def _get_semaphore(self): async def _acquire_semaphore(self, request_id: str) -> bool: """Acquire engine client semaphore with timeout""" try: - api_server_logger.info(f"Acquire request:{request_id} status:{self._get_semaphore().status()}") + api_server_logger.debug("Acquire request:%s status:%s", request_id, self._get_semaphore().status()) if self.max_waiting_time < 0: await self._get_semaphore().acquire() else: diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 89b767f9ede..850e371192a 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -220,7 +220,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: for token_id in token_id_list: recovery_stop = token_id == RECOVERY_STOP_SIGNAL if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + llm_logger.debug("recovery stop signal found at task %s", task_id) self.tokens_counter[task_id] += 1 if token_id != RECOVERY_STOP_SIGNAL: result.outputs.token_ids.append(token_id) @@ -249,15 +249,17 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: # Print combined log with all required information ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0 llm_logger.info( - f"Request={task_id}, InputToken={task.prompt_token_ids_len}, " - f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, " - f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, " - f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " - f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}" + "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, " + "TokenRatio=%.2f, TTFT=%.2f, E2E=%.2f, IsPrefill=%s, " + "RecoveryStop=%s, PreemptedCount=%s", + task_id, task.prompt_token_ids_len, cached_detail, + self.tokens_counter[task_id], token_ratio, ttft, + e2e_time, is_prefill, recovery_stop, + getattr(task.metrics, 'preempted_count', 0) ) main_process_metrics.request_token_ratio.observe(token_ratio) - llm_logger.info(f"{self.resource_manager.info()}") + llm_logger.debug("%s", self.resource_manager.info()) if self.cfg.speculative_config.method: self._compute_speculative_status() if not is_prefill: @@ -284,10 +286,10 @@ def _process_batch_output_use_zmq(self, receive_datas): if ( envs.ENABLE_V1_KVCACHE_SCHEDULER and token_ids[-1] == PREEMPTED_TOKEN_ID ) or not envs.ENABLE_V1_KVCACHE_SCHEDULER: - llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.") + llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id) self.resource_manager.abort_req_ids_set.remove(task_id) self._recycle_resources(task_id, i, task) - llm_logger.info(f"{task_id} received negative token. Recycle end.") + llm_logger.debug("%s received negative token. Recycle end.", task_id) abort_res = RequestOutput( request_id=task_id, finished=True, @@ -301,7 +303,7 @@ def _process_batch_output_use_zmq(self, receive_datas): task_id in self.resource_manager.to_be_rescheduled_request_id_set and token_ids[-1] == PREEMPTED_TOKEN_ID ): - llm_logger.info(f"sync preemption for request_id {task_id} done.") + llm_logger.debug("sync preemption for request_id %s done.", task_id) self.resource_manager.reschedule_preempt_task(task_id) continue @@ -477,7 +479,7 @@ def process_sampling_results(self): self.timestamp_for_alive_after_handle_batch = time.time() except Exception as e: - llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}") + llm_logger.error("while get input_data error: %s %s", e, traceback.format_exc()) def postprocess(self, batch_result: List[RequestOutput], mtype=3): """ @@ -523,14 +525,15 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False finished_task_ids = self.engine_worker_queue.get_finished_req() if len(finished_task_ids) > 0: for finished_task_id in finished_task_ids: - llm_logger.info(f"finished_task_id: {finished_task_id}") + llm_logger.debug("finished_task_id: %s", finished_task_id) self.prefill_result_status[finished_task_id[0]] = finished_task_id[1] if task_id in self.prefill_result_status: if self.prefill_result_status[task_id] != "finished": result.error_code = 400 result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}" - llm_logger.info( - f"wait for sending cache, request_id: {task_id}, cost seconds: {time.time()-start_time:.5f}" + llm_logger.debug( + "wait for sending cache, request_id: %s, cost seconds: %.5f", + task_id, time.time() - start_time ) result.metrics.send_request_output_to_decode_time = time.time() self.split_connector.send_first_token(task.disaggregate_info, [result]) @@ -591,7 +594,7 @@ def _compute_speculative_status(self, result: RequestOutput): single_head_acceptance_rates.append( self.accept_token_num_per_head[i] / self.accept_token_num_per_head[i - 1] ) - spec_logger.info(f" Single head accept ratio: {single_head_acceptance_rates}") + spec_logger.debug("Single head accept ratio: %s", single_head_acceptance_rates) if self.number_of_output_tokens > 1000000: self.number_of_output_tokens = 0 @@ -754,13 +757,13 @@ def _process_batch_output(self): if self.cfg.speculative_config.method: self._record_speculative_decoding_accept_num_per_request(task_id, accept_num[i]) if accept_num[i] == PREEMPTED_TOKEN_ID: # in MTP, means preemption has happened in worker - llm_logger.info(f"sync preemption for request_id {task_id} done.") + llm_logger.debug("sync preemption for request_id %s done.", task_id) if envs.ENABLE_V1_KVCACHE_SCHEDULER: if task_id in self.resource_manager.abort_req_ids_set: - llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.") + llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id) self.resource_manager.abort_req_ids_set.remove(task_id) self._recycle_resources(task_id, i, task) - llm_logger.info(f"{task_id} received negative token. Recycle end.") + llm_logger.debug("%s received negative token. Recycle end.", task_id) abort_res = RequestOutput( request_id=task_id, finished=True, @@ -775,7 +778,7 @@ def _process_batch_output(self): if accept_num[i] == -3: recovery_stop = True if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + llm_logger.debug("recovery stop signal found at task %s", task_id) token_ids = [RECOVERY_STOP_SIGNAL] elif self.use_logprobs: token_ids = tokens[i][:, 0].tolist()[: accept_num[i]] @@ -795,16 +798,16 @@ def _process_batch_output(self): token_ids = [token_id] recovery_stop = token_id == RECOVERY_STOP_SIGNAL if recovery_stop: - llm_logger.info(f"recovery stop signal found at task {task_id}") + llm_logger.debug("recovery stop signal found at task %s", task_id) if not recovery_stop and token_id < 0: if task_id in self.resource_manager.abort_req_ids_set: if ( envs.ENABLE_V1_KVCACHE_SCHEDULER and token_id == PREEMPTED_TOKEN_ID ) or not envs.ENABLE_V1_KVCACHE_SCHEDULER: - llm_logger.info(f"Aborted task {task_id} received negative token. Recycling.") + llm_logger.debug("Aborted task %s received negative token. Recycling.", task_id) self.resource_manager.abort_req_ids_set.remove(task_id) self._recycle_resources(task_id, i, task) - llm_logger.info(f"{task_id} received negative token. Recycle end.") + llm_logger.debug("%s received negative token. Recycle end.", task_id) abort_res = RequestOutput( request_id=task_id, finished=True, @@ -818,7 +821,7 @@ def _process_batch_output(self): task_id in self.resource_manager.to_be_rescheduled_request_id_set and token_id == PREEMPTED_TOKEN_ID ): - llm_logger.info(f"sync preemption for request_id {task_id} done.") + llm_logger.debug("sync preemption for request_id %s done.", task_id) self.resource_manager.reschedule_preempt_task(task_id) continue @@ -839,7 +842,7 @@ def _process_batch_output(self): task.metrics.record_recv_first_token() task.metrics.cal_cost_time() metrics = copy.copy(task.metrics) - llm_logger.info(f"task:{task.request_id} start recode first token") + llm_logger.debug("task:%s start record first token", task.request_id) self._record_first_token_metrics(task, current_time) tracing.trace_report_span( @@ -946,20 +949,22 @@ def _process_batch_output(self): ttft = task.metrics.first_token_time if task.metrics.first_token_time else 0 ttft_s = ttft + task.metrics.time_in_queue llm_logger.info( - f"Request={task_id}, InputToken={task.prompt_token_ids_len}, " - f"CachedDetail={cached_detail}, OutputToken={self.tokens_counter[task_id]}, " - f"TokenRatio={token_ratio:.2f}, TTFT={ttft:.2f}, TTFT_S={ttft_s:.2f}, " - f"E2E={e2e_time:.2f}, IsPrefill={is_prefill}, RecoveryStop={recovery_stop}, " - f"PreemptedCount={getattr(task.metrics, 'preempted_count', 0)}" + "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, " + "TokenRatio=%.2f, TTFT=%.2f, TTFT_S=%.2f, E2E=%.2f, " + "IsPrefill=%s, RecoveryStop=%s, PreemptedCount=%s", + task_id, task.prompt_token_ids_len, cached_detail, + self.tokens_counter[task_id], token_ratio, ttft, ttft_s, + e2e_time, is_prefill, recovery_stop, + getattr(task.metrics, 'preempted_count', 0) ) main_process_metrics.request_token_ratio.observe(token_ratio) - llm_logger.info(f"{self.resource_manager.info()}") + llm_logger.debug("%s", self.resource_manager.info()) if self.cfg.speculative_config.method: self._compute_speculative_status(result) if not is_prefill: self._record_completion_metrics(task, current_time) - llm_logger.info(f"task {task_id} received eos token. Recycling.") + llm_logger.debug("task %s received eos token. Recycling.", task_id) if ( envs.ENABLE_V1_KVCACHE_SCHEDULER and self.cfg.cache_config.enable_prefix_caching @@ -969,7 +974,7 @@ def _process_batch_output(self): task ) # when enable prefix caching, cache kv cache for output tokens self._recycle_resources(task_id, i, task, result, is_prefill) - llm_logger.info(f"eos token {task_id} Recycle end.") + llm_logger.debug("eos token %s Recycle end.", task_id) break llm_logger.debug(f"get response from infer: {result}") @@ -1135,7 +1140,7 @@ def process_sampling_results(self): continue self._process_batch_output() except Exception as e: - llm_logger.info(f"while get input_data error: {e} {traceback.format_exc()!s}") + llm_logger.error("while get input_data error: %s %s", e, traceback.format_exc()) def stop(self): """ @@ -1143,5 +1148,5 @@ def stop(self): """ self._is_running = False self.worker.join() - llm_logger.info("warm up thread stop") + llm_logger.debug("warm up thread stop") del self.worker diff --git a/fastdeploy/scheduler/dp_scheduler.py b/fastdeploy/scheduler/dp_scheduler.py index f5b03eba30f..1f2a7c92b4f 100644 --- a/fastdeploy/scheduler/dp_scheduler.py +++ b/fastdeploy/scheduler/dp_scheduler.py @@ -58,7 +58,7 @@ def put_results(self, results: List[RequestOutput]): finished_responses = [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: - self.scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") + self.scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_responses) with self.mutex: self.batch_responses_per_step.append([response.raw for response in responses]) @@ -179,8 +179,8 @@ def get_requests( ) if len(requests) > 0: - self.scheduler_logger.info( - f"Scheduler has pulled some request: {[request.request_id for request in requests]}" + self.scheduler_logger.debug( + "Scheduler has pulled some request: %s", [request.request_id for request in requests] ) return requests @@ -228,7 +228,7 @@ def put_requests(self, requests: List[Dict]): def _put_requests_to_local(self): while True: request = self.request_queues.get() - self.scheduler_logger.info(f"Receive request from puller, request_id: {request.request_id}") + self.scheduler_logger.debug("Receive request from puller, request_id: %s", request.request_id) self._scheduler.put_requests([request]) def _get_response_from_local(self): diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py index 9d2a29f4354..9e7df4aee0d 100644 --- a/fastdeploy/scheduler/global_scheduler.py +++ b/fastdeploy/scheduler/global_scheduler.py @@ -598,14 +598,14 @@ def get_requests( ttl=self.ttl, ) - scheduler_logger.info(f"Scheduler has put remaining request into the queue: {len(remaining_request)}") + scheduler_logger.debug("Scheduler has put remaining request into the queue: %d", len(remaining_request)) if len(requests) == 0: scheduler_logger.debug( f"Scheduler has put all just-pulled request into the queue: {len(remaining_request)}" ) if len(requests) > 0: - scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}") + scheduler_logger.debug("Scheduler has pulled some request: %s", [request.request_id for request in requests]) return requests def _put_results_worker(self, tasks: List[Task]): @@ -666,7 +666,7 @@ def _put_results_worker(self, tasks: List[Task]): self.local_response_not_empty.notify_all() if len(finished_request_ids) > 0: - scheduler_logger.info(f"Scheduler has received some finished responses: {finished_request_ids}") + scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_request_ids) for response_queue_name, responses in stolen_responses.items(): self.client.rpush(response_queue_name, *responses, ttl=self.ttl) @@ -795,7 +795,7 @@ def _get_results() -> Dict[str, List[ScheduledResponse]]: if finished: del self.local_responses[request_id] - scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}") + scheduler_logger.debug("Scheduler has pulled a finished response: %s", [request_id]) return results def reset(self): @@ -862,6 +862,6 @@ def update_config(self, load_shards_num: Optional[int], reallocate: Optional[boo scheduler_logger.info( "Scheduler has reload config, " - f"load_shards_num({old_load_shards_num} => {self.load_shards_num}) " - f"shard({old_shard} => {self.shard})" + "load_shards_num(%s => %s) shard(%s => %s)", + old_load_shards_num, self.load_shards_num, old_shard, self.shard, ) diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index fc4a64686b5..651efc0cee0 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -191,7 +191,7 @@ def put_requests(self, requests: List[Request]) -> List[Tuple[str, Optional[str] self.ids += valid_ids self.requests_not_empty.notify_all() - scheduler_logger.info(f"Scheduler has enqueued some requests: {valid_ids}") + scheduler_logger.debug("Scheduler has enqueued some requests: %s", valid_ids) if len(duplicated_ids) > 0: scheduler_logger.warning(f"Scheduler has received some duplicated requests: {duplicated_ids}") @@ -300,7 +300,7 @@ def get_requests( scheduler_logger.debug(f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}") if len(requests) > 0: - scheduler_logger.info(f"Scheduler has pulled some request: {[request.request_id for request in requests]}") + scheduler_logger.debug("Scheduler has pulled some request: %s", [request.request_id for request in requests]) return requests @@ -316,7 +316,7 @@ def put_results(self, results: List[RequestOutput]): finished_responses = [response.request_id for response in responses if response.finished] if len(finished_responses) > 0: - scheduler_logger.info(f"Scheduler has received some finished responses: {finished_responses}") + scheduler_logger.debug("Scheduler has received some finished responses: %s", finished_responses) with self.mutex: self.batch_responses_per_step.append([response.raw for response in responses]) @@ -381,7 +381,7 @@ def _get_results(): if finished: self._recycle(request_id) - scheduler_logger.info(f"Scheduler has pulled a finished response: {[request_id]}") + scheduler_logger.debug("Scheduler has pulled a finished response: %s", [request_id]) if results: scheduler_logger.debug(f"get responses, {results}") From 66aaef8e3d1dbd63e6c1f560eab87d018c79d1fd Mon Sep 17 00:00:00 2001 From: cloudforge1 Date: Fri, 6 Mar 2026 02:32:47 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E3=80=90Hackathon=209th=20No.88=E3=80=91Ph?= =?UTF-8?q?ase=20B:=20model=5Fexecutor=20print=E2=86=92logger=20+=20per-re?= =?UTF-8?q?quest=20INFO=E2=86=92DEBUG=20+=20f-string=E2=86=92%s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - model_executor/: 14 print() calls → logger.debug/warning/info (6 files) - graph_optimization/utils.py: memory info → debug - ops/gpu/__init__.py: sm_version → debug, module error → warning - models/adapters.py: load failure → warning - layers/attention/mla_attention_backend.py: flash attn detect → info - ops/triton_ops/triton_utils.py: kernel cache/compile → debug - ops/triton_ops/triton_utils_v2.py: kernel cache/compile → debug - entrypoints/: per-request INFO → DEBUG with lazy %s (12 files) - engine_client.py: objgraph debug, max_tokens, control methods - serving_chat.py: create/release/stream per-request calls - serving_completion.py: init/preprocess/response per-request calls - v1/serving_chat.py, v1/serving_completion.py: stream/response - run_batch.py: startup/progress f-string → %s (keep INFO) - llm.py, chat_utils.py, serving_reward.py, utils.py, v1/serving_base.py - scheduler/splitwise_scheduler.py: per-request → DEBUG, startup → INFO %s Acceptance criteria met: - Core module print() = 0 (excl subprocess string, .print() methods) - Per-request INFO = 0 in serving paths (all downgraded to DEBUG) - All hot-path logger calls use %s lazy formatting --- fastdeploy/config.py | 9 ++++-- fastdeploy/engine/engine.py | 5 +-- fastdeploy/entrypoints/chat_utils.py | 2 +- fastdeploy/entrypoints/engine_client.py | 20 ++++++------ fastdeploy/entrypoints/llm.py | 2 +- .../entrypoints/openai/multi_api_server.py | 2 +- fastdeploy/entrypoints/openai/run_batch.py | 12 +++---- fastdeploy/entrypoints/openai/serving_chat.py | 18 +++++------ .../entrypoints/openai/serving_completion.py | 10 +++--- .../entrypoints/openai/serving_engine.py | 2 +- .../entrypoints/openai/serving_reward.py | 2 +- fastdeploy/entrypoints/openai/utils.py | 2 +- .../entrypoints/openai/v1/serving_base.py | 2 +- .../entrypoints/openai/v1/serving_chat.py | 4 +-- .../openai/v1/serving_completion.py | 6 ++-- .../graph_optimization/utils.py | 24 ++++++++------ .../layers/attention/mla_attention_backend.py | 10 ++++-- fastdeploy/model_executor/models/adapters.py | 5 ++- fastdeploy/model_executor/ops/gpu/__init__.py | 7 ++-- .../ops/triton_ops/triton_utils.py | 11 ++++--- .../ops/triton_ops/triton_utils_v2.py | 11 ++++--- fastdeploy/output/token_processor.py | 32 +++++++++++++------ fastdeploy/scheduler/global_scheduler.py | 22 ++++++++----- fastdeploy/scheduler/local_scheduler.py | 4 ++- fastdeploy/scheduler/splitwise_scheduler.py | 10 +++--- 25 files changed, 141 insertions(+), 93 deletions(-) diff --git a/fastdeploy/config.py b/fastdeploy/config.py index 8f8f8f4e2fa..5fe8588282e 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -702,8 +702,13 @@ def set_communicate_group(self): logger.info( "data_parallel_size: %d, tensor_parallel_size: %d, expert_parallel_size: %d, " "data_parallel_rank: %d, tensor_parallel_rank: %d, expert_parallel_rank: %d, tp_group: %s", - self.data_parallel_size, self.tensor_parallel_size, self.expert_parallel_size, - self.data_parallel_rank, self.tensor_parallel_rank, self.expert_parallel_rank, self.tp_group, + self.data_parallel_size, + self.tensor_parallel_size, + self.expert_parallel_size, + self.data_parallel_rank, + self.tensor_parallel_rank, + self.expert_parallel_rank, + self.tp_group, ) def print(self): diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index f43eaf33de9..9213047620f 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -194,7 +194,8 @@ def check_worker_initialize_status_func(res: dict): envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT = envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORTS.split(",")[0] llm_logger.debug( "ZMQ ports: recv=%s, send=%s", - envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, + envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, + envs.FD_ZMQ_SEND_RESPONSE_SERVER_PORT, ) if api_server_pid is not None: @@ -340,7 +341,7 @@ def add_requests(self, task, sampling_params=None, **kwargs): request.metrics.preprocess_end_time = time.time() request.metrics.scheduler_recv_req_time = time.time() self.engine.scheduler.put_requests([request]) - llm_logger.debug("Cache task with request_id (%s)", request.get('request_id')) + llm_logger.debug("Cache task with request_id (%s)", request.get("request_id")) llm_logger.debug(f"cache task: {request}") def _worker_processes_ready(self): diff --git a/fastdeploy/entrypoints/chat_utils.py b/fastdeploy/entrypoints/chat_utils.py index 08abc8ed391..a486bcb8e00 100644 --- a/fastdeploy/entrypoints/chat_utils.py +++ b/fastdeploy/entrypoints/chat_utils.py @@ -122,7 +122,7 @@ def http_get_with_retry(self, url, max_retries=3, retry_delay=1, backoff_factor= if retry_cnt >= max_retries: api_server_logger.error(f"HTTP GET failed: {e}. Max retries reached") raise - api_server_logger.info(f"HTTP GET failed: {e}. Start retry {retry_cnt}") + api_server_logger.info("HTTP GET failed: %s. Start retry %s", e, retry_cnt) time.sleep(delay) delay *= backoff_factor diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index 53c71351820..40ba6da9b62 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -312,7 +312,7 @@ async def add_requests(self, task): ) else: request_id = task.get("request_id", "unknown") - obj_logger.info(f"\n{'='*60} OBJGRAPH DEBUG [request_id={request_id}] {'='*60}") + obj_logger.debug("\n%s OBJGRAPH DEBUG [request_id=%s] %s", "=" * 60, request_id, "=" * 60) # 打印内存占用 if not _has_psutil: obj_logger.warning( @@ -321,18 +321,18 @@ async def add_requests(self, task): else: process = psutil.Process() rss_memory = process.memory_info().rss / 1024**3 - obj_logger.info(f"Process Memory (RSS): {rss_memory:.2f} GB") + obj_logger.debug("Process Memory (RSS): %.2f GB", rss_memory) obj_logger.info("Object growth statistics:") growth_data = objgraph.growth(limit=20) for item in growth_data: if len(item) == 3: obj_type, current_count, growth = item - obj_logger.info(f" {obj_type:30s} {current_count:8d} +{growth}") + obj_logger.debug(" %-30s %8d +%s", obj_type, current_count, growth) elif len(item) == 2: obj_type, count = item - obj_logger.info(f" {obj_type:30s} +{count}") + obj_logger.debug(" %-30s +%s", obj_type, count) else: - obj_logger.info(f" {item}") + obj_logger.debug(" %s", item) task["metrics"]["preprocess_start_time"] = time.time() request_id = task.get("request_id").split("_")[0] @@ -359,7 +359,7 @@ async def add_requests(self, task): min_tokens = task.get("min_tokens", 1) if "messages" in task: task["messages"] = None - api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}") + api_server_logger.debug("task['max_tokens']:%s", task["max_tokens"]) main_process_metrics.request_params_max_tokens.observe(task["max_tokens"]) main_process_metrics.prompt_tokens_total.inc(input_ids_len) main_process_metrics.request_prompt_tokens.observe(input_ids_len) @@ -592,7 +592,7 @@ def check_health(self, time_interval_threashold=30): return True, "" async def run_control_method(self, request: ControlRequest): - api_server_logger.info(f"Start Run Control Method: {request}") + api_server_logger.debug("Start Run Control Method: %s", request) self.zmq_client.send_json(request.to_dict()) request_id = request.request_id dealer, response_queue = await self.connection_manager.get_connection(request_id) @@ -601,7 +601,7 @@ async def run_control_method(self, request: ControlRequest): # todo: support user specified timeout. default 600s is enough for most control cases response = await asyncio.wait_for(response_queue.get(), timeout=600) response = ControlResponse.from_dict(response[0]) - api_server_logger.info(f"End Run Control Method: {response}") + api_server_logger.debug("End Run Control Method: %s", response) return response except asyncio.TimeoutError: error_response = ControlResponse(request_id, 500, "Timeout waiting for control method response") @@ -827,7 +827,7 @@ async def rearrange_experts(self, request_dict: dict): return content, status_code action = request_dict.get("action", "") - api_server_logger.info(f"redundant_expert: rearrange_experts recv request, action {action}") + api_server_logger.debug("redundant_expert: rearrange_experts recv request, action %s", action) if action == "": # action: start rearrange experts # params: {'user': 'xxx', 'passwd': 'xxx', 'ips': ['10.54.99.77:8000', '10.54.99.77:8300']} @@ -995,7 +995,7 @@ async def check_redundant(self, request_dict: dict): async def abort(self, request_id, n=1) -> None: if envs.FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE: - api_server_logger.info(f"abort request_id:{request_id}") + api_server_logger.info("abort request_id:%s", request_id) if n <= 0: api_server_logger.warning("Abort function called with non-positive n: %d. No requests aborted.", n) return diff --git a/fastdeploy/entrypoints/llm.py b/fastdeploy/entrypoints/llm.py index 101c8fbb101..7fa61fd47bc 100644 --- a/fastdeploy/entrypoints/llm.py +++ b/fastdeploy/entrypoints/llm.py @@ -408,7 +408,7 @@ def _build_sample_logprobs(self, logprobs_lists: LogprobsLists, topk_logprobs: i list[dict[int, Logprob]]: One dict per request, mapping token ID to Logprob. """ try: - llm_logger.info(f"filter logprobs, topk_logprobs: {topk_logprobs}") + llm_logger.debug("filter logprobs, topk_logprobs: %s", topk_logprobs) if not logprobs_lists.logprob_token_ids: llm_logger.warning("Empty logprob_token_ids in LogprobsLists") return None diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py index 6568751d07d..b55480219e8 100644 --- a/fastdeploy/entrypoints/openai/multi_api_server.py +++ b/fastdeploy/entrypoints/openai/multi_api_server.py @@ -149,7 +149,7 @@ def main(): parser.add_argument("--args", nargs=argparse.REMAINDER, help="remaining arguments are passed to api_server.py") args = parser.parse_args() - logger.info("Launching MultiAPIServer with command: %s", ' '.join(sys.argv)) + logger.info("Launching MultiAPIServer with command: %s", " ".join(sys.argv)) device_count = 0 if current_platform.is_cuda(): diff --git a/fastdeploy/entrypoints/openai/run_batch.py b/fastdeploy/entrypoints/openai/run_batch.py index 67766ff67e8..2daeb8bb659 100644 --- a/fastdeploy/entrypoints/openai/run_batch.py +++ b/fastdeploy/entrypoints/openai/run_batch.py @@ -114,7 +114,7 @@ def init_engine(args: argparse.Namespace): if llm_engine is not None: return llm_engine - api_server_logger.info(f"FastDeploy LLM API server starting... {os.getpid()}") + api_server_logger.info("FastDeploy LLM API server starting... %s", os.getpid()) engine_args = EngineArgs.from_cli_args(args) engine = LLMEngine.from_engine_args(engine_args) if not engine.start(api_server_pid=os.getpid()): @@ -144,7 +144,7 @@ def completed(self): if self._total > 0: log_interval = min(100, max(self._total // 10, 1)) if self._completed - self._last_log_count >= log_interval: - console_logger.info(f"Progress: {self._completed}/{self._total} requests completed") + console_logger.info("Progress: %s/%s requests completed", self._completed, self._total) self._last_log_count = self._completed def pbar(self) -> tqdm: @@ -398,7 +398,7 @@ async def setup_engine_and_handlers(args: Namespace) -> Tuple[EngineClient, Open args.tokenizer = args.model pid = determine_process_id() - console_logger.info(f"Process ID: {pid}") + console_logger.info("Process ID: %s", pid) model_paths = create_model_paths(args) chat_template = load_chat_template(args.chat_template, args.model) @@ -429,7 +429,7 @@ async def run_batch( max_concurrency = (concurrency + workers - 1) // workers semaphore = asyncio.Semaphore(max_concurrency) - console_logger.info(f"concurrency: {concurrency}, workers: {workers}, max_concurrency: {max_concurrency}") + console_logger.info("concurrency: %s, workers: %s, max_concurrency: %s", concurrency, workers, max_concurrency) tracker = BatchProgressTracker() console_logger.info("Reading batch from %s...", args.input_file) @@ -474,7 +474,7 @@ async def run_batch( success_count = sum(1 for r in responses if r.error is None) error_count = len(responses) - success_count - console_logger.info(f"Batch processing completed: {success_count} success, {error_count} errors") + console_logger.info("Batch processing completed: %s success, %s errors", success_count, error_count) await write_file(args.output_file, responses, args.output_tmp_dir) console_logger.info("Results written to output file") @@ -485,7 +485,7 @@ async def main(args: argparse.Namespace): try: if args.workers is None: args.workers = max(min(int(args.max_num_seqs // 32), 8), 1) - console_logger.info(f"Workers: {args.workers}") + console_logger.info("Workers: %s", args.workers) args.model = retrive_model_from_server(args.model, args.revision) if args.tool_parser_plugin: diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index f56d572615c..f58f2389d35 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -98,7 +98,7 @@ def __init__( else: self.master_ip = "0.0.0.0" self.is_master_ip = True - api_server_logger.info(f"master ip: {self.master_ip}") + api_server_logger.info("master ip: %s", self.master_ip) def _check_master(self): return self.engine_client.is_master or self.is_master_ip @@ -129,7 +129,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest): await self.engine_client.semaphore.acquire() else: await asyncio.wait_for(self.engine_client.semaphore.acquire(), timeout=self.max_waiting_time) - api_server_logger.info(f"current {self.engine_client.semaphore.status()}") + api_server_logger.debug("current %s", self.engine_client.semaphore.status()) if request.request_id is not None: request_id = request.request_id @@ -141,7 +141,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest): request_id = f"chatcmpl-{uuid.uuid4()}" tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy") del request.trace_context - api_server_logger.info(f"create chat completion request: {request_id}") + api_server_logger.debug("create chat completion request: %s", request_id) prompt_tokens = None max_tokens = None try: @@ -252,7 +252,7 @@ async def chat_completion_stream_generator( choices=[], model=model_name, ) - api_server_logger.info(f"create chat completion request: {request_id}") + api_server_logger.debug("create chat completion request: %s", request_id) try: dealer, response_queue = await self.engine_client.connection_manager.get_connection( @@ -370,7 +370,7 @@ async def chat_completion_stream_generator( completion_tokens_details=CompletionTokenUsageInfo(reasoning_tokens=0), ) yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n" - api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}") + api_server_logger.debug("Chat Streaming response send_idx 0: %s", chunk.model_dump_json()) first_iteration = False output = res["outputs"] @@ -489,7 +489,7 @@ async def chat_completion_stream_generator( chunk.choices = choices yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" if res["finished"]: - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") + api_server_logger.debug("Chat Streaming response last send: %s", chunk.model_dump_json()) choices = [] if include_usage: @@ -528,7 +528,7 @@ async def chat_completion_stream_generator( tracing.trace_req_finish(request_id) await self.engine_client.connection_manager.cleanup_request(request_id) self.engine_client.semaphore.release() - api_server_logger.info(f"release {request_id} {self.engine_client.semaphore.status()}") + api_server_logger.debug("release %s %s", request_id, self.engine_client.semaphore.status()) yield "data: [DONE]\n\n" async def chat_completion_full_generator( @@ -695,7 +695,7 @@ async def chat_completion_full_generator( tracing.trace_req_finish(request_id) await self.engine_client.connection_manager.cleanup_request(request_id) self.engine_client.semaphore.release() - api_server_logger.info(f"release {self.engine_client.semaphore.status()}") + api_server_logger.debug("release %s", self.engine_client.semaphore.status()) num_prompt_tokens = len(prompt_token_ids) num_generated_tokens = sum(previous_num_tokens) @@ -722,7 +722,7 @@ async def chat_completion_full_generator( choices=choices, usage=usage, ) - api_server_logger.info(f"Chat response: {res.model_dump_json()}") + api_server_logger.debug("Chat response: %s", res.model_dump_json()) return res async def _create_chat_completion_choice( diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index 3dafb270905..6a6b9ab8465 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -77,7 +77,7 @@ def __init__(self, engine_client, models, pid, ips, max_waiting_time): self.master_ip = "0.0.0.0" self.is_master_ip = True self._is_process_response_dict_async = None - api_server_logger.info(f"master ip: {self.master_ip}") + api_server_logger.info("master ip: %s", self.master_ip) def _check_master(self): return self.engine_client.is_master or self.is_master_ip @@ -110,7 +110,7 @@ async def create_completion(self, request: CompletionRequest): request_id = f"cmpl-{request.user}-{uuid.uuid4()}" else: request_id = f"cmpl-{uuid.uuid4()}" - api_server_logger.info(f"Initialize request {request_id}: {request}") + api_server_logger.debug("Initialize request %s: %s", request_id, request) tracing.trace_req_start(rid=request_id, trace_content=request.trace_context, role="FastDeploy") del request.trace_context request_prompt_ids = None @@ -155,7 +155,7 @@ async def create_completion(self, request: CompletionRequest): request_prompts = request_prompt_ids num_choices = len(request_prompts) * (1 if request.n is None else request.n) - api_server_logger.info(f"Start preprocessing request: req_id={request_id}), num_choices={num_choices}") + api_server_logger.debug("Start preprocessing request: req_id=%s, num_choices=%s", request_id, num_choices) prompt_batched_token_ids = [] prompt_tokens_list = [] max_tokens_list = [] @@ -370,7 +370,7 @@ async def completion_full_generator( prompt_tokens_list=prompt_tokens_list, max_tokens_list=max_tokens_list, ) - api_server_logger.info(f"Completion response: {res.model_dump_json()}") + api_server_logger.debug("Completion response: %s", res.model_dump_json()) return res except Exception as e: api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) @@ -642,7 +642,7 @@ async def completion_stream_generator( metrics=res["metrics"] if request.collect_metrics else None, ) yield f"data: {usage_chunk.model_dump_json(exclude_unset=True)}\n\n" - api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}") + api_server_logger.debug("Completion Streaming response last send: %s", chunk.model_dump_json()) except asyncio.CancelledError as e: await self.engine_client.abort(f"{request_id}_0", num_choices) diff --git a/fastdeploy/entrypoints/openai/serving_engine.py b/fastdeploy/entrypoints/openai/serving_engine.py index aeabbb9aea4..6371ad8c6a3 100644 --- a/fastdeploy/entrypoints/openai/serving_engine.py +++ b/fastdeploy/entrypoints/openai/serving_engine.py @@ -251,7 +251,7 @@ async def _preprocess(self, ctx: ServeContext): request_dicts = self._request_to_batch_dicts(ctx) ctx.preprocess_requests = request_dicts for request_dict in request_dicts: - api_server_logger.debug("batch add request_id: %s, request: %s", request_dict['request_id'], request_dict) + api_server_logger.debug("batch add request_id: %s, request: %s", request_dict["request_id"], request_dict) await self.engine_client.format_and_add_data(request_dict) def _process_chat_template_kwargs(self, request_dict): diff --git a/fastdeploy/entrypoints/openai/serving_reward.py b/fastdeploy/entrypoints/openai/serving_reward.py index cbde62deea5..3220d550b8c 100644 --- a/fastdeploy/entrypoints/openai/serving_reward.py +++ b/fastdeploy/entrypoints/openai/serving_reward.py @@ -107,7 +107,7 @@ async def create_reward(self, request: ChatRewardRequest): @override def _build_response(self, ctx: ServeContext, request_output: dict): """Generate final reward response""" - api_server_logger.info(f"[{ctx.request_id}] Reward RequestOutput received:{request_output}") + api_server_logger.debug("[%s] Reward RequestOutput received:%s", ctx.request_id, request_output) base = PoolingRequestOutput.from_dict(request_output) reward_res = RewardRequestOutput.from_base(base) diff --git a/fastdeploy/entrypoints/openai/utils.py b/fastdeploy/entrypoints/openai/utils.py index 80701ac6d5e..ba8fa9508f3 100644 --- a/fastdeploy/entrypoints/openai/utils.py +++ b/fastdeploy/entrypoints/openai/utils.py @@ -97,7 +97,7 @@ async def initialize(self): self.running = True for index in range(self.max_connections): await self._add_connection(index) - api_server_logger.info(f"Started {self.max_connections} connections, pid {self.pid}") + api_server_logger.info("Started %s connections, pid %s", self.max_connections, self.pid) async def _add_connection(self, index): """create a new connection and start listening task""" diff --git a/fastdeploy/entrypoints/openai/v1/serving_base.py b/fastdeploy/entrypoints/openai/v1/serving_base.py index ba9ba9dfc75..52ee9a048e0 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_base.py +++ b/fastdeploy/entrypoints/openai/v1/serving_base.py @@ -76,7 +76,7 @@ def __init__( self.master_ip = "0.0.0.0" self.is_master_ip = True self.eoi_token_id = 101032 - api_server_logger.info(f"master ip: {self.master_ip}") + api_server_logger.info("master ip: %s", self.master_ip) @override def _check_master(self) -> bool: diff --git a/fastdeploy/entrypoints/openai/v1/serving_chat.py b/fastdeploy/entrypoints/openai/v1/serving_chat.py index ed622c2d4dd..ceab3dd6fc1 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_chat.py +++ b/fastdeploy/entrypoints/openai/v1/serving_chat.py @@ -302,7 +302,7 @@ async def _build_stream_response( max_tokens = request.max_completion_tokens or request.max_tokens choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index] choice.finish_reason = self._calc_finish_reason(request_output, max_tokens, choice_completion_tokens) - api_server_logger.info(f"Chat Streaming response last send: {chunk.model_dump_json()}") + api_server_logger.debug("Chat Streaming response last send: %s", chunk.model_dump_json()) yield f"data: {chunk.model_dump_json(exclude_unset=True)}\n\n" if request_output.finished and response_ctx.remain_choices == 0: @@ -339,7 +339,7 @@ async def _build_full_response( res = ChatCompletionResponse( id=ctx.request_id, model=request.model, choices=choices, created=ctx.created_time, usage=response_ctx.usage ) - api_server_logger.info(f"Chat response: {res.model_dump_json()}") + api_server_logger.debug("Chat response: %s", res.model_dump_json()) return res async def _create_chat_completion_choice( diff --git a/fastdeploy/entrypoints/openai/v1/serving_completion.py b/fastdeploy/entrypoints/openai/v1/serving_completion.py index c8e82eed8a8..c101a66b5f5 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_completion.py +++ b/fastdeploy/entrypoints/openai/v1/serving_completion.py @@ -271,9 +271,9 @@ async def _build_stream_response( choice.finish_reason = self._calc_finish_reason( request_output, request.max_tokens, choice_completion_tokens ) - api_server_logger.info(f"Completion Streaming response last send: {chunk.model_dump_json()}") + api_server_logger.debug("Completion Streaming response last send: %s", chunk.model_dump_json()) if send_idx == 0 and not request.return_token_ids: - api_server_logger.info(f"Completion Streaming response send_idx 0: {chunk.model_dump_json()}") + api_server_logger.debug("Completion Streaming response send_idx 0: %s", chunk.model_dump_json()) yield f"data: {chunk.model_dump_json()}\n\n" if request_output.finished and response_ctx.remain_choices == 0: if include_usage: @@ -321,7 +321,7 @@ async def _build_full_response( choices=choices, usage=response_ctx.usage, ) - api_server_logger.info(f"Completion response: {res.model_dump_json()}") + api_server_logger.debug("Completion response: %s", res.model_dump_json()) return res except Exception as e: api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) diff --git a/fastdeploy/model_executor/graph_optimization/utils.py b/fastdeploy/model_executor/graph_optimization/utils.py index 2b5241e5aed..1807b6cceeb 100644 --- a/fastdeploy/model_executor/graph_optimization/utils.py +++ b/fastdeploy/model_executor/graph_optimization/utils.py @@ -15,9 +15,12 @@ """ import contextlib +import logging from dataclasses import dataclass import paddle + +logger = logging.getLogger(__name__) import pynvml from fastdeploy.platforms import current_platform @@ -66,15 +69,18 @@ def _print_memory_info( debug_title: str = "", ): """Print debug info""" - print( - f"\n{debug_title}:", - f"\n\tDevice Total memory: {self.gpu_memory_info.total}", - f"\n\tDevice Used memory: {self.gpu_memory_info.used}", - f"\n\tDevice Free memory: {self.gpu_memory_info.free}", - f"\n\tPaddle max memory Reserved: {self.paddle_memory_info.max_reserved}", - f"\n\tPaddle max memory Allocated: {self.paddle_memory_info.max_allocated}", - f"\n\tPaddle memory Reserved: {self.paddle_memory_info.current_reserved}", - f"\n\tPaddle memory Allocated: {self.paddle_memory_info.current_reserved}", + logger.debug( + "%s:\n\tDevice Total memory: %s\n\tDevice Used memory: %s\n\tDevice Free memory: %s" + "\n\tPaddle max memory Reserved: %s\n\tPaddle max memory Allocated: %s" + "\n\tPaddle memory Reserved: %s\n\tPaddle memory Allocated: %s", + debug_title, + self.gpu_memory_info.total, + self.gpu_memory_info.used, + self.gpu_memory_info.free, + self.paddle_memory_info.max_reserved, + self.paddle_memory_info.max_allocated, + self.paddle_memory_info.current_reserved, + self.paddle_memory_info.current_reserved, ) def get_gpu_memory_info(self): diff --git a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py index 08ed2d8c061..a3d7fc465cc 100644 --- a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py @@ -16,8 +16,12 @@ from __future__ import annotations +import logging + import paddle +logger = logging.getLogger(__name__) + paddle.enable_compat(scope={"flash_mla"}) # Enable torch proxy before importing flash_mla import math import os @@ -294,13 +298,13 @@ def __init__( is_paddle_supported = any(num >= 90 for num in paddle.version.cuda_archs()) if is_current_sm_supported and is_paddle_supported: self.flash_attn_func = flash_attention_v3_varlen - print("The current platform supports Flash Attention V3.") + logger.info("The current platform supports Flash Attention V3.") self.flash_attn_kwargs = {"softmax_scale": self.attn_softmax_scale} else: self.flash_attn_func = flash_attn_unpadded self.flash_attn_kwargs = {"scale": self.attn_softmax_scale, "training": False} - print( - "The current platform does not support Flash Attention V3, so Flash Attention V2 will be used instead." + logger.info( + "The current platform does not support Flash Attention V3, using Flash Attention V2 instead." ) def init_attention_metadata(self, forward_meta: ForwardMeta): diff --git a/fastdeploy/model_executor/models/adapters.py b/fastdeploy/model_executor/models/adapters.py index 306bbcd5169..b090f37fcd2 100644 --- a/fastdeploy/model_executor/models/adapters.py +++ b/fastdeploy/model_executor/models/adapters.py @@ -14,10 +14,13 @@ # limitations under the License. """ +import logging from collections.abc import Iterable from typing import TypeVar import paddle + +logger = logging.getLogger(__name__) import paddle.nn as nn from fastdeploy.config import ModelConfig @@ -72,7 +75,7 @@ def _load_dense_weights(linear: nn.Linear, folder: str, model_config: "ModelConf bias_loader(linear.bias, state_dict[bias_key].astype(paddle.float32)) return True except Exception as e: - print(f"Failed to load :{e}") + logger.warning("Failed to load adapter weight: %s", e) return False return False diff --git a/fastdeploy/model_executor/ops/gpu/__init__.py b/fastdeploy/model_executor/ops/gpu/__init__.py index a4faadb17a1..5b324dcae3b 100644 --- a/fastdeploy/model_executor/ops/gpu/__init__.py +++ b/fastdeploy/model_executor/ops/gpu/__init__.py @@ -13,10 +13,13 @@ # limitations under the License. """fastdeploy gpu ops""" +import logging import sys from fastdeploy.import_ops import import_custom_ops +logger = logging.getLogger(__name__) + PACKAGE = "fastdeploy.model_executor.ops.gpu" @@ -29,7 +32,7 @@ def decide_module(): # to support all hardware platforms (NVIDIA, ILUVATAR, HPU, etc.) prop = paddle.device.get_device_properties() sm_version = prop.major * 10 + prop.minor - print(f"current sm_version={sm_version}") + logger.debug("current sm_version=%s", sm_version) curdir = os.path.dirname(os.path.abspath(__file__)) sm_version_path = os.path.join(curdir, f"fastdeploy_ops_{sm_version}") @@ -42,7 +45,7 @@ def decide_module(): try: module_path = decide_module() except Exception as e: - print(f"decide_module error, load custom_ops from .fastdeploy_ops: {e}") + logger.warning("decide_module error, load custom_ops from .fastdeploy_ops: %s", e) pass import_custom_ops(PACKAGE, module_path, globals()) diff --git a/fastdeploy/model_executor/ops/triton_ops/triton_utils.py b/fastdeploy/model_executor/ops/triton_ops/triton_utils.py index f92e8d9c94f..524395a84b8 100644 --- a/fastdeploy/model_executor/ops/triton_ops/triton_utils.py +++ b/fastdeploy/model_executor/ops/triton_ops/triton_utils.py @@ -15,6 +15,7 @@ """ import inspect +import logging import os import re import sys @@ -25,6 +26,8 @@ from fastdeploy import envs +logger = logging.getLogger(__name__) + compile_file = triton.__path__[0] + "/tools/compile.py" link_file = triton.__path__[0] + "/tools/link.py" python_path = sys.executable @@ -660,7 +663,7 @@ def decorator(*args, **kwargs): generated_dir = envs.FD_TRITON_KERNEL_CACHE_DIR if generated_dir is None: generated_dir = f"/tmp/triton_cache/rank{tp_rank}" - print("the kernel cache dir is:", generated_dir) + logger.debug("the kernel cache dir is: %s", generated_dir) assert generated_dir is not None, ( "TRITON_KERNEL_CACHE_DIR is None, please set it such as " "export TRITON_KERNEL_CACHE_DIR=/tmp/triton_cache " @@ -702,7 +705,7 @@ def decorator(*args, **kwargs): so_path = find_so_path(generated_dir, python_package_name) if so_path is None: - print("== we do not find so_path, we need to compile it") + logger.debug("== we do not find so_path, we need to compile it") with open(paddle_custom_op_file_path, "w") as f: f.write( SubstituteTemplate( @@ -754,7 +757,7 @@ def decorator(*args, **kwargs): codegen_command = aot_template.format( **config, ) - print(codegen_command) + logger.debug("codegen command: %s", codegen_command) codegen_commands.append(codegen_command) multi_process_do(codegen_commands) @@ -771,7 +774,7 @@ def decorator(*args, **kwargs): if op_name not in OpProtoHolder.instance().op_proto_map.keys(): so_path = find_so_path(generated_dir, python_package_name) - print("== we find so_path: ", so_path) + logger.debug("== we find so_path: %s", so_path) assert so_path is not None paddle.utils.cpp_extension.load_op_meta_info_and_register_op(so_path) diff --git a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py index 98589a4c376..7b26eaddc93 100644 --- a/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py +++ b/fastdeploy/model_executor/ops/triton_ops/triton_utils_v2.py @@ -16,6 +16,7 @@ import importlib import inspect +import logging import os import re import sys @@ -23,6 +24,8 @@ import paddle import triton +logger = logging.getLogger(__name__) + from .triton_utils import ( SubstituteTemplate, build_package, @@ -203,7 +206,7 @@ def decorator(*args, **kwargs): tp_rank = paddle.distributed.get_rank() generated_dir = os.getenv("TRITON_KERNEL_CACHE_DIR", f"/tmp/triton_cache/rank{tp_rank}") - print("the kernel cache dir is:", generated_dir) + logger.debug("the kernel cache dir is: %s", generated_dir) generated_dir = f"{generated_dir}/{op_name}" os.makedirs(generated_dir, exist_ok=True) @@ -240,7 +243,7 @@ def decorator(*args, **kwargs): so_path = find_so_path(generated_dir, python_package_name) if so_path is None: - print("== we do not find so_path, we need to compile it") + logger.debug("== we do not find so_path, we need to compile it") with open(paddle_custom_op_file_path, "w") as f: f.write( SubstituteTemplate( @@ -290,7 +293,7 @@ def decorator(*args, **kwargs): codegen_command = aot_template.format( **config, ) - print(codegen_command) + logger.debug("codegen command: %s", codegen_command) codegen_commands.append(codegen_command) multi_process_do(codegen_commands) @@ -307,7 +310,7 @@ def decorator(*args, **kwargs): # so_path have be found! so_path = find_so_path(generated_dir, python_package_name) - print("== we find so_path: ", so_path) + logger.debug("== we find so_path: %s", so_path) assert so_path is not None dir_path = os.path.dirname(so_path) sys.path.append(dir_path) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 850e371192a..0ce268b9971 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -252,10 +252,16 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, " "TokenRatio=%.2f, TTFT=%.2f, E2E=%.2f, IsPrefill=%s, " "RecoveryStop=%s, PreemptedCount=%s", - task_id, task.prompt_token_ids_len, cached_detail, - self.tokens_counter[task_id], token_ratio, ttft, - e2e_time, is_prefill, recovery_stop, - getattr(task.metrics, 'preempted_count', 0) + task_id, + task.prompt_token_ids_len, + cached_detail, + self.tokens_counter[task_id], + token_ratio, + ttft, + e2e_time, + is_prefill, + recovery_stop, + getattr(task.metrics, "preempted_count", 0), ) main_process_metrics.request_token_ratio.observe(token_ratio) @@ -532,8 +538,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False result.error_code = 400 result.error_message = f"{task_id} failed to {self.prefill_result_status[task_id]}" llm_logger.debug( - "wait for sending cache, request_id: %s, cost seconds: %.5f", - task_id, time.time() - start_time + "wait for sending cache, request_id: %s, cost seconds: %.5f", task_id, time.time() - start_time ) result.metrics.send_request_output_to_decode_time = time.time() self.split_connector.send_first_token(task.disaggregate_info, [result]) @@ -952,10 +957,17 @@ def _process_batch_output(self): "Request=%s, InputToken=%s, CachedDetail=%s, OutputToken=%s, " "TokenRatio=%.2f, TTFT=%.2f, TTFT_S=%.2f, E2E=%.2f, " "IsPrefill=%s, RecoveryStop=%s, PreemptedCount=%s", - task_id, task.prompt_token_ids_len, cached_detail, - self.tokens_counter[task_id], token_ratio, ttft, ttft_s, - e2e_time, is_prefill, recovery_stop, - getattr(task.metrics, 'preempted_count', 0) + task_id, + task.prompt_token_ids_len, + cached_detail, + self.tokens_counter[task_id], + token_ratio, + ttft, + ttft_s, + e2e_time, + is_prefill, + recovery_stop, + getattr(task.metrics, "preempted_count", 0), ) main_process_metrics.request_token_ratio.observe(token_ratio) diff --git a/fastdeploy/scheduler/global_scheduler.py b/fastdeploy/scheduler/global_scheduler.py index 9e7df4aee0d..501d58b3d8a 100644 --- a/fastdeploy/scheduler/global_scheduler.py +++ b/fastdeploy/scheduler/global_scheduler.py @@ -492,9 +492,10 @@ def get_requests( ) serialized_requests += [(lucky_request_queue_name, element) for element in elements] scheduler_logger.debug( - "Scheduler %s has stolen some requests from another lucky one. " - "(name=%s num=%d)", - self.name, lucky, len(serialized_requests), + "Scheduler %s has stolen some requests from another lucky one. " "(name=%s num=%d)", + self.name, + lucky, + len(serialized_requests), ) else: exist_num = self.client.exists(self._instance_name(lucky)) @@ -520,7 +521,8 @@ def get_requests( if scheduler_name != self.name: scheduler_logger.debug( "Scheduler %s has stolen a request from another scheduler. (name=%s)", - self.name, scheduler_name, + self.name, + scheduler_name, ) long_partial_requests = 0 @@ -605,7 +607,9 @@ def get_requests( ) if len(requests) > 0: - scheduler_logger.debug("Scheduler has pulled some request: %s", [request.request_id for request in requests]) + scheduler_logger.debug( + "Scheduler has pulled some request: %s", [request.request_id for request in requests] + ) return requests def _put_results_worker(self, tasks: List[Task]): @@ -861,7 +865,9 @@ def update_config(self, load_shards_num: Optional[int], reallocate: Optional[boo self.shard = self._get_hash_slot(self.name) % self.load_shards_num scheduler_logger.info( - "Scheduler has reload config, " - "load_shards_num(%s => %s) shard(%s => %s)", - old_load_shards_num, self.load_shards_num, old_shard, self.shard, + "Scheduler has reload config, " "load_shards_num(%s => %s) shard(%s => %s)", + old_load_shards_num, + self.load_shards_num, + old_shard, + self.shard, ) diff --git a/fastdeploy/scheduler/local_scheduler.py b/fastdeploy/scheduler/local_scheduler.py index 651efc0cee0..f36f4f10946 100644 --- a/fastdeploy/scheduler/local_scheduler.py +++ b/fastdeploy/scheduler/local_scheduler.py @@ -300,7 +300,9 @@ def get_requests( scheduler_logger.debug(f"Scheduler has put all just-pulled request into the queue: {len(batch_ids)}") if len(requests) > 0: - scheduler_logger.debug("Scheduler has pulled some request: %s", [request.request_id for request in requests]) + scheduler_logger.debug( + "Scheduler has pulled some request: %s", [request.request_id for request in requests] + ) return requests diff --git a/fastdeploy/scheduler/splitwise_scheduler.py b/fastdeploy/scheduler/splitwise_scheduler.py index cd5e0736436..a66ae8db461 100644 --- a/fastdeploy/scheduler/splitwise_scheduler.py +++ b/fastdeploy/scheduler/splitwise_scheduler.py @@ -119,7 +119,7 @@ def start(self, role, host, disaggregated): """ Start APIScheduler and InferScheduler backup threads """ - logger.info(f"Scheduler Start With: role:{role}, host:{host}, disaggregated:{disaggregated}") + logger.info("Scheduler Start With: role:%s, host:%s, disaggregated:%s", role, host, disaggregated) self.infer.start(role, host, disaggregated) self.scheduler.start() @@ -611,7 +611,7 @@ def select(req, nodes, blur_step): break blur_idx = idx node = random.choice(nodes[: blur_idx + 1]) - logger.info(f"Schedule Req {req.request_id}(len:{req.prompt_token_ids_len}) to {node}") + logger.debug("Schedule Req %s(len:%s) to %s", req.request_id, req.prompt_token_ids_len, node) return node if role == "prefill" or role == "mixed": @@ -727,7 +727,7 @@ def check_redis_version(self): version_parts[1] >= 2 if version_parts[0] == 6 else True ), f"Redis version {redis_version} too low. Please upgrade to Redis 6.2+ to support batch RPOP operations." - logger.info(f"Redis version {redis_version} detected. Using native batch RPOP.") + logger.info("Redis version %s detected. Using native batch RPOP.", redis_version) def start(self, role, host, disaggregated): """ @@ -805,7 +805,7 @@ def select_writer(req): req = pickle.loads(req_str) group = req.get("group", "") writer_idx = select_writer(req) - logger.info(f"Infer Scheduler Get Req: {req.request_id} writer idx {writer_idx}") + logger.debug("Infer Scheduler Get Req: %s writer idx %s", req.request_id, writer_idx) req.request_id = f"{req.request_id}#{writer_idx}#{group}" if self.role == "prefill" or self.role == "mixed": self.reqs_queue.append(req) @@ -884,7 +884,7 @@ def put_results(self, results): for result in results: if result.error_code != 200 or result.finished: self.node.finish_req(result.request_id) - logger.info(f"{result.request_id} finished, node load is {self.node.load}") + logger.debug("%s finished, node load is %s", result.request_id, self.node.load) req_ids.add(result.request_id)