Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fastdeploy/splitwise/internal_adapter_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ def _recv_external_module_control_instruct(self):
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

elif task["cmd"] == "interrupt_requests":
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 其他命令分支(如 get_payloadcheck_health)在响应前都有 logger.debug(f"Response for task: {task_id_str}"),建议此处也添加以保持一致性,方便排查问题。

elif task["cmd"] == "interrupt_requests":
    self.engine.resource_manager.add_abort_req_ids(task["req_ids"])
    result = {
        "task_id": task_id_str,
        "result": {"success": True, "interrupted_req_ids": task["req_ids"]},
    }
    logger.debug(f"Response for task: {task_id_str}")
    with self.response_lock:
        self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

self.engine.resource_manager.add_abort_req_ids(task["req_ids"])
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 缺少对 req_ids 字段的防御性校验。

如果传入的 task 缺少 req_ids 键或其值为空列表,当前代码会抛出 KeyError 或执行一次无意义的空调用。虽然外层 except Exception 会捕获异常不会导致线程崩溃,但调用方不会收到任何响应,可能导致请求超时。

建议添加校验:

elif task["cmd"] == "interrupt_requests":
    req_ids = task.get("req_ids", [])
    if not req_ids:
        result = {
            "task_id": task_id_str,
            "result": {"success": False, "message": "req_ids is empty or missing"},
        }
    else:
        self.engine.resource_manager.add_abort_req_ids(req_ids)
        result = {
            "task_id": task_id_str,
            "result": {"success": True, "interrupted_req_ids": req_ids},
        }
    with self.response_lock:
        self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

result = {
"task_id": task_id_str,
"result": {"success": True, "interrupted_req_ids": task["req_ids"]},
}
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

except Exception as e:
logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")

Expand Down
Loading