From f63e2476233961f04885f76c4fe4653ed5d01507 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:17:44 -0500 Subject: [PATCH 1/7] Aorta benchmark yaml configuration --- input/aorta_benchmark.yaml | 66 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 input/aorta_benchmark.yaml diff --git a/input/aorta_benchmark.yaml b/input/aorta_benchmark.yaml new file mode 100644 index 0000000..7053dec --- /dev/null +++ b/input/aorta_benchmark.yaml @@ -0,0 +1,66 @@ +# Aorta Benchmark Configuration +# +# This configuration controls the CVS benchmark runner for Aorta distributed training. +# The runner will bind-mount the aorta repository and apply these overrides. + +# Aorta installation path (bind-mounted into container) +aorta_path: /home/AMD/speriasw/projects/aorta + +# Container mount point +container_mount_path: /mnt + +# Aorta's base config file (relative to aorta_path) +base_config: config/distributed.yaml + +# Docker container settings +docker: + image: jeffdaily/pytorch:torchrec-dlrm-complete + container_name: aorta-benchmark + shm_size: 17G + network_mode: host + privileged: true + +# RCCL build configuration +rccl: + clone_url: https://github.com/ROCmSoftwarePlatform/rccl.git + branch: develop + build_path: /mnt/rccl + +# NCCL/RCCL environment variables +environment: + NCCL_MAX_NCHANNELS: 112 + NCCL_MAX_P2P_NCHANNELS: 112 + # TENSILE_STREAMK_MAX_CUS is computed as 256 - NCCL_MAX_NCHANNELS + NCCL_DEBUG: VERSION + TORCH_NCCL_HIGH_PRIORITY: 1 + OMP_NUM_THREADS: 1 + RCCL_MSCCL_ENABLE: 0 + +# Aorta training config overrides (passed via --override) +# These override values in config/distributed.yaml +training_overrides: + training.max_steps: 100 + profiling.active: 10 + # training.output_dir is set dynamically by the runner + +# Scripts to execute (relative to container_mount_path) +build_script: scripts/build_rccl.sh +experiment_script: scripts/rccl_exp.sh + +# Hardware configuration +gpus_per_node: 8 + +# Execution settings +timeout_seconds: 10800 # 3 hours +skip_rccl_build: false # Set to true if RCCL is already built + +# Expected results for validation +# NOTE: TraceLens reports TOTAL trace time for all profiled iterations (profiling.active=10) +# So max_avg_iteration_ms should be ~total_time, not per-iteration time +# Per-iteration would be: total_time / profiling.active = ~6200ms / 10 = ~620ms +expected_results: + max_avg_iteration_ms: 7000 # Total trace time threshold (~6200ms actual) + min_compute_ratio: 0.8 # Expect 80%+ compute utilization + min_overlap_ratio: 0.0 # Current metric uses exposed_comm, so overlap shows low + max_time_variance_ratio: 0.2 # Allow 20% variance between ranks + From 46270907796f4b544b60ef6a8f705629f8af2441 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:18:34 -0500 Subject: [PATCH 2/7] runner/parser architecture for clear seperation of concerns --- runners/__init__.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 runners/__init__.py diff --git a/runners/__init__.py b/runners/__init__.py new file mode 100644 index 0000000..b442f7f --- /dev/null +++ b/runners/__init__.py @@ -0,0 +1,23 @@ +""" +Runners module - Layer 1: Test Execution Wrappers. + +Runners are responsible for: +- Deploying test environments (containers, etc.) 
+- Executing benchmarks +- Collecting raw artifacts (logs, trace files) + +Runners should NOT: +- Parse results into structured data +- Validate performance thresholds +- Make pass/fail decisions +""" + +from runners._base_runner import BaseRunner, RunResult, RunConfig, RunStatus + +__all__ = [ + "BaseRunner", + "RunResult", + "RunConfig", + "RunStatus", +] + From 3449a5dbc40acb411ad16958dfbb9d939850ea13 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:26:05 -0500 Subject: [PATCH 3/7] Base runner implementation --- runners/_base_runner.py | 237 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) create mode 100644 runners/_base_runner.py diff --git a/runners/_base_runner.py b/runners/_base_runner.py new file mode 100644 index 0000000..29f5e2f --- /dev/null +++ b/runners/_base_runner.py @@ -0,0 +1,237 @@ +""" +Base runner interface and common data structures. + +Copyright 2025 Advanced Micro Devices, Inc. +All rights reserved. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional +import time +import logging + +log = logging.getLogger(__name__) + + +class RunStatus(Enum): + """Status of a benchmark run.""" + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + TIMEOUT = "timeout" + + +@dataclass +class RunConfig: + """ + Base configuration for all runners. + + Specific runners should extend this with their own config dataclasses. + """ + # Cluster configuration + nodes: List[str] + username: str + pkey: Optional[str] = None + password: Optional[str] = None + + # Execution settings + timeout_seconds: int = 3600 # 1 hour default + work_dir: Path = field(default_factory=lambda: Path("/tmp")) + output_dir: Path = field(default_factory=lambda: Path("/tmp/benchmark_output")) + + # Additional environment variables + env_vars: Dict[str, str] = field(default_factory=dict) + + # Metadata for tracking + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class RunResult: + """ + Result from a benchmark runner. + + Contains raw outputs and artifact paths - no parsed/validated data. + Parsing is the responsibility of the parsers layer. + """ + status: RunStatus + start_time: float + end_time: float + + # Raw outputs from execution + stdout: Dict[str, str] = field(default_factory=dict) # node -> stdout + stderr: Dict[str, str] = field(default_factory=dict) # node -> stderr + + # Paths to output artifacts + artifacts: Dict[str, Path] = field(default_factory=dict) # name -> path + + # Error information if failed + error_message: Optional[str] = None + exit_codes: Dict[str, int] = field(default_factory=dict) # node -> exit code + + # Metadata collected during run + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def duration_seconds(self) -> float: + """Total execution time in seconds.""" + return self.end_time - self.start_time + + @property + def succeeded(self) -> bool: + """Whether the run completed successfully.""" + return self.status == RunStatus.COMPLETED + + def get_artifact(self, name: str) -> Optional[Path]: + """Get artifact path by name, returns None if not found.""" + return self.artifacts.get(name) + + +class BaseRunner(ABC): + """ + Abstract base class for benchmark runners. + + Runners follow a lifecycle: + 1. setup() - Prepare environment (deploy containers, install deps) + 2. run() - Execute the benchmark + 3. 
teardown() - Cleanup resources + + The execute() method orchestrates this lifecycle with proper error handling. + """ + + def __init__(self, config: RunConfig): + """ + Initialize runner with configuration. + + Args: + config: Runner configuration + """ + self.config = config + self._setup_complete = False + + @property + def head_node(self) -> str: + """First node in the cluster, typically used for orchestration.""" + if not self.config.nodes: + raise ValueError("No nodes configured") + return self.config.nodes[0] + + @abstractmethod + def setup(self) -> bool: + """ + Prepare the environment before benchmark execution. + + This may include: + - Deploying containers + - Installing dependencies + - Copying files + - Setting up networking + + Returns: + True if setup succeeded, False otherwise + """ + pass + + @abstractmethod + def run(self, **kwargs) -> RunResult: + """ + Execute the benchmark. + + Args: + **kwargs: Benchmark-specific parameters + + Returns: + RunResult with raw outputs and artifact paths + """ + pass + + @abstractmethod + def teardown(self) -> bool: + """ + Cleanup resources after benchmark execution. + + This may include: + - Stopping containers + - Removing temporary files + - Releasing resources + + Returns: + True if teardown succeeded, False otherwise + """ + pass + + def execute(self, **kwargs) -> RunResult: + """ + Full execution lifecycle: setup -> run -> teardown. + + Handles errors at each stage and ensures teardown is always called. + + Args: + **kwargs: Passed to run() + + Returns: + RunResult from the run, or a failed result if setup fails + """ + start_time = time.time() + + try: + # Setup phase + log.info(f"Setting up {self.__class__.__name__}...") + if not self.setup(): + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + error_message="Setup failed" + ) + self._setup_complete = True + + # Run phase + log.info(f"Running {self.__class__.__name__}...") + result = self.run(**kwargs) + return result + + except Exception as e: + log.exception(f"Error during {self.__class__.__name__} execution") + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + error_message=str(e) + ) + + finally: + # Always attempt teardown + if self._setup_complete: + log.info(f"Tearing down {self.__class__.__name__}...") + try: + self.teardown() + except Exception as e: + log.warning(f"Teardown error (non-fatal): {e}") + + def validate_config(self) -> List[str]: + """ + Validate runner configuration. + + Override in subclasses to add specific validation. + + Returns: + List of validation error messages (empty if valid) + """ + errors = [] + + if not self.config.nodes: + errors.append("No nodes configured") + + if not self.config.username: + errors.append("No username configured") + + if not self.config.pkey and not self.config.password: + errors.append("No authentication method configured (pkey or password)") + + return errors + From 900866a38e53f084a0bf3e2bb9ac7956aa23c352 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:27:42 -0500 Subject: [PATCH 4/7] Aorta benchmark runner --- runners/aorta.py | 565 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 565 insertions(+) create mode 100644 runners/aorta.py diff --git a/runners/aorta.py b/runners/aorta.py new file mode 100644 index 0000000..98ad438 --- /dev/null +++ b/runners/aorta.py @@ -0,0 +1,565 @@ +""" +Aorta PyTorch benchmark runner. 
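To make the setup -> run -> teardown lifecycle above concrete, here is a minimal, hypothetical runner built on this interface; the echo command, host name, and artifact path are illustrative stand-ins, while BaseRunner, RunConfig, RunResult, and RunStatus are the classes defined in runners/_base_runner.py.

import subprocess
import time
from pathlib import Path

from runners._base_runner import BaseRunner, RunConfig, RunResult, RunStatus


class EchoRunner(BaseRunner):
    """Toy runner that 'benchmarks' a local echo command."""

    def setup(self) -> bool:
        return True  # nothing to deploy in this sketch

    def run(self, **kwargs) -> RunResult:
        start = time.time()
        proc = subprocess.run(["echo", "hello"], capture_output=True, text=True)
        return RunResult(
            status=RunStatus.COMPLETED if proc.returncode == 0 else RunStatus.FAILED,
            start_time=start,
            end_time=time.time(),
            stdout={self.head_node: proc.stdout},
            exit_codes={self.head_node: proc.returncode},
            artifacts={"log": Path("/tmp/echo.log")},  # placeholder artifact path
        )

    def teardown(self) -> bool:
        return True  # nothing to clean up


config = RunConfig(nodes=["node01"], username="amd", password="example")
result = EchoRunner(config).execute()  # setup -> run -> teardown, teardown always attempted
print(result.succeeded, f"{result.duration_seconds:.2f}s", result.get_artifact("log"))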
+ +Deploys Docker containers and executes distributed training benchmarks +using the Aorta framework. Uses Docker SDK over SSH for container orchestration. + +Copyright 2025 Advanced Micro Devices, Inc. +All rights reserved. +""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional +import time +import logging + +try: + import docker + from docker.models.containers import Container + DOCKER_SDK_AVAILABLE = True +except ImportError: + DOCKER_SDK_AVAILABLE = False + docker = None + Container = None + +from runners._base_runner import BaseRunner, RunConfig, RunResult, RunStatus + +log = logging.getLogger(__name__) + + +@dataclass +class RcclConfig: + """RCCL build and runtime configuration.""" + clone_url: str = "https://github.com/ROCmSoftwarePlatform/rccl.git" + branch: str = "develop" + build_path: str = "/mnt/rccl" + + +@dataclass +class AortaDockerConfig: + """Docker container configuration for Aorta.""" + image: str = "jeffdaily/pytorch:torchrec-dlrm-complete" + container_name: str = "aorta-benchmark" + shm_size: str = "17G" + network_mode: str = "host" + privileged: bool = True + + +@dataclass +class AortaEnvironment: + """Environment variables for RCCL/NCCL tuning.""" + NCCL_MAX_NCHANNELS: int = 112 + NCCL_MAX_P2P_NCHANNELS: int = 112 + NCCL_DEBUG: str = "VERSION" + TORCH_NCCL_HIGH_PRIORITY: int = 1 + OMP_NUM_THREADS: int = 1 + RCCL_MSCCL_ENABLE: int = 0 + + def to_dict(self) -> Dict[str, str]: + """Convert to environment dict with computed values.""" + nch = self.NCCL_MAX_NCHANNELS + return { + "NCCL_MAX_NCHANNELS": str(nch), + "NCCL_MAX_P2P_NCHANNELS": str(self.NCCL_MAX_P2P_NCHANNELS), + "TENSILE_STREAMK_MAX_CUS": str(256 - nch), + "NCCL_DEBUG": self.NCCL_DEBUG, + "TORCH_NCCL_HIGH_PRIORITY": str(self.TORCH_NCCL_HIGH_PRIORITY), + "OMP_NUM_THREADS": str(self.OMP_NUM_THREADS), + "RCCL_MSCCL_ENABLE": str(self.RCCL_MSCCL_ENABLE), + } + + +@dataclass +class AortaConfig(RunConfig): + """ + Configuration for Aorta benchmark runner. + + Extends base RunConfig with Aorta-specific settings. + """ + # Path to aorta repository on host (will be bind-mounted) + aorta_path: Path = field(default_factory=lambda: Path("/home/AMD/speriasw/projects/aorta")) + + # Mount point inside container + container_mount_path: str = "/mnt" + + # Aorta base config file (relative to aorta_path) + base_config: str = "config/distributed.yaml" + + # Training config overrides (passed via --override) + training_overrides: Dict[str, Any] = field(default_factory=dict) + + # Docker configuration + docker: AortaDockerConfig = field(default_factory=AortaDockerConfig) + + # RCCL configuration + rccl: RcclConfig = field(default_factory=RcclConfig) + + # Environment configuration + environment: AortaEnvironment = field(default_factory=AortaEnvironment) + + # Scripts to execute (relative to container mount) + build_script: str = "scripts/build_rccl.sh" + experiment_script: str = "scripts/rccl_exp.sh" + + # Number of GPUs per node + gpus_per_node: int = 8 + + # Whether to skip RCCL build (if already built) + skip_rccl_build: bool = False + + +class AortaRunner(BaseRunner): + """ + Runner for Aorta PyTorch distributed benchmarks. + + Uses Docker SDK over SSH to: + 1. Deploy container with GPU access + 2. Build RCCL from source (optional) + 3. Execute distributed training + 4. Collect profiling artifacts + """ + + def __init__(self, config: AortaConfig): + """ + Initialize Aorta runner. 
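Before the runner methods below, a small sketch of how these dataclasses compose and what the derived environment looks like with the defaults above; the host name, user, key path, and aorta checkout location are placeholders.

from pathlib import Path

from runners.aorta import AortaConfig, AortaDockerConfig, AortaEnvironment, RcclConfig

config = AortaConfig(
    nodes=["node01"],                      # placeholder host
    username="amd",                        # placeholder user
    pkey="~/.ssh/id_rsa",                  # placeholder key path
    aorta_path=Path("/opt/aorta"),         # placeholder checkout location
    docker=AortaDockerConfig(),            # defaults to jeffdaily/pytorch:torchrec-dlrm-complete
    rccl=RcclConfig(branch="develop"),
    environment=AortaEnvironment(NCCL_MAX_NCHANNELS=112),
)

env = config.environment.to_dict()
# to_dict() derives TENSILE_STREAMK_MAX_CUS = 256 - 112 = 144, matching the YAML comment.
assert env["TENSILE_STREAMK_MAX_CUS"] == "144"
print(env["NCCL_MAX_NCHANNELS"], env["TENSILE_STREAMK_MAX_CUS"])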
+ + Args: + config: Aorta benchmark configuration + """ + if not DOCKER_SDK_AVAILABLE: + raise ImportError( + "Docker SDK not available. Install with: pip install docker" + ) + + super().__init__(config) + self.config: AortaConfig = config # Type hint for IDE + + self._docker_clients: Dict[str, docker.DockerClient] = {} + self._containers: Dict[str, Container] = {} + + def validate_config(self) -> List[str]: + """Validate Aorta-specific configuration.""" + errors = super().validate_config() + + if not self.config.aorta_path.exists(): + errors.append(f"Aorta path does not exist: {self.config.aorta_path}") + + config_path = self.config.aorta_path / self.config.base_config + if not config_path.exists(): + errors.append(f"Base config does not exist: {config_path}") + + build_script = self.config.aorta_path / self.config.build_script + if not build_script.exists(): + errors.append(f"Build script does not exist: {build_script}") + + exp_script = self.config.aorta_path / self.config.experiment_script + if not exp_script.exists(): + errors.append(f"Experiment script does not exist: {exp_script}") + + return errors + + def _connect_docker(self, node: str) -> docker.DockerClient: + """ + Connect to Docker daemon on a node via SSH. + + Args: + node: Hostname or IP of the node + + Returns: + Docker client connected to the node + """ + if node in self._docker_clients: + return self._docker_clients[node] + + # Build SSH URL + ssh_url = f"ssh://{self.config.username}@{node}" + log.info(f"Connecting to Docker daemon at {ssh_url}") + + client = docker.DockerClient( + base_url=ssh_url, + use_ssh_client=True, + ) + + # Verify connection + client.ping() + log.info(f"Connected to Docker on {node}") + + self._docker_clients[node] = client + return client + + def _cleanup_existing_containers(self, client: docker.DockerClient, node: str): + """Remove any existing containers with our name.""" + container_name = self.config.docker.container_name + try: + existing = client.containers.get(container_name) + log.info(f"Removing existing container {container_name} on {node}") + existing.stop(timeout=10) + existing.remove(force=True) + except docker.errors.NotFound: + pass # Container doesn't exist, that's fine + except Exception as e: + log.warning(f"Error cleaning up container on {node}: {e}") + + def _launch_container(self, client: docker.DockerClient, node: str) -> Container: + """ + Launch Aorta container on a node. 
+ + Args: + client: Docker client for the node + node: Node hostname + + Returns: + Running container object + """ + cfg = self.config.docker + + # Build volume mounts + volumes = { + str(self.config.aorta_path): { + "bind": self.config.container_mount_path, + "mode": "rw" + } + } + + # Build device list for GPU access + devices = ["/dev/kfd", "/dev/dri"] + + log.info(f"Launching container {cfg.container_name} on {node}") + log.info(f" Image: {cfg.image}") + log.info(f" Mount: {self.config.aorta_path} -> {self.config.container_mount_path}") + + container = client.containers.run( + image=cfg.image, + name=cfg.container_name, + detach=True, + network_mode=cfg.network_mode, + ipc_mode="host", + privileged=cfg.privileged, + shm_size=cfg.shm_size, + volumes=volumes, + devices=devices, + working_dir=self.config.container_mount_path, + group_add=["video"], + cap_add=["SYS_PTRACE"], + security_opt=["seccomp=unconfined"], + ulimits=[ + docker.types.Ulimit(name="memlock", soft=-1, hard=-1), + docker.types.Ulimit(name="stack", soft=67108864, hard=67108864), + ], + stdin_open=True, + tty=True, + command="tail -f /dev/null", # Keep container running + ) + + # Wait for container to be running + container.reload() + if container.status != "running": + raise RuntimeError(f"Container failed to start on {node}: {container.status}") + + log.info(f"Container {cfg.container_name} running on {node} (ID: {container.short_id})") + return container + + def _exec_in_container( + self, + container: Container, + cmd: str, + environment: Optional[Dict[str, str]] = None, + workdir: Optional[str] = None, + stream: bool = False, + ) -> tuple[int, str]: + """ + Execute command inside container. + + Args: + container: Container to execute in + cmd: Command to run + environment: Optional environment variables + workdir: Optional working directory + stream: If True, stream output in real-time (for long-running commands) + + Returns: + Tuple of (exit_code, output) + """ + log.info(f"Executing in container: {cmd[:100]}...") + + if stream: + return self._exec_in_container_streaming(container, cmd, environment, workdir) + + exit_code, output = container.exec_run( + cmd, + environment=environment, + workdir=workdir, + stream=False, + ) + + output_str = output.decode("utf-8") if isinstance(output, bytes) else str(output) + return exit_code, output_str + + def _exec_in_container_streaming( + self, + container: Container, + cmd: str, + environment: Optional[Dict[str, str]] = None, + workdir: Optional[str] = None, + ) -> tuple[int, str]: + """ + Execute command with real-time streaming output. + + Provides feedback during long-running commands like training. 
+ """ + # Use exec_run with stream=True to get real-time output + exec_result = container.client.api.exec_create( + container.id, + cmd, + environment=environment, + workdir=workdir, + stdout=True, + stderr=True, + ) + + output_generator = container.client.api.exec_start( + exec_result['Id'], + stream=True, + demux=True, # Separate stdout and stderr + ) + + output_lines = [] + line_count = 0 + + for stdout_chunk, stderr_chunk in output_generator: + # Process stdout + if stdout_chunk: + text = stdout_chunk.decode('utf-8', errors='replace') + for line in text.splitlines(): + if line.strip(): + line_count += 1 + # Log every line but summarize for very verbose output + if line_count <= 50 or line_count % 20 == 0: + log.info(f" [stdout] {line[:200]}") + output_lines.append(line) + + # Process stderr + if stderr_chunk: + text = stderr_chunk.decode('utf-8', errors='replace') + for line in text.splitlines(): + if line.strip(): + line_count += 1 + # Always log stderr (usually important) + log.info(f" [stderr] {line[:200]}") + output_lines.append(line) + + if line_count > 50: + log.info(f" ... ({line_count} total lines of output)") + + # Get exit code + exec_info = container.client.api.exec_inspect(exec_result['Id']) + exit_code = exec_info.get('ExitCode', -1) + + return exit_code, '\n'.join(output_lines) + + def setup(self) -> bool: + """ + Set up Aorta environment. + + 1. Connect to Docker daemon on each node + 2. Pull image if needed + 3. Launch container with GPU access and aorta bind mount + 4. Build RCCL from source (optional) + """ + try: + for node in self.config.nodes: + # Connect to Docker + client = self._connect_docker(node) + + # Cleanup any existing containers + self._cleanup_existing_containers(client, node) + + # Pull image (will skip if already present) + log.info(f"Pulling image {self.config.docker.image} on {node}...") + try: + client.images.pull(self.config.docker.image) + except docker.errors.ImageNotFound: + log.error(f"Image not found: {self.config.docker.image}") + return False + + # Launch container + container = self._launch_container(client, node) + self._containers[node] = container + + # Build RCCL if not skipping + if not self.config.skip_rccl_build: + log.info(f"Building RCCL on {node}...") + build_cmd = f"bash {self.config.container_mount_path}/{self.config.build_script}" + exit_code, output = self._exec_in_container(container, build_cmd) + + if exit_code != 0: + log.error(f"RCCL build failed on {node}:\n{output}") + return False + + log.info(f"RCCL build completed on {node}") + + return True + + except Exception as e: + log.exception(f"Setup failed: {e}") + return False + + def run(self, **kwargs) -> RunResult: + """ + Execute the Aorta benchmark. + + Runs the experiment script inside the container and collects + profiling artifacts. 
+ """ + start_time = time.time() + stdout_dict: Dict[str, str] = {} + stderr_dict: Dict[str, str] = {} + exit_codes: Dict[str, int] = {} + artifacts: Dict[str, Path] = {} + + try: + # For now, run on head node only (single node v1) + node = self.head_node + container = self._containers.get(node) + + if not container: + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + error_message=f"No container found for {node}" + ) + + # Build environment with computed values + env = self.config.environment.to_dict() + + # Add RCCL library path + rccl_path = self.config.rccl.build_path + env["LD_LIBRARY_PATH"] = ( + f"{rccl_path}/build/release/:/opt/rocm/lib:/opt/rocm/lib64:" + f"/opt/openmpi/lib:/opt/rccl-tests/build:$LD_LIBRARY_PATH" + ) + env["rccl_path"] = rccl_path + + # Build override arguments if any + override_args = "" + if self.config.training_overrides: + for key, value in self.config.training_overrides.items(): + override_args += f' --override {key}="{value}"' + + # Execute experiment script with streaming output for real-time feedback + exp_cmd = f"bash {self.config.container_mount_path}/{self.config.experiment_script}" + log.info(f"Running experiment: {exp_cmd}") + log.info("Streaming output (this may take several minutes)...") + + exit_code, output = self._exec_in_container( + container, + exp_cmd, + environment=env, + stream=True, # Stream output for real-time feedback + ) + + stdout_dict[node] = output + exit_codes[node] = exit_code + + if exit_code != 0: + log.error(f"Experiment failed on {node} with exit code {exit_code}") + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + stdout=stdout_dict, + exit_codes=exit_codes, + error_message=f"Experiment exited with code {exit_code}" + ) + + # Determine output directory from environment + nch = self.config.environment.NCCL_MAX_NCHANNELS + compute_ch = 256 - nch + output_dir_name = f"nodes1_rccl_develop_commsCh{nch}_computeCh{compute_ch}" + + # Collect artifact paths + trace_dir = self.config.aorta_path / output_dir_name / "torch_profiler" + if trace_dir.exists(): + artifacts["torch_traces"] = trace_dir + log.info(f"Found trace artifacts at {trace_dir}") + else: + log.warning(f"Expected trace directory not found: {trace_dir}") + + # Also collect training logs + log_file = self.config.aorta_path / f"training_{node}.log" + if log_file.exists(): + artifacts["training_log"] = log_file + + return RunResult( + status=RunStatus.COMPLETED, + start_time=start_time, + end_time=time.time(), + stdout=stdout_dict, + stderr=stderr_dict, + exit_codes=exit_codes, + artifacts=artifacts, + metadata={ + "nodes": len(self.config.nodes), + "gpus_per_node": self.config.gpus_per_node, + "nccl_channels": nch, + "compute_channels": compute_ch, + } + ) + + except Exception as e: + log.exception(f"Run failed: {e}") + return RunResult( + status=RunStatus.FAILED, + start_time=start_time, + end_time=time.time(), + stdout=stdout_dict, + stderr=stderr_dict, + exit_codes=exit_codes, + error_message=str(e) + ) + + def teardown(self) -> bool: + """ + Cleanup containers and connections. + + Handles SSH connection cleanup gracefully to avoid BrokenPipeError warnings. 
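As a sanity check on the artifact collection in run() above, a sketch of the trace directory the runner expects for the default channel split; the aorta checkout path is a placeholder.

from pathlib import Path

aorta_path = Path("/opt/aorta")      # placeholder; mirrors AortaConfig.aorta_path
nch = 112                            # AortaEnvironment.NCCL_MAX_NCHANNELS default
compute_ch = 256 - nch               # 144
output_dir_name = f"nodes1_rccl_develop_commsCh{nch}_computeCh{compute_ch}"
print(aorta_path / output_dir_name / "torch_profiler")
# -> /opt/aorta/nodes1_rccl_develop_commsCh112_computeCh144/torch_profiler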
+ """ + import warnings + success = True + + for node, container in self._containers.items(): + try: + log.info(f"Stopping container on {node}...") + container.stop(timeout=30) + container.remove(force=True) + log.info(f"Container removed on {node}") + except Exception as e: + log.warning(f"Error removing container on {node}: {e}") + success = False + + self._containers.clear() + + # Close Docker clients - suppress BrokenPipeError during SSH cleanup + for node, client in self._docker_clients.items(): + try: + # Suppress warnings during cleanup as SSH connections may already be closed + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=BrokenPipeError) + warnings.filterwarnings("ignore", message=".*Broken pipe.*") + try: + client.close() + except BrokenPipeError: + pass # Expected when SSH connection is already closed + except OSError as e: + if "Broken pipe" not in str(e): + raise + except Exception as e: + # Log but don't fail on cleanup errors + log.debug(f"Docker client cleanup for {node}: {e}") + + self._docker_clients.clear() + + return success + From 823606dbe0e1e190819bb1767dcd27b1ccc26886 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:29:12 -0500 Subject: [PATCH 5/7] Aorta and Tracelens parser schemas --- parsers/__init__.py | 51 ++++ parsers/schemas.py | 598 +++++++++++++++++++++++++++++++++++++++++++ parsers/tracelens.py | 461 +++++++++++++++++++++++++++++++++ 3 files changed, 1110 insertions(+) create mode 100644 parsers/__init__.py create mode 100644 parsers/schemas.py create mode 100644 parsers/tracelens.py diff --git a/parsers/__init__.py b/parsers/__init__.py new file mode 100644 index 0000000..72c35d1 --- /dev/null +++ b/parsers/__init__.py @@ -0,0 +1,51 @@ +""" +Parsers module - Layer 2: Data Abstraction & Validation. + +Parsers are responsible for: +- Transforming raw benchmark outputs into structured data +- Validating results against Pydantic schemas +- Aggregating metrics across runs/ranks +- Validating configuration files (fail fast) + +Parsers should NOT: +- Execute benchmarks +- Deploy infrastructure +- Make pass/fail decisions (validation only) +""" + +from parsers.schemas import ( + # Result schemas + AortaTraceMetrics, + AortaBenchmarkResult, + ParseResult, + ParseStatus, + # Config file schemas + ClusterConfigFile, + ClusterNodeConfig, + AortaBenchmarkConfigFile, + AortaDockerConfigFile, + AortaRcclConfigFile, + AortaEnvironmentConfigFile, + AortaExpectedResultsConfigFile, + # Validation helper + validate_config_file, +) + +__all__ = [ + # Result schemas + "AortaTraceMetrics", + "AortaBenchmarkResult", + "ParseResult", + "ParseStatus", + # Config file schemas + "ClusterConfigFile", + "ClusterNodeConfig", + "AortaBenchmarkConfigFile", + "AortaDockerConfigFile", + "AortaRcclConfigFile", + "AortaEnvironmentConfigFile", + "AortaExpectedResultsConfigFile", + # Validation helper + "validate_config_file", +] + diff --git a/parsers/schemas.py b/parsers/schemas.py new file mode 100644 index 0000000..c2d13c9 --- /dev/null +++ b/parsers/schemas.py @@ -0,0 +1,598 @@ +""" +Pydantic schemas for ALL benchmark results AND configuration files. + +This is the single source of truth for: +- Result data structures (parsed benchmark output) +- Configuration file schemas (validated before running benchmarks) + +All parsers produce instances of these models. +Config validation happens early to fail fast with clear errors. + +Copyright 2025 Advanced Micro Devices, Inc. +All rights reserved. 
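Taken together with the runners layer, the intended end-to-end flow looks roughly like the sketch below; the node name, credentials, and aorta path are placeholders, and running it requires the Docker SDK plus a reachable host. TraceLensParser and its aggregate/validate_thresholds helpers are defined in parsers/tracelens.py later in this series.

from pathlib import Path

from runners.aorta import AortaRunner, AortaConfig
from parsers.tracelens import TraceLensParser

config = AortaConfig(nodes=["node01"], username="amd", password="example",
                     aorta_path=Path("/opt/aorta"))          # placeholders
run_result = AortaRunner(config).execute()                   # layer 1: raw artifacts only

parser = TraceLensParser()
parse_result = parser.parse(run_result)                      # layer 2: per-rank AortaTraceMetrics
summary = parser.aggregate(parse_result, num_nodes=1, gpus_per_node=8)
if summary is not None:
    failures = parser.validate_thresholds(summary, {"max_avg_iteration_ms": 7000})
    print(summary.avg_iteration_time_ms, failures)           # pass/fail stays with the test layer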
+""" + +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Dict, Generic, List, Optional, TypeVar, Union +import math + +from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict + + +# ============================================================================= +# Common Types +# ============================================================================= + +class ParseStatus(Enum): + """Status of a parse operation.""" + SUCCESS = "success" + PARTIAL = "partial" # Some results parsed, some failed + FAILED = "failed" + + +T = TypeVar('T', bound=BaseModel) + + +@dataclass +class ParseResult(Generic[T]): + """ + Generic result container for all parsers. + + Contains validated Pydantic models plus any warnings/errors. + """ + status: ParseStatus + results: List[T] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def succeeded(self) -> bool: + return self.status == ParseStatus.SUCCESS + + @property + def has_results(self) -> bool: + return len(self.results) > 0 + + +# ============================================================================= +# Aorta / TraceLens Schemas +# ============================================================================= + +class AortaTraceMetrics(BaseModel): + """ + Per-rank metrics extracted from PyTorch profiler traces. + + Represents a single GPU's performance during distributed training. + """ + model_config = ConfigDict(frozen=True) + + # Identification + rank: int = Field(ge=0, description="Global rank ID") + node: Optional[str] = Field(default=None, description="Node hostname") + local_rank: Optional[int] = Field(default=None, ge=0, description="Local rank on node") + + # Timing metrics (in microseconds for precision) + total_time_us: float = Field(ge=0, description="Total iteration time") + compute_time_us: float = Field(ge=0, description="Time spent in compute kernels") + communication_time_us: float = Field(ge=0, description="Time spent in NCCL/communication") + memory_time_us: Optional[float] = Field(default=None, ge=0, description="Time in memory operations") + idle_time_us: Optional[float] = Field(default=None, ge=0, description="Idle/wait time") + + # Memory metrics + peak_memory_gb: Optional[float] = Field(default=None, ge=0, description="Peak GPU memory usage") + allocated_memory_gb: Optional[float] = Field(default=None, ge=0, description="Allocated GPU memory") + + # Kernel counts + compute_kernel_count: Optional[int] = Field(default=None, ge=0, description="Number of compute kernels") + comm_kernel_count: Optional[int] = Field(default=None, ge=0, description="Number of NCCL kernels") + + @field_validator('total_time_us', 'compute_time_us', 'communication_time_us') + @classmethod + def validate_not_nan(cls, v: float, info) -> float: + """Ensure timing values are not NaN or Inf.""" + if math.isnan(v) or math.isinf(v): + raise ValueError(f'{info.field_name} cannot be NaN or Inf') + return v + + @property + def compute_ratio(self) -> float: + """Fraction of time spent in compute (vs communication).""" + if self.total_time_us > 0: + return self.compute_time_us / self.total_time_us + return 0.0 + + @property + def comm_ratio(self) -> float: + """Fraction of time spent in communication.""" + if self.total_time_us > 0: + return self.communication_time_us / self.total_time_us + return 0.0 + + @property + def 
compute_comm_overlap(self) -> float: + """ + Estimated compute-communication overlap. + + If compute + comm > total, there's overlap. + Returns fraction of comm time that overlaps with compute. + """ + if self.communication_time_us <= 0: + return 0.0 + + overlap_time = (self.compute_time_us + self.communication_time_us) - self.total_time_us + overlap_time = max(0, overlap_time) # Can't have negative overlap + + return overlap_time / self.communication_time_us + + +class AortaBenchmarkResult(BaseModel): + """ + Aggregated Aorta benchmark results across all ranks. + + Computed from individual AortaTraceMetrics. + """ + model_config = ConfigDict(frozen=True) + + # Cluster configuration + num_nodes: int = Field(gt=0, description="Number of nodes") + gpus_per_node: int = Field(gt=0, description="GPUs per node") + total_gpus: int = Field(gt=0, description="Total GPU count") + + # Aggregated timing (mean across ranks, in microseconds) + avg_iteration_time_us: float = Field(ge=0, description="Mean iteration time") + std_iteration_time_us: float = Field(ge=0, description="Std dev of iteration time") + min_iteration_time_us: float = Field(ge=0, description="Minimum iteration time") + max_iteration_time_us: float = Field(ge=0, description="Maximum iteration time") + + # Aggregated ratios + avg_compute_ratio: float = Field(ge=0, le=1, description="Mean compute ratio") + avg_comm_ratio: float = Field(ge=0, le=1, description="Mean communication ratio") + avg_overlap_ratio: float = Field(ge=0, le=1, description="Mean overlap ratio") + + # Throughput (if available) + samples_per_second: Optional[float] = Field(default=None, ge=0) + tokens_per_second: Optional[float] = Field(default=None, ge=0) + + # Per-rank metrics + per_rank_metrics: List[AortaTraceMetrics] = Field(default_factory=list) + + # Metadata + nccl_channels: Optional[int] = Field(default=None) + compute_channels: Optional[int] = Field(default=None) + rccl_branch: Optional[str] = Field(default=None) + + @property + def avg_iteration_time_ms(self) -> float: + """Mean iteration time in milliseconds.""" + return self.avg_iteration_time_us / 1000.0 + + @classmethod + def from_rank_metrics( + cls, + metrics: List[AortaTraceMetrics], + num_nodes: int, + gpus_per_node: int, + **kwargs + ) -> "AortaBenchmarkResult": + """ + Aggregate individual rank metrics into a benchmark result. 
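A short worked example of the overlap formula and the aggregation helper; the timings are fabricated for illustration.

from parsers.schemas import AortaTraceMetrics, AortaBenchmarkResult

r0 = AortaTraceMetrics(rank=0, total_time_us=620_000, compute_time_us=500_000,
                       communication_time_us=200_000)
# compute + comm = 700_000 us against a 620_000 us total, so 80_000 us of comm overlaps compute:
assert abs(r0.compute_comm_overlap - 0.40) < 1e-9            # 80_000 / 200_000
assert abs(r0.compute_ratio - 500_000 / 620_000) < 1e-9      # ~0.81

r1 = AortaTraceMetrics(rank=1, total_time_us=640_000, compute_time_us=510_000,
                       communication_time_us=210_000)
summary = AortaBenchmarkResult.from_rank_metrics([r0, r1], num_nodes=1, gpus_per_node=2)
print(summary.avg_iteration_time_ms)                         # mean of 620 ms and 640 ms -> 630.0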
+ + Args: + metrics: List of per-rank metrics + num_nodes: Number of nodes in cluster + gpus_per_node: GPUs per node + **kwargs: Additional metadata fields + + Returns: + Aggregated benchmark result + """ + if not metrics: + raise ValueError("Cannot aggregate empty metrics list") + + import statistics + + times = [m.total_time_us for m in metrics] + compute_ratios = [m.compute_ratio for m in metrics] + comm_ratios = [m.comm_ratio for m in metrics] + overlap_ratios = [m.compute_comm_overlap for m in metrics] + + return cls( + num_nodes=num_nodes, + gpus_per_node=gpus_per_node, + total_gpus=num_nodes * gpus_per_node, + avg_iteration_time_us=statistics.mean(times), + std_iteration_time_us=statistics.stdev(times) if len(times) > 1 else 0.0, + min_iteration_time_us=min(times), + max_iteration_time_us=max(times), + avg_compute_ratio=statistics.mean(compute_ratios), + avg_comm_ratio=statistics.mean(comm_ratios), + avg_overlap_ratio=statistics.mean(overlap_ratios), + per_rank_metrics=metrics, + **kwargs + ) + + +# ============================================================================= +# RCCL Schemas (for future use - mirrors existing models/rccl.py patterns) +# ============================================================================= + +# Note: RCCL schemas already exist in models/rccl.py +# When porting RCCL tests to this architecture, we can either: +# 1. Move those schemas here +# 2. Re-export them from here +# 3. Keep them separate and import as needed + + +# ============================================================================= +# Configuration File Schemas (Input Validation - Fail Fast) +# ============================================================================= + +class ClusterNodeConfig(BaseModel): + """Schema for a single node entry in cluster.json node_dict.""" + model_config = ConfigDict(extra="allow") # Allow extra fields like bmc_ip + + vpc_ip: str = Field(description="VPC IP or hostname for inter-node communication") + bmc_ip: Optional[str] = Field(default=None, description="BMC IP for out-of-band management") + + +class HeadNodeConfig(BaseModel): + """Schema for head_node_dict in cluster.json.""" + model_config = ConfigDict(extra="allow") + + mgmt_ip: str = Field(description="Management IP of head node") + + +class ClusterConfigFile(BaseModel): + """ + Schema for cluster.json configuration file. + + Validates the cluster configuration before running benchmarks. + Fails fast with clear error messages if required fields are missing. 
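For reference, a minimal fabricated configuration of the shape this schema (and cluster.json) expects; hostnames, IPs, and the key path are placeholders. Note that the username placeholder guard further down compares against an empty string, so the literal it checks for (presumably something like '<username>') appears to have been dropped from this listing; the sketch assumes the intended check.

from parsers.schemas import ClusterConfigFile

raw = {
    "username": "amd",                                  # placeholder
    "priv_key_file": "/home/amd/.ssh/id_rsa",           # or supply "password" instead
    "node_dict": {
        "node01": {"vpc_ip": "10.0.0.11", "bmc_ip": "10.1.0.11"},   # fabricated addresses
    },
    "head_node_dict": {"mgmt_ip": "10.0.0.1"},
}
cluster = ClusterConfigFile.model_validate(raw)
print(cluster.username, list(cluster.node_dict))        # amd ['node01']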
+ """ + model_config = ConfigDict(extra="allow") + + username: str = Field(description="SSH username for cluster nodes") + priv_key_file: Optional[str] = Field(default=None, description="Path to SSH private key") + password: Optional[str] = Field(default=None, description="SSH password (if not using key)") + + node_dict: Dict[str, ClusterNodeConfig] = Field( + description="Dictionary mapping node hostname/IP to node configuration" + ) + head_node_dict: Optional[HeadNodeConfig] = Field( + default=None, + description="Head node configuration" + ) + + # Optional fields that may be present + home_mount_dir_name: Optional[str] = Field(default="home") + node_dir_name: Optional[str] = Field(default="root") + + @model_validator(mode='after') + def validate_auth_method(self): + """Ensure at least one authentication method is provided.""" + if not self.priv_key_file and not self.password: + raise ValueError( + "Authentication required: provide either 'priv_key_file' or 'password' in cluster config" + ) + return self + + @model_validator(mode='after') + def validate_nodes_exist(self): + """Ensure at least one node is configured.""" + if not self.node_dict: + raise ValueError("No nodes configured in 'node_dict' - at least one node is required") + return self + + @field_validator('username') + @classmethod + def validate_username_not_placeholder(cls, v: str) -> str: + """Check that username is not still a placeholder.""" + if '' in v.lower(): + raise ValueError( + "Username contains placeholder ''. " + "Please set a valid username in cluster config." + ) + return v + + +class AortaDockerConfigFile(BaseModel): + """Schema for docker section in aorta_benchmark.yaml.""" + model_config = ConfigDict(extra="forbid") # Catch typos + + image: str = Field( + default="jeffdaily/pytorch:torchrec-dlrm-complete", + description="Docker image for Aorta container" + ) + container_name: str = Field( + default="aorta-benchmark", + description="Name for the Docker container" + ) + shm_size: str = Field( + default="17G", + description="Shared memory size" + ) + network_mode: str = Field( + default="host", + description="Docker network mode" + ) + privileged: bool = Field( + default=True, + description="Run container in privileged mode" + ) + + +class AortaRcclConfigFile(BaseModel): + """Schema for rccl section in aorta_benchmark.yaml.""" + model_config = ConfigDict(extra="forbid") + + clone_url: str = Field( + default="https://github.com/ROCmSoftwarePlatform/rccl.git", + description="RCCL git repository URL" + ) + branch: str = Field( + default="develop", + description="RCCL branch to build" + ) + build_path: str = Field( + default="/mnt/rccl", + description="Path inside container for RCCL build" + ) + + +class AortaEnvironmentConfigFile(BaseModel): + """Schema for environment section in aorta_benchmark.yaml.""" + model_config = ConfigDict(extra="allow") # Allow custom env vars + + NCCL_MAX_NCHANNELS: int = Field( + default=112, + ge=1, + le=256, + description="Maximum NCCL channels" + ) + NCCL_MAX_P2P_NCHANNELS: int = Field( + default=112, + ge=1, + le=256, + description="Maximum NCCL P2P channels" + ) + NCCL_DEBUG: str = Field( + default="VERSION", + description="NCCL debug level" + ) + TORCH_NCCL_HIGH_PRIORITY: int = Field( + default=1, + ge=0, + le=1, + description="Enable high priority NCCL streams" + ) + OMP_NUM_THREADS: int = Field( + default=1, + ge=1, + description="OpenMP thread count" + ) + RCCL_MSCCL_ENABLE: int = Field( + default=0, + ge=0, + le=1, + description="Enable MSCCL" + ) + + +class 
AortaExpectedResultsConfigFile(BaseModel): + """Schema for expected_results section in aorta_benchmark.yaml.""" + model_config = ConfigDict(extra="allow") # Allow custom thresholds + + max_avg_iteration_ms: Optional[float] = Field( + default=None, + ge=0, + description="Maximum acceptable average iteration time in ms" + ) + min_compute_ratio: Optional[float] = Field( + default=None, + ge=0, + le=1, + description="Minimum acceptable compute ratio" + ) + min_overlap_ratio: Optional[float] = Field( + default=None, + ge=0, + le=1, + description="Minimum acceptable compute-comm overlap ratio" + ) + max_time_variance_ratio: Optional[float] = Field( + default=None, + ge=0, + description="Maximum acceptable iteration time variance" + ) + + +class AortaBenchmarkConfigFile(BaseModel): + """ + Schema for the entire aorta_benchmark.yaml configuration file. + + Validates structure and provides sensible defaults. + Fails fast with clear error messages if configuration is invalid. + + Usage: + with open("aorta_benchmark.yaml") as f: + raw = yaml.safe_load(f) + config = AortaBenchmarkConfigFile.model_validate(raw) + """ + model_config = ConfigDict(extra="forbid") # Catch typos in top-level keys + + # Required: Path to aorta repository + aorta_path: str = Field( + description="Path to Aorta repository on host (will be bind-mounted)" + ) + + # Container settings + container_mount_path: str = Field( + default="/mnt", + description="Mount point inside container for aorta_path" + ) + + # Aorta config + base_config: str = Field( + default="config/distributed.yaml", + description="Aorta config file relative to aorta_path" + ) + + # Nested configuration sections + docker: AortaDockerConfigFile = Field( + default_factory=AortaDockerConfigFile, + description="Docker container configuration" + ) + rccl: AortaRcclConfigFile = Field( + default_factory=AortaRcclConfigFile, + description="RCCL build configuration" + ) + environment: AortaEnvironmentConfigFile = Field( + default_factory=AortaEnvironmentConfigFile, + description="Environment variables for RCCL/NCCL" + ) + + # Training overrides + training_overrides: Dict[str, Any] = Field( + default_factory=dict, + description="Overrides passed to Aorta via --override flag" + ) + + # Scripts + build_script: str = Field( + default="scripts/build_rccl.sh", + description="RCCL build script relative to container mount" + ) + experiment_script: str = Field( + default="scripts/rccl_exp.sh", + description="Experiment script relative to container mount" + ) + + # Hardware + gpus_per_node: int = Field( + default=8, + ge=1, + description="Number of GPUs per node" + ) + + # Execution settings + timeout_seconds: int = Field( + default=10800, + ge=60, + description="Benchmark timeout in seconds" + ) + skip_rccl_build: bool = Field( + default=False, + description="Skip RCCL build if already built" + ) + + # Validation thresholds + expected_results: AortaExpectedResultsConfigFile = Field( + default_factory=AortaExpectedResultsConfigFile, + description="Expected results for validation" + ) + + @field_validator('aorta_path') + @classmethod + def validate_aorta_path_not_placeholder(cls, v: str) -> str: + """Check that aorta_path is not a placeholder.""" + if '' in v.lower(): + raise ValueError( + "aorta_path contains placeholder ''. " + "Please set the actual path to your Aorta installation." + ) + return v + + def validate_paths_exist(self) -> List[str]: + """ + Validate that referenced paths exist on the filesystem. + + Call this after loading config to check paths. 
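The nested sub-schemas above are what make typos and out-of-range values fail fast; a small sketch of both behaviors, using made-up values.

from pydantic import ValidationError

from parsers.schemas import AortaDockerConfigFile, AortaEnvironmentConfigFile

try:
    AortaDockerConfigFile.model_validate({"imge": "rocm/pytorch"})    # misspelled "image" key
except ValidationError as err:
    print("typo caught:", err.error_count(), "error(s)")              # extra="forbid" rejects it

try:
    AortaEnvironmentConfigFile(NCCL_MAX_NCHANNELS=300)                # above the le=256 bound
except ValidationError as err:
    print("range caught:", err.error_count(), "error(s)")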
+ Returns list of error messages (empty if all valid). + """ + errors = [] + + aorta = Path(self.aorta_path) + if not aorta.exists(): + errors.append(f"aorta_path does not exist: {self.aorta_path}") + else: + # Check internal paths + base_cfg = aorta / self.base_config + if not base_cfg.exists(): + errors.append(f"base_config does not exist: {base_cfg}") + + build_script = aorta / self.build_script + if not build_script.exists(): + errors.append(f"build_script does not exist: {build_script}") + + exp_script = aorta / self.experiment_script + if not exp_script.exists(): + errors.append(f"experiment_script does not exist: {exp_script}") + + return errors + + +def validate_config_file( + config_path: Union[str, Path], + config_type: str = "auto" +) -> Union[AortaBenchmarkConfigFile, ClusterConfigFile]: + """ + Load and validate a configuration file. + + Args: + config_path: Path to configuration file (YAML or JSON) + config_type: Type of config - "aorta", "cluster", or "auto" (detect from content) + + Returns: + Validated Pydantic model + + Raises: + ValueError: If config is invalid with detailed error message + FileNotFoundError: If config file doesn't exist + """ + import json + import yaml + + config_path = Path(config_path) + + if not config_path.exists(): + raise FileNotFoundError(f"Configuration file not found: {config_path}") + + # Load file + with open(config_path) as f: + if config_path.suffix in ('.yaml', '.yml'): + raw_config = yaml.safe_load(f) + else: + raw_config = json.load(f) + + if raw_config is None: + raise ValueError(f"Configuration file is empty: {config_path}") + + # Determine config type + if config_type == "auto": + if "node_dict" in raw_config: + config_type = "cluster" + elif "aorta_path" in raw_config: + config_type = "aorta" + else: + raise ValueError( + f"Cannot auto-detect config type for {config_path}. " + f"Specify config_type='aorta' or config_type='cluster'" + ) + + # Validate with appropriate schema + try: + if config_type == "cluster": + return ClusterConfigFile.model_validate(raw_config) + elif config_type == "aorta": + return AortaBenchmarkConfigFile.model_validate(raw_config) + else: + raise ValueError(f"Unknown config_type: {config_type}") + except Exception as e: + # Re-raise with file context + raise ValueError( + f"Invalid configuration in {config_path}:\n{e}" + ) from e + diff --git a/parsers/tracelens.py b/parsers/tracelens.py new file mode 100644 index 0000000..b81fafa --- /dev/null +++ b/parsers/tracelens.py @@ -0,0 +1,461 @@ +""" +TraceLens parser for PyTorch profiler traces. + +Parses PyTorch profiler JSON traces and extracts performance metrics +using TraceLens analysis. + +Copyright 2025 Advanced Micro Devices, Inc. +All rights reserved. +""" + +import json +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional + +from parsers.schemas import ( + AortaTraceMetrics, + AortaBenchmarkResult, + ParseResult, + ParseStatus, +) + +# Import runners for type hints only +from runners._base_runner import RunResult + +log = logging.getLogger(__name__) + +# Try to import TraceLens - it's optional for basic parsing +try: + from TraceLens import generate_perf_report_pytorch + TRACELENS_AVAILABLE = True +except ImportError: + TRACELENS_AVAILABLE = False + generate_perf_report_pytorch = None # type: ignore + + +class TraceLensParser: + """ + Parser for PyTorch profiler traces. + + Can use TraceLens for detailed analysis or fall back to basic JSON parsing. 
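In practice the helper above is called once per input file before anything is deployed; a sketch of that fail-fast pattern (the paths mirror the test usage shown later in this series).

from parsers.schemas import validate_config_file

try:
    cluster = validate_config_file("input/cluster_file/cluster.json")        # auto-detected via "node_dict"
    bench = validate_config_file("input/aorta_benchmark.yaml", config_type="aorta")
except (FileNotFoundError, ValueError) as err:
    # An invalid or missing config aborts the run before any container is launched.
    print(f"config rejected: {err}")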
+ """ + + def __init__(self, use_tracelens: bool = True): + """ + Initialize parser. + + Args: + use_tracelens: Whether to use TraceLens for analysis (if available) + """ + self.use_tracelens = use_tracelens and TRACELENS_AVAILABLE + + if use_tracelens and not TRACELENS_AVAILABLE: + log.warning("TraceLens not available, falling back to basic parsing") + + def parse(self, run_result: RunResult) -> ParseResult[AortaTraceMetrics]: + """ + Parse benchmark results into validated metrics. + + Args: + run_result: Result from AortaRunner + + Returns: + ParseResult containing validated AortaTraceMetrics + """ + if not run_result.succeeded: + return ParseResult( + status=ParseStatus.FAILED, + errors=[f"Run did not succeed: {run_result.error_message}"] + ) + + trace_dir = run_result.get_artifact("torch_traces") + if not trace_dir: + return ParseResult( + status=ParseStatus.FAILED, + errors=["No torch_traces artifact found in run result"] + ) + + if not trace_dir.exists(): + return ParseResult( + status=ParseStatus.FAILED, + errors=[f"Trace directory does not exist: {trace_dir}"] + ) + + return self.parse_trace_directory(trace_dir) + + def parse_trace_directory(self, trace_dir: Path) -> ParseResult[AortaTraceMetrics]: + """ + Parse all trace files in a directory. + + Looks for PyTorch profiler JSON traces and extracts metrics. + + Args: + trace_dir: Directory containing trace files + + Returns: + ParseResult with metrics for each rank + """ + results: List[AortaTraceMetrics] = [] + warnings: List[str] = [] + errors: List[str] = [] + + # Find all trace JSON files + # PyTorch profiler typically outputs: rank0/trace.json, rank1/trace.json, etc. + # Or: worker0_trace.json, worker1_trace.json + trace_files = list(trace_dir.glob("**/trace*.json")) + + if not trace_files: + # Try alternative patterns + trace_files = list(trace_dir.glob("**/*.json")) + + if not trace_files: + return ParseResult( + status=ParseStatus.FAILED, + errors=[f"No trace files found in {trace_dir}"] + ) + + log.info(f"Found {len(trace_files)} trace files to parse") + + for trace_file in trace_files: + try: + rank = self._extract_rank_from_path(trace_file) + + if self.use_tracelens: + metrics = self._parse_with_tracelens(trace_file, rank) + else: + metrics = self._parse_basic(trace_file, rank) + + results.append(metrics) + log.debug(f"Parsed rank {rank}: {metrics.total_time_us:.2f}us total") + + except Exception as e: + warnings.append(f"Failed to parse {trace_file}: {e}") + log.warning(f"Failed to parse {trace_file}: {e}") + + # Determine status + if not results: + status = ParseStatus.FAILED + errors.append("No traces could be parsed") + elif warnings: + status = ParseStatus.PARTIAL + else: + status = ParseStatus.SUCCESS + + return ParseResult( + status=status, + results=results, + warnings=warnings, + errors=errors, + metadata={ + "trace_dir": str(trace_dir), + "files_found": len(trace_files), + "files_parsed": len(results), + } + ) + + def _extract_rank_from_path(self, trace_file: Path) -> int: + """ + Extract rank ID from trace file path. 
+ + Handles patterns like: + - rank0/trace.json -> 0 + - rank_0/trace.json -> 0 + - worker0_trace.json -> 0 + - trace_rank0.json -> 0 + """ + path_str = str(trace_file) + + # Try common patterns + import re + + patterns = [ + r'rank[_]?(\d+)', + r'worker[_]?(\d+)', + r'gpu[_]?(\d+)', + ] + + for pattern in patterns: + match = re.search(pattern, path_str, re.IGNORECASE) + if match: + return int(match.group(1)) + + # Default to 0 if no pattern matched + log.warning(f"Could not extract rank from {trace_file}, defaulting to 0") + return 0 + + def _parse_with_tracelens(self, trace_file: Path, rank: int) -> AortaTraceMetrics: + """ + Parse trace using TraceLens for detailed analysis. + + Args: + trace_file: Path to trace JSON file + rank: Rank ID + + Returns: + Parsed metrics + """ + log.info(f"Parsing with TraceLens: {trace_file}") + + try: + import tempfile + import os + + # TraceLens needs a writable output path for its xlsx report + # Use a temp file to avoid permission issues + with tempfile.TemporaryDirectory() as tmpdir: + output_xlsx = os.path.join(tmpdir, f"trace_rank{rank}_report.xlsx") + + # Run TraceLens - it returns Dict[str, DataFrame] directly + result_dfs = generate_perf_report_pytorch( + profile_json_path=str(trace_file), + output_xlsx_path=output_xlsx, + ) + + log.debug(f"TraceLens returned dataframes: {list(result_dfs.keys())}") + + # Extract metrics from TraceLens output + total_time_us = 0.0 + compute_time_us = 0.0 + comm_time_us = 0.0 + + # TraceLens returns gpu_timeline with key metrics + if 'gpu_timeline' in result_dfs: + df = result_dfs['gpu_timeline'] + log.debug(f"gpu_timeline data:\n{df.to_string()}") + + # Create a lookup dict from type -> time ms + timeline = dict(zip(df['type'], df['time ms'])) + + # Extract metrics (TraceLens returns in ms, convert to us) + # total_time is the full trace duration + total_time_ms = timeline.get('total_time', 0) + compute_time_ms = timeline.get('computation_time', 0) + # exposed_comm_time is communication NOT overlapped with compute + exposed_comm_ms = timeline.get('exposed_comm_time', 0) + # total_comm_time includes overlapped communication + total_comm_ms = timeline.get('total_comm_time', 0) + + # Convert to microseconds + total_time_us = total_time_ms * 1000 + compute_time_us = compute_time_ms * 1000 + # Use exposed comm time for non-overlapped communication cost + comm_time_us = exposed_comm_ms * 1000 + + log.info(f"TraceLens gpu_timeline for rank {rank}:") + log.info(f" Total time: {total_time_ms:.2f}ms") + log.info(f" Compute time: {compute_time_ms:.2f}ms ({100*compute_time_ms/total_time_ms:.1f}%)") + log.info(f" Exposed comm: {exposed_comm_ms:.2f}ms ({100*exposed_comm_ms/total_time_ms:.1f}%)") + log.info(f" Total comm: {total_comm_ms:.2f}ms (overlap: {100*(total_comm_ms-exposed_comm_ms)/total_comm_ms:.1f}%)") + + if total_time_us > 0: + log.info(f"TraceLens metrics for rank {rank}: total={total_time_us:.2f}us, compute={compute_time_us:.2f}us, comm={comm_time_us:.2f}us") + return AortaTraceMetrics( + rank=rank, + total_time_us=float(total_time_us), + compute_time_us=float(compute_time_us), + communication_time_us=float(comm_time_us), + ) + else: + log.warning(f"TraceLens returned no usable metrics, falling back to basic parsing") + return self._parse_basic(trace_file, rank) + + except Exception as e: + log.warning(f"TraceLens parsing failed: {e}, falling back to basic parsing") + import traceback + log.debug(traceback.format_exc()) + return self._parse_basic(trace_file, rank) + + def _parse_basic(self, trace_file: Path, rank: 
int) -> AortaTraceMetrics: + """ + Basic parsing of PyTorch profiler JSON without TraceLens. + + Extracts timing information directly from the trace events. + + Args: + trace_file: Path to trace JSON file + rank: Rank ID + + Returns: + Parsed metrics + """ + with open(trace_file, 'r') as f: + trace_data = json.load(f) + + # PyTorch profiler output structure varies by version + # Common structure: {"traceEvents": [...], ...} + events = trace_data.get("traceEvents", []) + + if not events: + # Try alternative structure + events = trace_data if isinstance(trace_data, list) else [] + + # Categorize events by type + compute_time_us = 0.0 + comm_time_us = 0.0 + total_time_us = 0.0 + memory_time_us = 0.0 + + compute_kernels = 0 + comm_kernels = 0 + + peak_memory = 0.0 + + for event in events: + if not isinstance(event, dict): + continue + + name = event.get("name", "") + cat = event.get("cat", "") + dur = event.get("dur", 0) # Duration in microseconds + + # Skip instant events (dur = 0 or missing) + if dur <= 0: + continue + + # Categorize by event name/category + name_lower = name.lower() + cat_lower = cat.lower() + + # NCCL/communication events + if any(k in name_lower for k in ["nccl", "allreduce", "allgather", "broadcast", "reduce_scatter"]): + comm_time_us += dur + comm_kernels += 1 + + # Memory events + elif any(k in name_lower for k in ["memcpy", "memset", "cudamemcpy"]): + memory_time_us += dur + + # Compute kernels (HIP/CUDA kernels) + elif cat_lower == "kernel" or "kernel" in name_lower: + compute_time_us += dur + compute_kernels += 1 + + # GPU activity + elif cat_lower == "gpu": + compute_time_us += dur + + # Track total + total_time_us += dur + + # Memory tracking + if "args" in event: + args = event["args"] + if "Total Allocated" in args: + try: + mem_bytes = float(args["Total Allocated"]) + peak_memory = max(peak_memory, mem_bytes / (1024**3)) # Convert to GB + except (ValueError, TypeError): + pass + + # If we couldn't categorize, estimate from total + if total_time_us == 0: + # Fallback: use trace span + if events: + start_times = [e.get("ts", 0) for e in events if isinstance(e, dict) and "ts" in e] + end_times = [e.get("ts", 0) + e.get("dur", 0) for e in events if isinstance(e, dict)] + if start_times and end_times: + total_time_us = max(end_times) - min(start_times) + + return AortaTraceMetrics( + rank=rank, + total_time_us=total_time_us, + compute_time_us=compute_time_us, + communication_time_us=comm_time_us, + memory_time_us=memory_time_us if memory_time_us > 0 else None, + peak_memory_gb=peak_memory if peak_memory > 0 else None, + compute_kernel_count=compute_kernels if compute_kernels > 0 else None, + comm_kernel_count=comm_kernels if comm_kernels > 0 else None, + ) + + def aggregate( + self, + parse_result: ParseResult[AortaTraceMetrics], + num_nodes: int, + gpus_per_node: int, + **metadata + ) -> Optional[AortaBenchmarkResult]: + """ + Aggregate per-rank metrics into a benchmark result. 
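To illustrate the fallback path, a sketch that feeds _parse_basic a couple of fabricated trace events; _parse_basic is an internal helper, so this is for illustration only.

import json
import tempfile
from pathlib import Path

from parsers.tracelens import TraceLensParser

events = {"traceEvents": [
    {"name": "ncclDevKernel_AllReduce_Sum", "cat": "kernel", "dur": 1500, "ts": 0},
    {"name": "Cijk_gemm_kernel", "cat": "kernel", "dur": 4000, "ts": 1600},
    {"name": "hipMemcpyAsync", "cat": "runtime", "dur": 300, "ts": 5700},
]}
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(events, f)

m = TraceLensParser(use_tracelens=False)._parse_basic(Path(f.name), rank=0)
# The NCCL kernel is bucketed as communication, the GEMM as compute, the memcpy as memory:
print(m.communication_time_us, m.compute_time_us, m.memory_time_us)   # 1500.0 4000.0 300.0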
+ + Args: + parse_result: Parsed per-rank metrics + num_nodes: Number of nodes in cluster + gpus_per_node: GPUs per node + **metadata: Additional metadata fields + + Returns: + Aggregated benchmark result, or None if aggregation failed + """ + if not parse_result.has_results: + log.error("Cannot aggregate: no results to aggregate") + return None + + try: + return AortaBenchmarkResult.from_rank_metrics( + metrics=parse_result.results, + num_nodes=num_nodes, + gpus_per_node=gpus_per_node, + **metadata + ) + except Exception as e: + log.exception(f"Aggregation failed: {e}") + return None + + def validate_thresholds( + self, + result: AortaBenchmarkResult, + expected: Dict[str, Any] + ) -> List[str]: + """ + Validate benchmark results against expected thresholds. + + Args: + result: Aggregated benchmark result + expected: Dictionary of threshold configurations + + Returns: + List of validation failure messages (empty if all pass) + """ + failures = [] + + # Check iteration time + max_iteration_ms = expected.get("max_avg_iteration_ms") + if max_iteration_ms is not None: + if result.avg_iteration_time_ms > max_iteration_ms: + failures.append( + f"Average iteration time {result.avg_iteration_time_ms:.2f}ms " + f"exceeds threshold {max_iteration_ms}ms" + ) + + # Check compute ratio + min_compute_ratio = expected.get("min_compute_ratio") + if min_compute_ratio is not None: + if result.avg_compute_ratio < min_compute_ratio: + failures.append( + f"Average compute ratio {result.avg_compute_ratio:.2%} " + f"below threshold {min_compute_ratio:.2%}" + ) + + # Check overlap ratio + min_overlap_ratio = expected.get("min_overlap_ratio") + if min_overlap_ratio is not None: + if result.avg_overlap_ratio < min_overlap_ratio: + failures.append( + f"Average overlap ratio {result.avg_overlap_ratio:.2%} " + f"below threshold {min_overlap_ratio:.2%}" + ) + + # Check per-rank variance (iteration time skew) + max_time_variance = expected.get("max_time_variance_ratio") + if max_time_variance is not None: + if result.avg_iteration_time_us > 0: + variance_ratio = result.std_iteration_time_us / result.avg_iteration_time_us + if variance_ratio > max_time_variance: + failures.append( + f"Iteration time variance {variance_ratio:.2%} " + f"exceeds threshold {max_time_variance:.2%}" + ) + + return failures + From 31e8c87e56695930379e5b7a8b3f59f008446c44 Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:29:48 -0500 Subject: [PATCH 6/7] Validate input config, run aorta benchmarks, validate results, create report --- tests/benchmark/__init__.py | 4 + tests/benchmark/test_aorta.py | 337 ++++++++++++++++++++++++++++++++++ 2 files changed, 341 insertions(+) create mode 100644 tests/benchmark/__init__.py create mode 100644 tests/benchmark/test_aorta.py diff --git a/tests/benchmark/__init__.py b/tests/benchmark/__init__.py new file mode 100644 index 0000000..5bebc80 --- /dev/null +++ b/tests/benchmark/__init__.py @@ -0,0 +1,4 @@ +""" +Benchmark tests using the runner-parser architecture. +""" + diff --git a/tests/benchmark/test_aorta.py b/tests/benchmark/test_aorta.py new file mode 100644 index 0000000..6e3c14d --- /dev/null +++ b/tests/benchmark/test_aorta.py @@ -0,0 +1,337 @@ +""" +Pytest tests for Aorta benchmark using runner-parser architecture. + +Usage: + pytest tests/benchmark/test_aorta.py \ + --cluster_file input/cluster_file/cluster.json \ + --config_file input/aorta_benchmark.yaml + +Copyright 2025 Advanced Micro Devices, Inc. +All rights reserved. 
+""" + +import json +import logging +import sys +from pathlib import Path + +import pytest + +# Add lib and root to path for imports +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) +sys.path.insert(0, str(Path(__file__).parent.parent.parent / 'lib')) + +from runners.aorta import AortaRunner, AortaConfig, AortaDockerConfig, RcclConfig, AortaEnvironment +from runners._base_runner import RunStatus +from parsers.tracelens import TraceLensParser +from parsers.schemas import ( + ParseStatus, + # Config validation schemas + ClusterConfigFile, + AortaBenchmarkConfigFile, + validate_config_file, +) + +import globals +from utils_lib import fail_test, update_test_result + +log = logging.getLogger(__name__) + + +# ============================================================================= +# Fixtures - Config Loading with Early Validation (Fail Fast) +# ============================================================================= + +@pytest.fixture(scope="module") +def cluster_file(pytestconfig): + """Path to cluster configuration file.""" + return pytestconfig.getoption("cluster_file") + + +@pytest.fixture(scope="module") +def config_file(pytestconfig): + """Path to benchmark configuration file.""" + return pytestconfig.getoption("config_file") + + +@pytest.fixture(scope="module") +def validated_cluster_config(cluster_file) -> ClusterConfigFile: + """ + Load and validate cluster configuration. + + Fails fast with clear error messages if config is invalid. + """ + log.info(f"Validating cluster config: {cluster_file}") + + try: + config = validate_config_file(cluster_file, config_type="cluster") + log.info(f"Cluster config valid: {len(config.node_dict)} nodes configured") + return config + except Exception as e: + pytest.fail(f"CLUSTER CONFIG VALIDATION FAILED:\n{e}") + + +@pytest.fixture(scope="module") +def validated_aorta_config(config_file) -> AortaBenchmarkConfigFile: + """ + Load and validate Aorta benchmark configuration. + + Fails fast with clear error messages if config is invalid. + """ + log.info(f"Validating Aorta config: {config_file}") + + try: + config = validate_config_file(config_file, config_type="aorta") + log.info(f"Aorta config valid: image={config.docker.image}") + + # Additional path validation + path_errors = config.validate_paths_exist() + if path_errors: + error_msg = "Path validation failed:\n" + "\n".join(f" - {e}" for e in path_errors) + pytest.fail(f"AORTA CONFIG PATH VALIDATION FAILED:\n{error_msg}") + + return config + except Exception as e: + pytest.fail(f"AORTA CONFIG VALIDATION FAILED:\n{e}") + + +@pytest.fixture(scope="module") +def aorta_runner_config( + validated_cluster_config: ClusterConfigFile, + validated_aorta_config: AortaBenchmarkConfigFile +) -> AortaConfig: + """ + Build AortaConfig from validated cluster and aorta configs. + + This bridges the validated Pydantic models to the runner's dataclass config. 
+ """ + # Extract node list from validated cluster config + node_list = list(validated_cluster_config.node_dict.keys()) + + # Build Docker config from validated aorta config + docker_config = AortaDockerConfig( + image=validated_aorta_config.docker.image, + container_name=validated_aorta_config.docker.container_name, + shm_size=validated_aorta_config.docker.shm_size, + network_mode=validated_aorta_config.docker.network_mode, + privileged=validated_aorta_config.docker.privileged, + ) + + # Build RCCL config + rccl_config = RcclConfig( + clone_url=validated_aorta_config.rccl.clone_url, + branch=validated_aorta_config.rccl.branch, + build_path=validated_aorta_config.rccl.build_path, + ) + + # Build environment config + env = validated_aorta_config.environment + env_config = AortaEnvironment( + NCCL_MAX_NCHANNELS=env.NCCL_MAX_NCHANNELS, + NCCL_MAX_P2P_NCHANNELS=env.NCCL_MAX_P2P_NCHANNELS, + NCCL_DEBUG=env.NCCL_DEBUG, + TORCH_NCCL_HIGH_PRIORITY=env.TORCH_NCCL_HIGH_PRIORITY, + OMP_NUM_THREADS=env.OMP_NUM_THREADS, + RCCL_MSCCL_ENABLE=env.RCCL_MSCCL_ENABLE, + ) + + # Build full runner config + return AortaConfig( + nodes=node_list, + username=validated_cluster_config.username, + pkey=validated_cluster_config.priv_key_file, + aorta_path=Path(validated_aorta_config.aorta_path), + container_mount_path=validated_aorta_config.container_mount_path, + base_config=validated_aorta_config.base_config, + training_overrides=validated_aorta_config.training_overrides, + docker=docker_config, + rccl=rccl_config, + environment=env_config, + build_script=validated_aorta_config.build_script, + experiment_script=validated_aorta_config.experiment_script, + gpus_per_node=validated_aorta_config.gpus_per_node, + skip_rccl_build=validated_aorta_config.skip_rccl_build, + timeout_seconds=validated_aorta_config.timeout_seconds, + ) + + +# ============================================================================= +# Tests +# ============================================================================= + +class TestAortaBenchmark: + """Test suite for Aorta benchmark.""" + + # Store results between tests + run_result = None + parse_result = None + benchmark_result = None + + def test_validate_runner_config(self, aorta_runner_config): + """ + Validate the runner configuration before executing. + + This is a secondary validation after the config file validation. + Checks runtime requirements like Docker availability. 
+ """ + globals.error_list = [] + + runner = AortaRunner(aorta_runner_config) + errors = runner.validate_config() + + for error in errors: + fail_test(f"Runner config validation error: {error}") + + if not errors: + log.info("Runner configuration validated successfully") + + update_test_result() + + def test_run_benchmark(self, aorta_runner_config): + """Execute the Aorta benchmark.""" + globals.error_list = [] + + runner = AortaRunner(aorta_runner_config) + + # Execute full lifecycle + result = runner.execute() + + # Store for subsequent tests + TestAortaBenchmark.run_result = result + + # Check status + if result.status != RunStatus.COMPLETED: + fail_test(f"Benchmark run failed: {result.error_message}") + + log.info(f"Benchmark completed in {result.duration_seconds:.1f}s") + log.info(f"Artifacts: {list(result.artifacts.keys())}") + + update_test_result() + + def test_parse_results(self, aorta_runner_config, validated_aorta_config): + """Parse benchmark results into metrics.""" + globals.error_list = [] + + if TestAortaBenchmark.run_result is None: + pytest.skip("No run result available - run test_run_benchmark first") + + parser = TraceLensParser(use_tracelens=True) # Use TraceLens if available + + # Parse trace files + parse_result = parser.parse(TestAortaBenchmark.run_result) + TestAortaBenchmark.parse_result = parse_result + + if parse_result.status == ParseStatus.FAILED: + for error in parse_result.errors: + fail_test(f"Parse error: {error}") + + # Log warnings + for warning in parse_result.warnings: + log.warning(warning) + + log.info(f"Parsed {len(parse_result.results)} rank metrics") + + # Aggregate results + if parse_result.has_results: + benchmark_result = parser.aggregate( + parse_result, + num_nodes=len(aorta_runner_config.nodes), + gpus_per_node=aorta_runner_config.gpus_per_node, + nccl_channels=aorta_runner_config.environment.NCCL_MAX_NCHANNELS, + rccl_branch=aorta_runner_config.rccl.branch, + ) + TestAortaBenchmark.benchmark_result = benchmark_result + + if benchmark_result: + log.info("Aggregated results:") + log.info(f" Avg iteration time: {benchmark_result.avg_iteration_time_ms:.2f}ms") + log.info(f" Compute ratio: {benchmark_result.avg_compute_ratio:.2%}") + log.info(f" Comm ratio: {benchmark_result.avg_comm_ratio:.2%}") + log.info(f" Overlap ratio: {benchmark_result.avg_overlap_ratio:.2%}") + + update_test_result() + + def test_validate_thresholds(self, validated_aorta_config): + """Validate results against expected thresholds.""" + globals.error_list = [] + + if TestAortaBenchmark.benchmark_result is None: + pytest.skip("No benchmark result available - run test_parse_results first") + + parser = TraceLensParser() + + # Get expected results from validated config + expected = { + "max_avg_iteration_ms": validated_aorta_config.expected_results.max_avg_iteration_ms, + "min_compute_ratio": validated_aorta_config.expected_results.min_compute_ratio, + "min_overlap_ratio": validated_aorta_config.expected_results.min_overlap_ratio, + "max_time_variance_ratio": validated_aorta_config.expected_results.max_time_variance_ratio, + } + # Filter out None values + expected = {k: v for k, v in expected.items() if v is not None} + + failures = parser.validate_thresholds( + TestAortaBenchmark.benchmark_result, + expected + ) + + for failure in failures: + fail_test(failure) + + if not failures: + log.info("All threshold validations passed") + + update_test_result() + + def test_generate_report(self, aorta_runner_config): + """Generate benchmark report.""" + globals.error_list = [] + + if 
TestAortaBenchmark.benchmark_result is None: + pytest.skip("No benchmark result available") + + result = TestAortaBenchmark.benchmark_result + run_result = TestAortaBenchmark.run_result + + # Build report + report = { + "status": "completed", + "duration_seconds": run_result.duration_seconds if run_result else 0, + "cluster": { + "nodes": result.num_nodes, + "gpus_per_node": result.gpus_per_node, + "total_gpus": result.total_gpus, + }, + "configuration": { + "nccl_channels": result.nccl_channels, + "compute_channels": result.compute_channels, + "rccl_branch": result.rccl_branch, + }, + "performance": { + "avg_iteration_time_ms": result.avg_iteration_time_ms, + "std_iteration_time_us": result.std_iteration_time_us, + "avg_compute_ratio": result.avg_compute_ratio, + "avg_comm_ratio": result.avg_comm_ratio, + "avg_overlap_ratio": result.avg_overlap_ratio, + }, + "per_rank_summary": [ + { + "rank": m.rank, + "total_time_us": m.total_time_us, + "compute_ratio": m.compute_ratio, + } + for m in result.per_rank_metrics + ] + } + + # Save report + output_dir = aorta_runner_config.output_dir + output_dir.mkdir(parents=True, exist_ok=True) + report_path = output_dir / "aorta_benchmark_report.json" + + with open(report_path, 'w') as f: + json.dump(report, f, indent=2) + + log.info(f"Report saved to {report_path}") + + update_test_result() From f2c60c5d79f594fbe73ecc834abc67143aa64dca Mon Sep 17 00:00:00 2001 From: speriaswamy-amd Date: Fri, 5 Dec 2025 14:36:55 -0500 Subject: [PATCH 7/7] Requirements --- pyproject.toml | 23 +++++++++++++++++++++++ requirements.txt | 15 ++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 pyproject.toml diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..cdc1420 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,23 @@ +[project] +name = "cvs" +version = "0.1.0" +description = "Cluster validation suite" +readme = "README.md" +requires-python = ">=3.14" +dependencies = [ + "docker>=7.0.0", + "openpyxl>=3.1.5", + "orjson>=3.11.4", + "pandas>=2.3.3", + "parallel-ssh>=2.16.0.post1", + "paramiko>=4.0.0", + "pydantic>=2.0", + "pytest>=9.0.1", + "pytest-dependency>=0.6.0", + "pytest-html>=4.1.1", + "pytest-repeat>=0.9.4", + "pyyaml>=6.0", + "scp>=0.15.0", + "tracelens", + "xlsxwriter>=3.2.9", +] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3c73e3c..4fcf7a9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,17 @@ pytest-repeat pytest-dependency xlsxwriter pydantic >= 2.0 -pandas \ No newline at end of file +pandas + +# Docker SDK for container orchestration +docker >= 7.0.0 + +# TraceLens for PyTorch trace analysis (optional but recommended) +# Install manually: pip install git+https://github.com/AMD-AGI/TraceLens.git +# Required for accurate per-iteration metrics. Without it, basic parsing is used. +# tracelens # Uncomment after manual installation + +# YAML config parsing +pyyaml >= 6.0 +orjson +openpyxl \ No newline at end of file
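The requirements above treat TraceLens as optional, and the parser earlier in this series falls back to _parse_basic() when detailed parsing is unavailable, so the import guard is what decides which path runs. The snippet below is only a minimal sketch of such a guard under stated assumptions: the package from https://github.com/AMD-AGI/TraceLens.git is assumed to be importable as "TraceLens" (unverified), and the helper name load_tracelens_report_fn and file name optional_tracelens_check.py are hypothetical; only generate_perf_report_pytorch and its keyword arguments are taken from _parse_with_tracelens above. The repository's real guard lives in parsers/tracelens.py, which is outside this excerpt.

# optional_tracelens_check.py - illustrative sketch, not the repository's parsers/tracelens.py
# Assumption: the optional dependency installs a top-level "TraceLens" package exposing
# generate_perf_report_pytorch(profile_json_path=..., output_xlsx_path=...).

def load_tracelens_report_fn():
    """Return TraceLens' report function, or None when the optional dependency is absent."""
    try:
        from TraceLens import generate_perf_report_pytorch  # import path is an assumption
        return generate_perf_report_pytorch
    except ImportError:
        return None


if __name__ == "__main__":
    report_fn = load_tracelens_report_fn()
    if report_fn is None:
        print("TraceLens not installed: only basic trace parsing is available")
    else:
        print("TraceLens available: detailed gpu_timeline metrics can be generated")

Probing for the callable once, rather than importing at call time, keeps a parser constructed with use_tracelens=True safe to instantiate on machines where the optional dependency was never installed.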