From 772cdc78e1162648b124e22a832d45c5d8985e4b Mon Sep 17 00:00:00 2001 From: Rares Gaia Date: Thu, 11 Dec 2025 12:50:28 +0200 Subject: [PATCH] fix: recovery after agent kill and rejoin In the case when an agent that deployed workers is destroyed, the current code implementation would skip the agent that previously deployed failed workers if it wasn't found during the first iteration of recovery, continuing with remaining agents. We had to refactor the code to remove this limitation. So when an agent fails and is restarted, it will be included in the recovery process. Added changes for when an agent joins controller, to also update all existing job contexts with the details about this agent. So whenever an agent joins, all job contexts will be able to use its resources. --- infscale/controller/job_context.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/infscale/controller/job_context.py b/infscale/controller/job_context.py index cfa8a46..0e9a82b 100644 --- a/infscale/controller/job_context.py +++ b/infscale/controller/job_context.py @@ -428,6 +428,7 @@ async def _assign_resources_for_recovery( wrk_resources_map = {} while True: + self.context.manage_agent_metadata() wrk_resources_map = self._get_wrk_resources_map(failed_wrk_ids) if len(wrk_resources_map) == len(failed_wrk_ids): @@ -608,10 +609,13 @@ def _search_gpu_on_all_agents( if agent_id == curr_agent_id: continue - return self._assign_available_gpu_to_worker( + assign_success = self._assign_available_gpu_to_worker( agent_id, resources, wrk_id, wrk_agent_map, agent_gpu_map ) + if assign_success: + return True + return False def enum_(self) -> JobStateEnum: @@ -1207,7 +1211,7 @@ def _get_state_class(self, state_enum: JobStateEnum): } return state_mapping[state_enum] - def _manage_agent_metadata(self) -> None: + def manage_agent_metadata(self) -> None: """Manage agent metadata by create/update/delete.""" agent_contexts = self.ctrl.agent_contexts @@ -1380,7 +1384,7 @@ async def __update(self): # DO NOT call this method in job_context instance or any other places. # Call it only in methods of a state instance # (e.g., RunningState, RecoveryState, etc). - self._manage_agent_metadata() + self.manage_agent_metadata() try: self.process_cfg() @@ -1448,7 +1452,7 @@ async def __start(self): # DO NOT call this method in job_context instance or any other places. # Call it only in methods of a state instance # (e.g., ReadyState, CompleteState, etc). - self._manage_agent_metadata() + self.manage_agent_metadata() self._check_agent_info()