Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 21 additions & 35 deletions infscale/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ async def _init_controller_session(self) -> bool:
# create a task to send heart beat periodically
_ = asyncio.create_task(self.heart_beat())

# create a task to send status in an event-driven fashion
_ = asyncio.create_task(self.report())

# create a task to wait for controller commands
_ = asyncio.create_task(self.fetch_command())

Expand Down Expand Up @@ -312,6 +309,27 @@ def _handle_command(self, action: pb2.Action) -> None:
action.job_id, msg_type=MessageType.FORCE_TERMINATE
)

case CommandAction.REQUEST_RESOURCES:
gpu_stats, vram_stats = self.gpu_monitor.get_metrics()
cpu_stats, dram_stats = self.cpu_monitor.get_metrics()

gpu_msg_list = GpuMonitor.stats_to_proto(gpu_stats)
vram_msg_list = GpuMonitor.stats_to_proto(vram_stats)
cpu_stats_msg = CpuMonitor.stats_to_proto(cpu_stats)
dram_stats_msg = CpuMonitor.stats_to_proto(dram_stats)

status_msg = pb2.ResourceStats(
id=self.id,
gpu_stats=gpu_msg_list,
vram_stats=vram_msg_list,
cpu_stats=cpu_stats_msg,
dram_stats=dram_stats_msg,
)
try:
self.stub.update_resources(status_msg)
except Exception as e:
logger.error(f"Failed to update resources {e}")

case CommandAction.SETUP:
port_count = int.from_bytes(action.manifest, byteorder="big")
ports = self._reserve_ports(port_count)
Expand Down Expand Up @@ -415,42 +433,10 @@ def _terminate_workers(self, config: JobConfig) -> None:
)
self.worker_mgr._signal_terminate_wrkrs(job_id, True, stop_wrkrs, msg_type)

async def report(self):
    """Forward resource statistics to the controller.

    Each iteration awaits the GPU monitor's and then the CPU monitor's
    ``metrics()`` coroutine, converts the four results with the monitors'
    ``stats_to_proto`` helpers and hands one ``pb2.ResourceStats`` message
    to ``self.stub.update_resources``.
    """
    while True:
        gpu_stats, vram_stats = await self.gpu_monitor.metrics()
        cpu_stats, dram_stats = await self.cpu_monitor.metrics()

        status_msg = pb2.ResourceStats(
            id=self.id,
            gpu_stats=GpuMonitor.stats_to_proto(gpu_stats),
            vram_stats=GpuMonitor.stats_to_proto(vram_stats),
            cpu_stats=CpuMonitor.stats_to_proto(cpu_stats),
            dram_stats=CpuMonitor.stats_to_proto(dram_stats),
        )

        try:
            self.stub.update_resources(status_msg)
        except Exception as e:
            logger.error(f"Failed to update resources {e}")
            # NOTE(review): indentation was lost in the reviewed text; the
            # loop exit is assumed to belong to this failure branch - confirm.
            return

def monitor(self):
    """Monitor workers and resources.

    Schedules the status, metrics, GPU and CPU monitoring coroutines as
    asyncio tasks. ``asyncio.create_task`` needs a running event loop, so
    this must be called from within one.

    The tasks are kept in ``self._monitor_tasks``. The event loop only holds
    weak references to tasks, so a task whose handle is thrown away may be
    garbage-collected before it finishes.
    """
    self._monitor_tasks = [
        asyncio.create_task(self._monitor_status()),
        asyncio.create_task(self._monitor_metrics()),
        asyncio.create_task(self._monitor_gpu()),
        asyncio.create_task(self._monitor_cpu()),
    ]

async def _monitor_gpu(self):
await self.gpu_monitor.start()

async def _monitor_cpu(self):
await self.cpu_monitor.start()

async def run(self):
"""Start the agent."""
Expand Down
30 changes: 30 additions & 0 deletions infscale/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from infscale.monitor.cpu import CpuMonitor
from infscale.monitor.gpu import GpuMonitor
from infscale.proto import management_pb2 as pb2, management_pb2_grpc as pb2_grpc
from infscale.utils.timer import Timer


RESOURCES_TIMEOUT = 3 # time to wait for agent resources


logger = None
Expand Down Expand Up @@ -74,6 +78,7 @@ def __init__(self, config: CtrlConfig):
self.autoscaler = AutoScaler(self)

self.planner = Planner(config.job_plans, config.autoscale)
self.resources_timer: Timer = None

async def _start_server(self):
server_options = [
Expand Down Expand Up @@ -130,6 +135,18 @@ async def handle_resources(self, request: pb2.ResourceStats) -> None:
gpu_stats, vram_stats, cpu_stats, dram_stats
)

if not self.resources_timer:
self.resources_timer = Timer(
RESOURCES_TIMEOUT, self._notify_ctxts_about_resources
)

self.resources_timer.renew()

def _notify_ctxts_about_resources(self) -> None:
"""Notify job contexts about agents resources."""
for ctx in self.job_contexts.values():
ctx.on_res_ready()

async def handle_wrk_status(self, req: pb2.WorkerStatus) -> None:
"""Handle worker status."""
job_id, status, wrk_id = (
Expand Down Expand Up @@ -222,6 +239,19 @@ async def job_setup(self, job_id: str, agent_data: AgentMetaData) -> None:
context = agent_context.get_grpc_ctx()
await context.write(payload)

async def request_agents_res(self) -> None:
    """Ask every known agent to report its resources.

    A ``REQUEST_RESOURCES`` action is written to the gRPC context of each
    entry in ``self.agent_contexts``; all writes are awaited together.
    """
    pending_writes = [
        agent_ctx.get_grpc_ctx().write(
            pb2.Action(type=CommandAction.REQUEST_RESOURCES)
        )
        for agent_ctx in self.agent_contexts.values()
    ]

    await asyncio.gather(*pending_writes)

async def send_config_to_agent(
self, agent_id: str, cfg: JobConfig, action: CommandActionModel
) -> None:
Expand Down
1 change: 1 addition & 0 deletions infscale/controller/ctrl_dtype.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class CommandAction(str, Enum):
STOP = "stop" # CLI - Controller stop command, ctrl<->agent on worker failure
UPDATE = "update" # CLI - Controller update command
SETUP = "setup" # ctrl<->agent setup job, assign port numbers to workers
REQUEST_RESOURCES = "request_resources" # ctrl<->agent request resources
FINISH_JOB = "finish_job" # ctrl<->agent action to notify job's completion
CHECK_LOOP = (
"check_loop" # ctrl<->agent action for workers to check their pipeline loops
Expand Down
21 changes: 21 additions & 0 deletions infscale/controller/job_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,8 @@ def __init__(self, ctrl: Controller, job_id: str):

# event to update the config after all agents added ports and ip address
self.agents_setup_event = asyncio.Event()
# event to proceed with job deployment once resources are ready
self.res_event = asyncio.Event()
# list of agent ids that will deploy workers
self.running_agent_info: dict[str, AgentMetaData] = {}
self.past_running_agent_info: dict[str, AgentMetaData] = {}
Expand Down Expand Up @@ -1374,13 +1376,23 @@ async def cond_recovery(self):
"""Handle the transition to recovery."""
await self.state.cond_recovery()

def on_res_ready(self) -> None:
    """Signal that agent resources are ready by setting ``self.res_event``."""
    ready_event = self.res_event
    ready_event.set()

async def __update(self):
"""Transition to UPDATING state."""
# DO NOT call this method in job_context instance or any other places.
# Call it only in methods of a state instance
# (e.g., RunningState, RecoveryState, etc).
self._manage_agent_metadata()

# get latest agents resources
await self.ctrl.request_agents_res()

# wait to gather resources from all agents
await self.res_event.wait()

try:
self.process_cfg()
except InvalidConfig as e:
Expand All @@ -1403,6 +1415,8 @@ async def __update(self):
# update server ids
self.update_server_ids()

self.res_event.clear()

async def __stop(self):
"""Transition to STOPPING state."""
# DO NOT call this method in job_context instance or any other places.
Expand Down Expand Up @@ -1449,6 +1463,11 @@ async def __start(self):
# (e.g., ReadyState, CompleteState, etc).
self._manage_agent_metadata()

await self.ctrl.request_agents_res()

# wait to gather resources from all agents
await self.res_event.wait()

self._check_agent_info()

self.process_cfg()
Expand All @@ -1468,3 +1487,5 @@ async def __start(self):

# update server ids
self.update_server_ids()

self.res_event.clear()
32 changes: 0 additions & 32 deletions infscale/monitor/cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
from infscale.proto import management_pb2 as pb2


DEFAULT_INTERVAL = 10 # 10 seconds


@dataclass
class CPUStats:
"""CPU statistics."""
Expand Down Expand Up @@ -59,14 +56,6 @@ def __post_init__(self):
class CpuMonitor:
"""CpuMonitor class."""

def __init__(self, interval: int = DEFAULT_INTERVAL):
    """Store the sampling interval and prepare empty monitoring state."""
    # most recent samples; None until a collection stores them
    self.cpu_stats = None
    self.dram_stats = None

    # event used to signal that the stored samples were refreshed
    self.mon_event = asyncio.Event()

    # delay passed to asyncio.sleep between two collections
    self.interval = interval

def get_metrics(self) -> tuple[CPUStats, DRAMStats]:
"""Start to monitor CPU statistics."""
# total number of CPUs (logical)
Expand All @@ -87,27 +76,6 @@ def get_metrics(self) -> tuple[CPUStats, DRAMStats]:

return cpu_stats, dram_stats

async def metrics(self) -> tuple[CPUStats, DRAMStats]:
    """Wait for refreshed data, then return the stored CPU and DRAM stats."""
    refreshed = self.mon_event

    # suspend until the monitoring loop signals a refresh
    await refreshed.wait()
    # re-arm the event so that the next call waits for a new refresh
    refreshed.clear()

    return self.cpu_stats, self.dram_stats

async def start(self):
    """Collect CPU and DRAM statistics in an endless loop.

    Every pass stores the result of ``get_metrics()`` on the instance, sets
    ``mon_event`` and then sleeps for ``self.interval``.
    """
    while True:
        self.cpu_stats, self.dram_stats = self.get_metrics()

        # unblock metrics() call
        self.mon_event.set()

        await asyncio.sleep(self.interval)

@staticmethod
def stats_to_proto(
stats: Union[CPUStats, DRAMStats],
Expand Down
42 changes: 1 addition & 41 deletions infscale/monitor/gpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
from infscale.proto import management_pb2 as pb2


DEFAULT_INTERVAL = 10 # 10 seconds

logger = None


Expand Down Expand Up @@ -85,7 +83,7 @@ def __post_init__(self):
class GpuMonitor:
"""GpuMonitor class."""

def __init__(self, interval: int = DEFAULT_INTERVAL):
def __init__(self):
"""Initialize GpuMonitor instance."""
global logger
logger = get_logger()
Expand All @@ -99,12 +97,6 @@ def __init__(self, interval: int = DEFAULT_INTERVAL):
logger.warning("Failed to initialize NVML. No GPU available.")
logger.debug(f"Exception: {e}")

self.interval = interval

self.mon_event = asyncio.Event()
self.computes = list()
self.mems = list()

def get_metrics(self) -> tuple[list[GpuStat], list[VramStat]]:
"""Return gpu and vram statistics."""
if not self.gpu_available:
Expand All @@ -115,38 +107,6 @@ def get_metrics(self) -> tuple[list[GpuStat], list[VramStat]]:

return computes, mems

async def metrics(self) -> tuple[list[GpuStat], list[VramStat]]:
    """Wait for refreshed data, then return the stored GPU and VRAM stats."""
    refreshed = self.mon_event

    logger.debug("wait for monitor event")
    # suspend until the monitoring loop signals a refresh
    await refreshed.wait()
    logger.debug("monitor event is set")

    # re-arm the event so that the next call waits for a new refresh
    refreshed.clear()

    return self.computes, self.mems

async def start(self):
    """Collect GPU utilization and VRAM usage in an endless loop.

    Utilization is reported for the device as a whole; it is not a
    per-application metric. VRAM usage is likewise an aggregated figure.

    Returns immediately, after logging, when no GPU is available.
    """
    if not self.gpu_available:
        logger.info("no GPU available, skipping metrics collection.")
        return

    while True:
        self.computes, self.mems = self._get_gpu_stats()

        # unblock metrics() call
        self.mon_event.set()

        await asyncio.sleep(self.interval)

def _get_gpu_stats(self) -> tuple[list[GpuStat], list[VramStat]]:
"""Return GPU and VRam resources."""
count = nvmlDeviceGetCount()
Expand Down