diff --git a/infscale/agent/agent.py b/infscale/agent/agent.py index 7f55ea03..955f3107 100644 --- a/infscale/agent/agent.py +++ b/infscale/agent/agent.py @@ -219,9 +219,6 @@ async def _init_controller_session(self) -> bool: # create a task to send heart beat periodically _ = asyncio.create_task(self.heart_beat()) - # create a task to send status in an event-driven fashion - _ = asyncio.create_task(self.report()) - # create a task to wait for controller commands _ = asyncio.create_task(self.fetch_command()) @@ -312,6 +309,27 @@ def _handle_command(self, action: pb2.Action) -> None: action.job_id, msg_type=MessageType.FORCE_TERMINATE ) + case CommandAction.REQUEST_RESOURCES: + gpu_stats, vram_stats = self.gpu_monitor.get_metrics() + cpu_stats, dram_stats = self.cpu_monitor.get_metrics() + + gpu_msg_list = GpuMonitor.stats_to_proto(gpu_stats) + vram_msg_list = GpuMonitor.stats_to_proto(vram_stats) + cpu_stats_msg = CpuMonitor.stats_to_proto(cpu_stats) + dram_stats_msg = CpuMonitor.stats_to_proto(dram_stats) + + status_msg = pb2.ResourceStats( + id=self.id, + gpu_stats=gpu_msg_list, + vram_stats=vram_msg_list, + cpu_stats=cpu_stats_msg, + dram_stats=dram_stats_msg, + ) + try: + self.stub.update_resources(status_msg) + except Exception as e: + logger.error(f"Failed to update resources {e}") + case CommandAction.SETUP: port_count = int.from_bytes(action.manifest, byteorder="big") ports = self._reserve_ports(port_count) @@ -415,42 +433,10 @@ def _terminate_workers(self, config: JobConfig) -> None: ) self.worker_mgr._signal_terminate_wrkrs(job_id, True, stop_wrkrs, msg_type) - async def report(self): - """Report resource stats to controller.""" - while True: - gpu_stats, vram_stats = await self.gpu_monitor.metrics() - cpu_stats, dram_stats = await self.cpu_monitor.metrics() - - gpu_msg_list = GpuMonitor.stats_to_proto(gpu_stats) - vram_msg_list = GpuMonitor.stats_to_proto(vram_stats) - cpu_stats_msg = CpuMonitor.stats_to_proto(cpu_stats) - dram_stats_msg = CpuMonitor.stats_to_proto(dram_stats) - - status_msg = pb2.ResourceStats( - id=self.id, - gpu_stats=gpu_msg_list, - vram_stats=vram_msg_list, - cpu_stats=cpu_stats_msg, - dram_stats=dram_stats_msg, - ) - try: - self.stub.update_resources(status_msg) - except Exception as e: - logger.error(f"Failed to update resources {e}") - break - def monitor(self): """Monitor workers and resources.""" _ = asyncio.create_task(self._monitor_status()) _ = asyncio.create_task(self._monitor_metrics()) - _ = asyncio.create_task(self._monitor_gpu()) - _ = asyncio.create_task(self._monitor_cpu()) - - async def _monitor_gpu(self): - await self.gpu_monitor.start() - - async def _monitor_cpu(self): - await self.cpu_monitor.start() async def run(self): """Start the agent.""" diff --git a/infscale/controller/controller.py b/infscale/controller/controller.py index a7215e1f..fc754468 100644 --- a/infscale/controller/controller.py +++ b/infscale/controller/controller.py @@ -42,6 +42,10 @@ from infscale.monitor.cpu import CpuMonitor from infscale.monitor.gpu import GpuMonitor from infscale.proto import management_pb2 as pb2, management_pb2_grpc as pb2_grpc +from infscale.utils.timer import Timer + + +RESOURCES_TIMEOUT = 3 # time to wait for agent resources logger = None @@ -74,6 +78,7 @@ def __init__(self, config: CtrlConfig): self.autoscaler = AutoScaler(self) self.planner = Planner(config.job_plans, config.autoscale) + self.resources_timer: Timer = None async def _start_server(self): server_options = [ @@ -130,6 +135,18 @@ async def handle_resources(self, request: pb2.ResourceStats) -> None: gpu_stats, vram_stats, cpu_stats, dram_stats ) + if not self.resources_timer: + self.resources_timer = Timer( + RESOURCES_TIMEOUT, self._notify_ctxts_about_resources + ) + + self.resources_timer.renew() + + def _notify_ctxts_about_resources(self) -> None: + """Notify job contexts about agents resources.""" + for ctx in self.job_contexts.values(): + ctx.on_res_ready() + async def handle_wrk_status(self, req: pb2.WorkerStatus) -> None: """Handle worker status.""" job_id, status, wrk_id = ( @@ -222,6 +239,19 @@ async def job_setup(self, job_id: str, agent_data: AgentMetaData) -> None: context = agent_context.get_grpc_ctx() await context.write(payload) + async def request_agents_res(self) -> None: + """Send a request to agents to fetch resources.""" + tasks = [] + for agent_id in self.agent_contexts.keys(): + agent_context = self.agent_contexts[agent_id] + context = agent_context.get_grpc_ctx() + + payload = pb2.Action(type=CommandAction.REQUEST_RESOURCES) + task = context.write(payload) + tasks.append(task) + + await asyncio.gather(*tasks) + async def send_config_to_agent( self, agent_id: str, cfg: JobConfig, action: CommandActionModel ) -> None: diff --git a/infscale/controller/ctrl_dtype.py b/infscale/controller/ctrl_dtype.py index 6ec02208..06584d13 100644 --- a/infscale/controller/ctrl_dtype.py +++ b/infscale/controller/ctrl_dtype.py @@ -38,6 +38,7 @@ class CommandAction(str, Enum): STOP = "stop" # CLI - Controller stop command, ctrl<->agent on worker failure UPDATE = "update" # CLI - Controller update command SETUP = "setup" # ctrl<->agent setup job, assign port numbers to workers + REQUEST_RESOURCES = "request_resources" # ctrl<->agent request resources FINISH_JOB = "finish_job" # ctrl<->agent action to notify job's completion CHECK_LOOP = ( "check_loop" # ctrl<->agent action for workers to check their pipeline loops diff --git a/infscale/controller/job_context.py b/infscale/controller/job_context.py index 7db1292c..22bfb207 100644 --- a/infscale/controller/job_context.py +++ b/infscale/controller/job_context.py @@ -669,6 +669,8 @@ def __init__(self, ctrl: Controller, job_id: str): # event to update the config after all agents added ports and ip address self.agents_setup_event = asyncio.Event() + # event to proceed with job deployment once resources are ready + self.res_event = asyncio.Event() # list of agent ids that will deploy workers self.running_agent_info: dict[str, AgentMetaData] = {} self.past_running_agent_info: dict[str, AgentMetaData] = {} @@ -1374,6 +1376,10 @@ async def cond_recovery(self): """Handle the transition to recovery.""" await self.state.cond_recovery() + def on_res_ready(self) -> None: + """Set resources event when resources are ready.""" + self.res_event.set() + async def __update(self): """Transition to UPDATING state.""" # DO NOT call this method in job_context instance or any other places. @@ -1381,6 +1387,12 @@ async def __update(self): # (e.g., RunningState, RecoveryState, etc). self._manage_agent_metadata() + # get latest agents resources + await self.ctrl.request_agents_res() + + # wait to gather resources from all agents + await self.res_event.wait() + try: self.process_cfg() except InvalidConfig as e: @@ -1403,6 +1415,8 @@ async def __update(self): # update server ids self.update_server_ids() + self.res_event.clear() + async def __stop(self): """Transition to STOPPING state.""" # DO NOT call this method in job_context instance or any other places. @@ -1449,6 +1463,11 @@ async def __start(self): # (e.g., ReadyState, CompleteState, etc). self._manage_agent_metadata() + await self.ctrl.request_agents_res() + + # wait to gather resources from all agents + await self.res_event.wait() + self._check_agent_info() self.process_cfg() @@ -1468,3 +1487,5 @@ async def __start(self): # update server ids self.update_server_ids() + + self.res_event.clear() diff --git a/infscale/monitor/cpu.py b/infscale/monitor/cpu.py index 58f988c2..b18c16d6 100644 --- a/infscale/monitor/cpu.py +++ b/infscale/monitor/cpu.py @@ -26,9 +26,6 @@ from infscale.proto import management_pb2 as pb2 -DEFAULT_INTERVAL = 10 # 10 seconds - - @dataclass class CPUStats: """CPU statistics.""" @@ -59,14 +56,6 @@ def __post_init__(self): class CpuMonitor: """CpuMonitor class.""" - def __init__(self, interval: int = DEFAULT_INTERVAL): - """Initialize CpuMonitor instance.""" - self.interval = interval - - self.mon_event = asyncio.Event() - self.cpu_stats = None - self.dram_stats = None - def get_metrics(self) -> tuple[CPUStats, DRAMStats]: """Start to monitor CPU statistics.""" # total number of CPUs (logical) @@ -87,27 +76,6 @@ def get_metrics(self) -> tuple[CPUStats, DRAMStats]: return cpu_stats, dram_stats - async def metrics(self) -> tuple[CPUStats, DRAMStats]: - """Return statistics on CPU and DRAM resources.""" - # Wait until data refreshes - await self.mon_event.wait() - # block metrics() call again - self.mon_event.clear() - - return self.cpu_stats, self.dram_stats - - async def start(self): - """Start to monitor CPU statistics.""" - while True: - cpu_stats, dram_stats = self.get_metrics() - - self.cpu_stats = cpu_stats - self.dram_stats = dram_stats - # unlbock metrics() call - self.mon_event.set() - - await asyncio.sleep(self.interval) - @staticmethod def stats_to_proto( stats: Union[CPUStats, DRAMStats], diff --git a/infscale/monitor/gpu.py b/infscale/monitor/gpu.py index 9fc7e634..569e6fec 100644 --- a/infscale/monitor/gpu.py +++ b/infscale/monitor/gpu.py @@ -37,8 +37,6 @@ from infscale.proto import management_pb2 as pb2 -DEFAULT_INTERVAL = 10 # 10 seconds - logger = None @@ -85,7 +83,7 @@ def __post_init__(self): class GpuMonitor: """GpuMonitor class.""" - def __init__(self, interval: int = DEFAULT_INTERVAL): + def __init__(self): """Initialize GpuMonitor instance.""" global logger logger = get_logger() @@ -99,12 +97,6 @@ def __init__(self, interval: int = DEFAULT_INTERVAL): logger.warning("Failed to initialize NVML. No GPU available.") logger.debug(f"Exception: {e}") - self.interval = interval - - self.mon_event = asyncio.Event() - self.computes = list() - self.mems = list() - def get_metrics(self) -> tuple[list[GpuStat], list[VramStat]]: """Return gpu and vram statistics.""" if not self.gpu_available: @@ -115,38 +107,6 @@ def get_metrics(self) -> tuple[list[GpuStat], list[VramStat]]: return computes, mems - async def metrics(self) -> tuple[list[GpuStat], list[VramStat]]: - """Return statistics on GPU resources.""" - # Wait until data refreshes - logger.debug("wait for monitor event") - await self.mon_event.wait() - logger.debug("monitor event is set") - # block metrics() call again - self.mon_event.clear() - - return self.computes, self.mems - - async def start(self): - """Start to monitor GPU statistics - utilization and vram usage. - - utilization reports device's utilization. - it's not a per-application metric. - vram usage is also an aggregated metric, not a per-application metric. - """ - if not self.gpu_available: - logger.info("no GPU available, skipping metrics collection.") - return - - while True: - computes, mems = self._get_gpu_stats() - - self.mems = mems - self.computes = computes - # unlbock metrics() call - self.mon_event.set() - - await asyncio.sleep(self.interval) - def _get_gpu_stats(self) -> tuple[list[GpuStat], list[VramStat]]: """Return GPU and VRam resources.""" count = nvmlDeviceGetCount()