Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Features
| **[GPU Telemetry](docs/tutorials/gpu-telemetry.md)** | Real-time GPU metrics collection via DCGM (power, utilization, memory, temperature, etc) | Performance optimization, resource monitoring, multi-node telemetry |
| **[Template Endpoint](docs/tutorials/template-endpoint.md)** | Benchmark custom APIs with flexible Jinja2 request templates | Custom API formats, rapid prototyping, non-standard endpoints |
| **[SGLang Image Generation](docs/tutorials/sglang-image-generation.md)** | Benchmark image generation APIs using SGLang with FLUX.1-dev model | Image generation testing, text-to-image benchmarking, extracting generated images |
| **[Server Metrics](docs/tutorials/server-metrics.md)** | Collect Prometheus-compatible server metrics during benchmarking | Server-side performance analysis, Prometheus endpoint monitoring, multi-endpoint metrics collection |

### Working with Benchmark Data
- **[Profile Exports](docs/tutorials/working-with-profile-exports.md)** - Parse and analyze `profile_export.jsonl` with Pydantic models, custom metrics, and async processing
Expand Down
6 changes: 6 additions & 0 deletions docs/cli_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ The delay in seconds before cancelling requests. This is used when --request-can

Enable GPU telemetry console display and optionally specify: (1) 'dashboard' for realtime dashboard mode, (2) custom DCGM exporter URLs (e.g., http://node1:9401/metrics), (3) custom metrics CSV file (e.g., custom_gpu_metrics.csv). Default endpoints localhost:9400 and localhost:9401 are always attempted. Example: --gpu-telemetry dashboard node1:9400 custom.csv.

## Server Metrics Options

#### `--server-metrics` `<list>`

Server metrics collection (ENABLED BY DEFAULT). Automatically collects from inference endpoint base_url + `/metrics`. Optionally specify additional custom Prometheus-compatible endpoint URLs (e.g., http://node1:8081/metrics, http://node2:9090/metrics). Use AIPERF_SERVER_METRICS_ENABLED=false to disable. Example: `--server-metrics node1:8081 node2:9090/metrics` for additional endpoints.

## ZMQ Communication Options

#### `--zmq-host` `<str>`
Expand Down
632 changes: 632 additions & 0 deletions docs/tutorials/server-metrics.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/aiperf/common/config/config_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ class OutputDefaults:
PROFILE_EXPORT_JSONL_FILE = Path("profile_export.jsonl")
PROFILE_EXPORT_RAW_JSONL_FILE = Path("profile_export_raw.jsonl")
PROFILE_EXPORT_GPU_TELEMETRY_JSONL_FILE = Path("gpu_telemetry_export.jsonl")
SERVER_METRICS_EXPORT_JSONL_FILE = Path("server_metrics_export.jsonl")
SERVER_METRICS_METADATA_JSON_FILE = Path("server_metrics_metadata.json")
EXPORT_LEVEL = ExportLevel.RECORDS
SLICE_DURATION = None

Expand Down
1 change: 1 addition & 0 deletions src/aiperf/common/config/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Groups:
IMAGE_INPUT = Group.create_ordered("Image Input")
VIDEO_INPUT = Group.create_ordered("Video Input")
SERVICE = Group.create_ordered("Service")
SERVER_METRICS = Group.create_ordered("Server Metrics")
TELEMETRY = Group.create_ordered("Telemetry")
UI = Group.create_ordered("UI")
WORKERS = Group.create_ordered("Workers")
Expand Down
27 changes: 25 additions & 2 deletions src/aiperf/common/config/output_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ class OutputConfig(BaseConfig):
_profile_export_gpu_telemetry_jsonl_file: Path = (
OutputDefaults.PROFILE_EXPORT_GPU_TELEMETRY_JSONL_FILE
)
_server_metrics_export_jsonl_file: Path = (
OutputDefaults.SERVER_METRICS_EXPORT_JSONL_FILE
)
_server_metrics_metadata_json_file: Path = (
OutputDefaults.SERVER_METRICS_METADATA_JSON_FILE
)

@model_validator(mode="after")
def set_export_filenames(self) -> Self:
Expand All @@ -85,10 +91,14 @@ def set_export_filenames(self) -> Self:
base_path = self.profile_export_prefix
base_str = str(base_path)

# Check complex suffixes first (longest to shortest) to avoid double-suffixing
# e.g., if user passes "foo_raw.jsonl", we want "foo" not "foo_raw"
suffixes_to_strip = [
"_server_metrics_metadata.json",
"_server_metrics.jsonl",
"_gpu_telemetry.jsonl",
"_timeslices.csv",
"_timeslices.json",
"_gpu_telemetry.jsonl",
"_raw.jsonl",
".csv",
".json",
Expand All @@ -108,7 +118,12 @@ def set_export_filenames(self) -> Self:
self._profile_export_gpu_telemetry_jsonl_file = Path(
f"{base_str}_gpu_telemetry.jsonl"
)

self._server_metrics_export_jsonl_file = Path(
f"{base_str}_server_metrics.jsonl"
)
self._server_metrics_metadata_json_file = Path(
f"{base_str}_server_metrics_metadata.json"
)
return self

slice_duration: Annotated[
Expand Down Expand Up @@ -149,3 +164,11 @@ def profile_export_raw_jsonl_file(self) -> Path:
@property
def profile_export_gpu_telemetry_jsonl_file(self) -> Path:
    """Resolved path of the GPU telemetry JSONL export inside the artifact directory."""
    filename = self._profile_export_gpu_telemetry_jsonl_file
    return self.artifact_directory / filename

@property
def server_metrics_export_jsonl_file(self) -> Path:
    """Resolved path of the server-metrics JSONL export inside the artifact directory."""
    filename = self._server_metrics_export_jsonl_file
    return self.artifact_directory / filename

@property
def server_metrics_metadata_json_file(self) -> Path:
    """Resolved path of the server-metrics metadata JSON file inside the artifact directory."""
    filename = self._server_metrics_metadata_json_file
    return self.artifact_directory / filename
52 changes: 50 additions & 2 deletions src/aiperf/common/config/user_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ def _count_dataset_entries(self) -> int:
gpu_telemetry: Annotated[
list[str] | None,
Field(
default=None,
description=(
"Enable GPU telemetry console display and optionally specify: "
"(1) 'dashboard' for realtime dashboard mode, "
Expand All @@ -229,7 +228,7 @@ def _count_dataset_entries(self) -> int:
consume_multiple=True,
group=Groups.TELEMETRY,
),
]
] = None

_gpu_telemetry_mode: GPUTelemetryMode = GPUTelemetryMode.SUMMARY
_gpu_telemetry_urls: list[str] = []
Expand Down Expand Up @@ -286,6 +285,55 @@ def gpu_telemetry_metrics_file(self) -> Path | None:
"""Get the path to custom GPU metrics CSV file."""
return self._gpu_telemetry_metrics_file

server_metrics: Annotated[
list[str] | None,
Field(
description=(
"Server metrics collection (ENABLED BY DEFAULT). "
"Automatically collects from inference endpoint base_url + `/metrics`. "
"Optionally specify additional custom Prometheus-compatible endpoint URLs "
"(e.g., http://node1:8081/metrics, http://node2:9090/metrics). "
"Use AIPERF_SERVER_METRICS_ENABLED=false to disable. "
"Example: `--server-metrics node1:8081 node2:9090/metrics` for additional endpoints"
),
),
BeforeValidator(parse_str_or_list),
CLIParameter(
name=("--server-metrics",),
consume_multiple=True,
group=Groups.SERVER_METRICS,
),
] = None

_server_metrics_urls: list[str] = []

@model_validator(mode="after")
def _parse_server_metrics_config(self) -> Self:
    """Parse the ``server_metrics`` option list into normalized endpoint URLs.

    Runs after model construction and populates ``_server_metrics_urls`` from
    ``self.server_metrics``. A value of ``None`` or an empty list yields an
    empty URL list here; per the field description, collection itself is
    toggled via the AIPERF_SERVER_METRICS_ENABLED environment variable, not
    by this validator.

    NOTE(review): items that neither start with "http" nor contain a ":"
    (e.g. a bare hostname with no port) are silently dropped — confirm this
    is intended rather than an input error worth surfacing to the user.
    """
    # Local import — presumably to avoid a circular import at module load
    # time; TODO confirm.
    from aiperf.common.metric_utils import normalize_metrics_endpoint_url

    urls: list[str] = []

    for item in self.server_metrics or []:
        # Check for URLs (anything with : or starting with http)
        if item.startswith("http") or ":" in item:
            # Prepend a default scheme when only host:port was given.
            normalized_url = item if item.startswith("http") else f"http://{item}"
            normalized_url = normalize_metrics_endpoint_url(normalized_url)
            urls.append(normalized_url)

    self._server_metrics_urls = urls
    return self

@property
def server_metrics_urls(self) -> list[str]:
    """The normalized Prometheus endpoint URLs parsed from ``server_metrics``.

    Populated by ``_parse_server_metrics_config``; empty when no custom
    endpoints were supplied on the command line.
    """
    return self._server_metrics_urls

@model_validator(mode="after")
def _compute_config(self) -> Self:
"""Compute additional configuration.
Expand Down
136 changes: 136 additions & 0 deletions src/aiperf/common/duplicate_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Duplicate tracker for deduplicating records."""

import asyncio
from collections import defaultdict
from collections.abc import Callable
from typing import Any, Generic, TypeVar

from aiperf.common.mixins import AIPerfLoggerMixin

TRecord = TypeVar("TRecord", bound=Any)


class AsyncKeyedDuplicateTracker(AIPerfLoggerMixin, Generic[TRecord]):
    """Tracker for deduplicating records by key and value.

    Args:
        key_function: A function that takes a record and returns a key for tracking duplicates. This is used to group records by key.
        value_function: A function that takes a record and returns a value for comparison. This is used to compare the current value to the previous value.

    Notes:
        The key_function and value_function are used to group records by key and compare values. This is useful for cases where
        the record itself contains timestamps or other metadata that is not relevant to the value being compared for deduplication.

        Tracks the previous record for each key and detects duplicates.

        Deduplication logic:
            Consecutive identical values are suppressed to save
            storage while preserving complete timeline information. The strategy:

            1. First occurrence → always written (marks start of period)
            2. Duplicates → skipped and counted
            3. Change detected → last duplicate written, then new record
               (provides end timestamp of previous period + start of new period)

            Example: Input A,A,A,B,B,C,D,D,D,D → Output A,A,B,B,C,D,D

        Why write the last occurrence? Time-series data needs actual observations:
            Without: A@t1, B@t4 ← You could guess A ended at ~t3, but no proof
            With: A@t1, A@t3, B@t4 ← A was observed until t3

        Without the last occurrence, you'd rely on interpolation/assumptions rather
        than actual measured data. This enables accurate duration calculations,
        timeline visualization (Grafana), and time-weighted averages. Essential
        for metrics requiring precise change detection.

        Deduplication uses equality (==) on the value returned by value_function,
        tracked independently for each key returned by key_function.
    """

    def __init__(
        self,
        key_function: Callable[[TRecord], str],
        value_function: Callable[[TRecord], Any] = lambda x: x,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # Lock guarding the lazy creation of per-key deduplication locks.
        self._lock_creation_lock = asyncio.Lock()
        self._dupe_locks: dict[str, asyncio.Lock] = {}
        # Number of consecutive suppressed duplicates per key.
        self._dupe_counts: dict[str, int] = defaultdict(int)
        # Previous record seen for each key, used to detect duplicates and to
        # emit the "last occurrence" when a change or shutdown happens.
        self._previous_records: dict[str, TRecord] = {}
        self._key_function = key_function
        self._value_function = value_function

    async def deduplicate_record(self, record: TRecord) -> list[TRecord]:
        """Deduplicate a record and return the records to write.

        Args:
            record: The record to deduplicate.

        Returns:
            A list of records to write containing either an empty list, the current record,
            or the previous and current records (previous first, to mark the end of the
            suppressed duplicate run).
        """
        records_to_write: list[TRecord] = [record]

        key = self._key_function(record)
        value = self._value_function(record)

        if key not in self._dupe_locks:
            # Create a lock for this key if it doesn't exist
            async with self._lock_creation_lock:
                # Double check inside the lock to avoid race conditions
                if key not in self._dupe_locks:
                    self.trace(lambda key=key: f"Creating lock for key: {key}")
                    self._dupe_locks[key] = asyncio.Lock()

        # Check for duplicates and update the records to write
        async with self._dupe_locks[key]:
            if key in self._previous_records:
                if self._value_function(self._previous_records[key]) == value:
                    self._dupe_counts[key] += 1
                    # Bind the count as a lambda default so a lazily evaluated
                    # trace message reports the value at this moment, not
                    # whatever the mutable counter holds when the lambda runs.
                    count = self._dupe_counts[key]
                    self.trace(
                        lambda key=key, count=count: f"Duplicate found for key: {key}, incrementing dupe count to {count}"
                    )
                    # Clear the list instead of return so the previous record is still updated down below
                    records_to_write.clear()

                # If we have duplicates, we need to write the previous record before the current record,
                # in order to know when the change actually occurs.
                elif self._dupe_counts[key] > 0:
                    self._dupe_counts[key] = 0
                    self.trace(
                        lambda key=key: f"New change detected for key: {key}, writing previous record and resetting dupe count"
                    )
                    records_to_write.insert(0, self._previous_records[key])

            self._previous_records[key] = record

        return records_to_write

    async def flush_remaining_duplicates(self) -> list[TRecord]:
        """Flush remaining duplicates for all keys on shutdown.

        When the system is stopping, there may be pending duplicates that haven't
        been written yet (because we're still in a duplicate sequence). This method
        returns the last occurrence for each key that has pending duplicates.

        Returns:
            A list of records that need to be flushed.
        """
        records_to_flush: list[TRecord] = []

        # Iterate through all keys that have pending duplicates
        for key in list(self._dupe_counts.keys()):
            if self._dupe_counts[key] > 0 and key in self._dupe_locks:
                async with self._dupe_locks[key]:
                    if self._dupe_counts[key] > 0 and key in self._previous_records:
                        # Capture the count BEFORE resetting it below. The
                        # original lambda read self._dupe_counts[key] lazily,
                        # so a deferred trace evaluation would have reported 0
                        # (classic late-binding closure bug).
                        count = self._dupe_counts[key]
                        self.trace(
                            lambda key=key, count=count: f"Flushing {count} remaining duplicates for key: {key}"
                        )
                        records_to_flush.append(self._previous_records[key])
                        self._dupe_counts[key] = 0

        return records_to_flush
4 changes: 4 additions & 0 deletions src/aiperf/common/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
RecordProcessorType,
ResultsProcessorType,
)
from aiperf.common.enums.prometheus_enums import (
PrometheusMetricType,
)
from aiperf.common.enums.service_enums import (
LifecycleState,
ServiceRegistrationStatus,
Expand Down Expand Up @@ -155,6 +158,7 @@
"ModelSelectionStrategy",
"PowerMetricUnit",
"PowerMetricUnitInfo",
"PrometheusMetricType",
"PromptSource",
"PublicDatasetType",
"RecordProcessorType",
Expand Down
2 changes: 2 additions & 0 deletions src/aiperf/common/enums/message_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,7 @@ class MessageType(CaseInsensitiveStrEnum):
STATUS = "status"
TELEMETRY_RECORDS = "telemetry_records"
TELEMETRY_STATUS = "telemetry_status"
SERVER_METRICS_RECORDS = "server_metrics_records"
SERVER_METRICS_STATUS = "server_metrics_status"
WORKER_HEALTH = "worker_health"
WORKER_STATUS_SUMMARY = "worker_status_summary"
3 changes: 3 additions & 0 deletions src/aiperf/common/enums/post_processor_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class ResultsProcessorType(CaseInsensitiveStrEnum):
"""Processor that exports per-record metrics to JSONL files with display unit conversion and filtering.
Only enabled when export_level is set to RECORDS."""

SERVER_METRICS_JSONL_WRITER = "server_metrics_jsonl_writer"
"""Processor that exports server metrics data to JSONL files."""

TELEMETRY_EXPORT = "telemetry_export"
"""Processor that exports per-record GPU telemetry data to JSONL files.
Writes each TelemetryRecord as it arrives from the TelemetryManager."""
Expand Down
47 changes: 47 additions & 0 deletions src/aiperf/common/enums/prometheus_enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from typing import Any

from typing_extensions import Self

from aiperf.common.enums.base_enums import CaseInsensitiveStrEnum


class PrometheusMetricType(CaseInsensitiveStrEnum):
    """Prometheus metric types as defined in the Prometheus exposition format.

    See: https://prometheus.io/docs/concepts/metric_types/
    """

    COUNTER = "counter"
    """Counter: A cumulative metric that represents a single monotonically increasing counter."""

    GAUGE = "gauge"
    """Gauge: A metric that represents a single numerical value that can arbitrarily go up and down."""

    HISTOGRAM = "histogram"
    """Histogram: Samples observations and counts them in configurable buckets."""

    SUMMARY = "summary"
    """Summary: Similar to histogram, samples observations and provides quantiles."""

    UNKNOWN = "unknown"
    """Unknown: Untyped metric (prometheus_client uses 'unknown' instead of 'untyped')."""

    @classmethod
    def _missing_(cls, value: Any) -> Self:
        """Resolve values that do not directly match an enumeration member.

        Delegates to the base class first (which provides case-insensitive
        matching for string values), then falls back to UNKNOWN for any
        unrecognized value.

        The Enum ``_missing_`` hook may signal "no match" either by raising
        ValueError or by returning None; the original implementation handled
        only the ValueError path, so a None return from the base class would
        surface to the caller as a ValueError instead of the documented
        UNKNOWN fallback. Both paths are handled here.

        Returns:
            The matching enumeration member if a case-insensitive match is
            found for string values; otherwise, PrometheusMetricType.UNKNOWN.
        """
        try:
            member = super()._missing_(value)
        except ValueError:
            return cls.UNKNOWN
        return member if member is not None else cls.UNKNOWN
4 changes: 1 addition & 3 deletions src/aiperf/common/enums/service_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ class ServiceType(CaseInsensitiveStrEnum):
WORKER_MANAGER = "worker_manager"
WORKER = "worker"
TELEMETRY_MANAGER = "telemetry_manager"

# For testing purposes only
TEST = "test_service"
SERVER_METRICS_MANAGER = "server_metrics_manager"


class ServiceRegistrationStatus(CaseInsensitiveStrEnum):
Expand Down
Loading