Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,17 +598,17 @@ def check_health(self, time_interval_threashold=30):

async def run_control_method(self, request: ControlRequest):
api_server_logger.info(f"Received control request: {request}")
request_id = request.request_id
dealer, response_queue = await self.connection_manager.get_connection(request_id)
if not envs.ZMQ_SEND_BATCH_DATA:
dealer.write([b"", request_id.encode("utf-8")])
req_dict = request.to_dict()
if envs.ZMQ_SEND_BATCH_DATA:
req_dict["zmq_worker_pid"] = self.worker_pid
if not self.enable_mm:
self.zmq_client.send_json(req_dict)
else:
self.zmq_client.send_pyobj(req_dict)
request_id = request.request_id
dealer, response_queue = await self.connection_manager.get_connection(request_id)
if not envs.ZMQ_SEND_BATCH_DATA:
dealer.write([b"", request_id.encode("utf-8")])
try:
# todo: support user specified timeout. default 600s is enough for most control cases
response = await asyncio.wait_for(response_queue.get(), timeout=600)
Expand Down
Loading