diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index 4c56e9bcd76..278d6e576ab 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -598,6 +598,10 @@ def check_health(self, time_interval_threashold=30): async def run_control_method(self, request: ControlRequest): api_server_logger.info(f"Received control request: {request}") + request_id = request.request_id + dealer, response_queue = await self.connection_manager.get_connection(request_id) + if not envs.ZMQ_SEND_BATCH_DATA: + dealer.write([b"", request_id.encode("utf-8")]) req_dict = request.to_dict() if envs.ZMQ_SEND_BATCH_DATA: req_dict["zmq_worker_pid"] = self.worker_pid @@ -605,10 +609,6 @@ async def run_control_method(self, request: ControlRequest): self.zmq_client.send_json(req_dict) else: self.zmq_client.send_pyobj(req_dict) - request_id = request.request_id - dealer, response_queue = await self.connection_manager.get_connection(request_id) - if not envs.ZMQ_SEND_BATCH_DATA: - dealer.write([b"", request_id.encode("utf-8")]) try: # todo: support user specified timeout. default 600s is enough for most control cases response = await asyncio.wait_for(response_queue.get(), timeout=600)