From 6725b70cae39d40f64c01d3a1564e9633be872c7 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Wed, 12 Mar 2025 21:48:14 +0000 Subject: [PATCH 01/28] first commit --- benchmark_serving.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 9aa2577..cbc5835 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -476,25 +476,27 @@ async def benchmark( if res is None: continue latency, ttft, itl, errors = res - prompt_len, output_len, request_latency = latency - overall_results["latencies"].append(latency) - if ttft: - overall_results["ttfts"].append(ttft) - overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - overall_results["itls"].extend(itl) if errors: for k, v in errors.items(): overall_results["errors"][k] += v - per_model_results[chosen_model]["latencies"].append(latency) - if ttft: - per_model_results[chosen_model]["ttfts"].append(ttft) - per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - per_model_results[chosen_model]["itls"].extend(itl) - if errors: - for k, v in errors.items(): - per_model_results[chosen_model]["errors"][k] += v + else: + prompt_len, output_len, request_latency = latency + overall_results["latencies"].append(latency) + if ttft: + overall_results["ttfts"].append(ttft) + overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + if itl: + overall_results["itls"].extend(itl) + + per_model_results[chosen_model]["latencies"].append(latency) + if ttft: + per_model_results[chosen_model]["ttfts"].append(ttft) + per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + if itl: + per_model_results[chosen_model]["itls"].extend(itl) + if errors: + for k, v in errors.items(): + per_model_results[chosen_model]["errors"][k] += v benchmark_duration = time.time() - benchmark_start_time From d976ec859a922ca27e627c3034fd3580be09a19a Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Wed, 12 Mar 2025 21:51:58 +0000 Subject: [PATCH 02/28] refactor --- benchmark_serving.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index cbc5835..24efa56 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -477,26 +477,21 @@ async def benchmark( continue latency, ttft, itl, errors = res if errors: - for k, v in errors.items(): - overall_results["errors"][k] += v + for k, v in errors.items(): + overall_results["errors"][k] += v + per_model_results[chosen_model]["errors"][k] += v else: prompt_len, output_len, request_latency = latency overall_results["latencies"].append(latency) + per_model_results[chosen_model]["latencies"].append(latency) if ttft: overall_results["ttfts"].append(ttft) overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - overall_results["itls"].extend(itl) - - per_model_results[chosen_model]["latencies"].append(latency) - if ttft: per_model_results[chosen_model]["ttfts"].append(ttft) per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) if itl: - per_model_results[chosen_model]["itls"].extend(itl) - if errors: - for k, v in errors.items(): - per_model_results[chosen_model]["errors"][k] += v + overall_results["itls"].extend(itl) + per_model_results[chosen_model]["itls"].extend(itl) benchmark_duration = time.time() - benchmark_start_time From f583747da8028dfa2e035ed4563299e83b56bef8 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Wed, 26 Mar 2025 23:04:55 +0000 Subject: [PATCH 03/28] first commit --- benchmark_serving.py | 191 ++++++++++++++++++++++--------------------- 1 file changed, 96 insertions(+), 95 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 24efa56..62b8c50 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -25,6 +25,7 @@ from datetime import datetime import json import random +import sys import requests import time from typing import AsyncGenerator, List, Optional, Tuple, Dict @@ -153,6 +154,7 @@ def init_errors_map() -> Dict[str, int]: async def send_stream_request( backend: str, + clientSession: any, api_url: str, prompt: str, prompt_len: int, @@ -198,51 +200,50 @@ async def send_stream_request( most_recent_timestamp = st output = "" timeout = aiohttp.ClientTimeout(total=timeout) - async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session: - try: - async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: - async for chunk_bytes in response.content.iter_chunks(): - chunk_bytes = chunk_bytes[0].strip() - if not chunk_bytes: - continue - timestamp = time.perf_counter() - # First token - if ttft == 0.0: - ttft = timestamp - st - else: - itl.append(timestamp - most_recent_timestamp) - most_recent_timestamp = timestamp - if backend == "vllm": - if chunk_bytes.decode("utf-8")[6:] != "[DONE]": - output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] - elif backend == "jetstream": - if chunk_bytes.decode("utf-8") != "": - output += json.loads(chunk_bytes.decode("utf-8"))["text"] - - except aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 - return None, None, None, errors + try: + async with clientSession.post(api_url, headers=headers, json=pload, ssl=False) as response: + async for chunk_bytes in response.content.iter_chunks(): + chunk_bytes = chunk_bytes[0].strip() + if not chunk_bytes: + continue + timestamp = time.perf_counter() + # First token + if ttft == 0.0: + ttft = timestamp - st + else: + itl.append(timestamp - most_recent_timestamp) + most_recent_timestamp = timestamp + if backend == "vllm": + if chunk_bytes.decode("utf-8")[6:] != "[DONE]": + output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"] + elif backend == "jetstream": + if chunk_bytes.decode("utf-8") != "": + output += json.loads(chunk_bytes.decode("utf-8"))["text"] + + except aiohttp.client_exceptions.ClientConnectorError as client_err: + errors["ClientConnectorError"] += 1 + print(f"ClientConnectorError: {client_err}") + return None, None, None, errors + except asyncio.TimeoutError as timeout_err: + errors["TimeoutError"] += 1 + print(f"TimeoutError: {timeout_err}") + return None, None, None, errors + except aiohttp.client_exceptions.ClientOSError as e: + errors["ClientOSError"] += 1 + print(f"ClientOSError: {e}") + return None, None, None, errors + except aiohttp.client_exceptions.ContentTypeError as e: + print(f"ContentTypeError: {e}, response: {response}") + errors["ContentTypeError"] += 1 + return None, None, None, errors + except aiohttp.client_exceptions.ServerDisconnectedError as e: + errors["ServerDisconnectedError"] += 1 + print(f"ServerDisconnectedError: {e}") + return None, None, None, errors + except Exception as e: + print(f"Unknown error {e}") + errors["unknown_error"] += 1 + return None, None, None, errors request_end_time = time.time() output_token_ids = tokenizer(output).input_ids output_len = len(output_token_ids) @@ -259,6 +260,7 @@ async def send_stream_request( return request_latency, ttft, itl, None async def send_request( + clientSession: any, backend: str, api_url: str, prompt: str, @@ -287,7 +289,7 @@ async def send_request( "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "max_tokens": output_len, - "ignore_eos": False, + "ignore_eos": True, "stream": False, } elif backend == "tgi": @@ -343,41 +345,39 @@ async def send_request( else: raise ValueError(f"Unknown backend: {backend}") - # Set client timeout to be 3 hrs. - timeout = aiohttp.ClientTimeout(total=timeout) - async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session: - while True: - try: - async with session.post(api_url, headers=headers, json=pload, ssl=False) as response: - output = await response.json() - - # Re-send the request if it failed. - if "error" not in output: - break - except aiohttp.client_exceptions.ClientConnectorError as client_err: - errors["ClientConnectorError"] += 1 - print(f"ClientConnectorError: {client_err}") - return None, None, None, errors - except asyncio.TimeoutError as timeout_err: - errors["TimeoutError"] += 1 - print(f"TimeoutError: {timeout_err}") - return None, None, None, errors - except aiohttp.client_exceptions.ClientOSError as e: - errors["ClientOSError"] += 1 - print(f"ClientOSError: {e}") - return None, None, None, errors - except aiohttp.client_exceptions.ContentTypeError as e: - print(f"ContentTypeError: {e}, response: {response}") - errors["ContentTypeError"] += 1 - return None, None, None, errors - except aiohttp.client_exceptions.ServerDisconnectedError as e: - errors["ServerDisconnectedError"] += 1 - print(f"ServerDisconnectedError: {e}") - return None, None, None, errors - except Exception as e: - print(f"Unknown error {e}") - errors["unknown_error"] += 1 - return None, None, None, errors + while True: + try: + async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response: + output = await response.json() + async with clientSession.head("http://vllm-inference-server:8000/metrics", headers={}, ssl=False) as _: + pass + # Re-send the request if it failed. + if "error" not in output: + break + except aiohttp.client_exceptions.ClientConnectorError as client_err: + errors["ClientConnectorError"] += 1 + print(f"ClientConnectorError: {client_err}") + return None, None, None, errors + except asyncio.TimeoutError as timeout_err: + errors["TimeoutError"] += 1 + print(f"TimeoutError: {timeout_err}") + return None, None, None, errors + except aiohttp.client_exceptions.ClientOSError as e: + errors["ClientOSError"] += 1 + print(f"ClientOSError: {e}") + return None, None, None, errors + except aiohttp.client_exceptions.ContentTypeError as e: + print(f"ContentTypeError: {e}, response: {response}") + errors["ContentTypeError"] += 1 + return None, None, None, errors + except aiohttp.client_exceptions.ServerDisconnectedError as e: + errors["ServerDisconnectedError"] += 1 + print(f"ServerDisconnectedError: {e}") + return None, None, None, errors + except Exception as e: + print(f"Unknown error {e}") + errors["unknown_error"] += 1 + return None, None, None, errors request_end_time = time.time() # Naive HF transformers generation and TensorRT-LLM generation stops at EOS @@ -414,15 +414,15 @@ async def send_request( return request_latency, None, None, None -async def run_single_request(args: argparse.Namespace, api_url: str, tokenizer: PreTrainedTokenizerBase, +async def run_single_request(args: argparse.Namespace, clientSession: any, api_url: str, tokenizer: PreTrainedTokenizerBase, prompt: str, prompt_len: int, output_len: int, chosen_model: str) -> Tuple[str, Tuple]: if args.stream_request: result = await send_stream_request( - args.backend, api_url, prompt, prompt_len, output_len, + clientSession, args.backend, api_url, prompt, prompt_len, output_len, args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,) else: result = await send_request( - args.backend, api_url, prompt, prompt_len, output_len, + clientSession, args.backend, api_url, prompt, prompt_len, output_len, args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,) return chosen_model, result @@ -456,16 +456,17 @@ async def benchmark( benchmark_start_time = time.time() tasks: List[asyncio.Task] = [] prompts_sent = 0 - async for request in generate_next_request(input_requests, args.request_rate): - if prompts_sent >= args.num_prompts: - break - prompt, prompt_len, output_len = request - chosen_model = random.choices(model_names, weights=model_weights)[0] - task = asyncio.create_task(run_single_request(args, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model)) - tasks.append(task) - prompts_sent += 1 - - results = await asyncio.gather(*tasks) + async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=28000,),timeout=None,) as clientSession: + async for request in generate_next_request(input_requests, args.request_rate): + if prompts_sent >= args.num_prompts: + break + prompt, prompt_len, output_len = request + chosen_model = random.choices(model_names, weights=model_weights)[0] + task = asyncio.create_task(run_single_request(args, clientSession, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model)) + tasks.append(task) + prompts_sent += 1 + print("send all requests") + results = await asyncio.gather(*tasks) overall_results = {"latencies": [], "ttfts": [], "itls": [], "tpots": [], "errors": init_errors_map()} per_model_results: Dict[str, Dict[str, List]] = {} @@ -1054,7 +1055,7 @@ def parse_traffic_split(arg): parser.add_argument("--pm-job", type=str, default="vllm-podmonitoring", help="name of the pod monitoring object, ignored if scrape-server-metrics is false") cmd_args = parser.parse_args() - level = logging.INFO + level = logging.DEBUG logger = logging.getLogger(__name__) logger.setLevel(level) handler = logging.StreamHandler() # This sends output to the console From 945b76ad8bcfeb0735a132ac4541f434e7e01ff5 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Wed, 26 Mar 2025 23:09:35 +0000 Subject: [PATCH 04/28] revert --- benchmark_serving.py | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 62b8c50..1371fcc 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -477,22 +477,25 @@ async def benchmark( if res is None: continue latency, ttft, itl, errors = res + prompt_len, output_len, request_latency = latency + overall_results["latencies"].append(latency) + if ttft: + overall_results["ttfts"].append(ttft) + overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + if itl: + overall_results["itls"].extend(itl) if errors: - for k, v in errors.items(): - overall_results["errors"][k] += v - per_model_results[chosen_model]["errors"][k] += v - else: - prompt_len, output_len, request_latency = latency - overall_results["latencies"].append(latency) - per_model_results[chosen_model]["latencies"].append(latency) - if ttft: - overall_results["ttfts"].append(ttft) - overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - per_model_results[chosen_model]["ttfts"].append(ttft) - per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - overall_results["itls"].extend(itl) - per_model_results[chosen_model]["itls"].extend(itl) + for k, v in errors.items(): + overall_results["errors"][k] += v + per_model_results[chosen_model]["latencies"].append(latency) + if ttft: + per_model_results[chosen_model]["ttfts"].append(ttft) + per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + if itl: + per_model_results[chosen_model]["itls"].extend(itl) + if errors: + for k, v in errors.items(): + per_model_results[chosen_model]["errors"][k] += v benchmark_duration = time.time() - benchmark_start_time From c3b40ca040d18948ec3794b4a5ed3e794fcb60cd Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Wed, 26 Mar 2025 23:10:10 +0000 Subject: [PATCH 05/28] revert --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 1371fcc..476b3ca 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -495,7 +495,7 @@ async def benchmark( per_model_results[chosen_model]["itls"].extend(itl) if errors: for k, v in errors.items(): - per_model_results[chosen_model]["errors"][k] += v + per_model_results[chosen_model]["errors"][k] += v benchmark_duration = time.time() - benchmark_start_time From 3989da02cb6b06d23d53bbf73c5d6edcd9fa173c Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 17:12:46 +0000 Subject: [PATCH 06/28] improper error handling --- benchmark_serving.py | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 476b3ca..27feed8 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -349,8 +349,8 @@ async def send_request( try: async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response: output = await response.json() - async with clientSession.head("http://vllm-inference-server:8000/metrics", headers={}, ssl=False) as _: - pass + # async with clientSession.head("http://vllm-inference-server:8000/metrics", headers={}, ssl=False) as _: + # pass # Re-send the request if it failed. if "error" not in output: break @@ -477,25 +477,23 @@ async def benchmark( if res is None: continue latency, ttft, itl, errors = res + prompt_len, output_len, request_latency = latency - overall_results["latencies"].append(latency) - if ttft: - overall_results["ttfts"].append(ttft) - overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - overall_results["itls"].extend(itl) - if errors: - for k, v in errors.items(): - overall_results["errors"][k] += v - per_model_results[chosen_model]["latencies"].append(latency) - if ttft: - per_model_results[chosen_model]["ttfts"].append(ttft) - per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) - if itl: - per_model_results[chosen_model]["itls"].extend(itl) if errors: - for k, v in errors.items(): - per_model_results[chosen_model]["errors"][k] += v + for k, v in errors.items(): + overall_results["errors"][k] += v + per_model_results[chosen_model]["errors"][k] += v + elif latency: + overall_results["latencies"].append(latency) + per_model_results[chosen_model]["latencies"].append(latency) + if ttft: + overall_results["ttfts"].append(ttft) + overall_results["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + per_model_results[chosen_model]["ttfts"].append(ttft) + per_model_results[chosen_model]["tpots"].append((request_latency - ttft) / (output_len - 1) if output_len > 1 else 0) + if itl: + overall_results["itls"].extend(itl) + per_model_results[chosen_model]["itls"].extend(itl) benchmark_duration = time.time() - benchmark_start_time From 5321440f947248556aa1fdb4a0c2c6aa152a6641 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 17:13:39 +0000 Subject: [PATCH 07/28] add comment --- benchmark_serving.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/benchmark_serving.py b/benchmark_serving.py index 27feed8..8b14462 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -479,6 +479,8 @@ async def benchmark( latency, ttft, itl, errors = res prompt_len, output_len, request_latency = latency + + # `latency` and `errors` are mutually exclusive if errors: for k, v in errors.items(): overall_results["errors"][k] += v From ed6e3051ea740e57fcd7ffc12cbc1fb2ad4039db Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 17:29:49 +0000 Subject: [PATCH 08/28] remove dummy call --- benchmark_serving.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 8b14462..db5a9a3 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -349,8 +349,6 @@ async def send_request( try: async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response: output = await response.json() - # async with clientSession.head("http://vllm-inference-server:8000/metrics", headers={}, ssl=False) as _: - # pass # Re-send the request if it failed. if "error" not in output: break From 20008f1f33be9ed3a6fca7e769842b406abdf3c9 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 17:33:32 +0000 Subject: [PATCH 09/28] deduplicate flags --- latency_throughput_curve.sh | 74 +++++++++++++++---------------------- 1 file changed, 30 insertions(+), 44 deletions(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index c3398fc..0eb160d 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -1,18 +1,5 @@ #!/bin/bash -# Copyright 2024 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. set -o xtrace export IP=$IP @@ -24,48 +11,47 @@ if [[ "$PROMPT_DATASET" = "sharegpt" ]]; then fi PYTHON="python3" -PYTHON_OPTS="benchmark_serving.py " +BASE_PYTHON_OPTS=( + "benchmark_serving.py" + "--save-json-results" + "--host=$IP" + "--port=$PORT" + "--dataset=$PROMPT_DATASET_FILE" + "--tokenizer=$TOKENIZER" + "--backend=$BACKEND" + "--max-input-length=$INPUT_LENGTH" + "--max-output-length=$OUTPUT_LENGTH" + "--file-prefix=$FILE_PREFIX" + "--models=$MODELS" + "--pm-namespace=$PM_NAMESPACE" + "--pm-job=$PM_JOB" +) + +[[ "$TRAFFIC_SPLIT" ]] && BASE_PYTHON_OPTS+=("--traffic-split=$TRAFFIC_SPLIT") +[[ "$OUTPUT_BUCKET" ]] && BASE_PYTHON_OPTS+=("--output-bucket=$OUTPUT_BUCKET") +[[ "$SCRAPE_SERVER_METRICS" = "true" ]] && BASE_PYTHON_OPTS+=("--scrape-server-metrics") +[[ "$SAVE_AGGREGATED_RESULT" = "true" ]] && BASE_PYTHON_OPTS+=("--save-aggregated-result") +[[ "$STREAM_REQUEST" = "true" ]] && BASE_PYTHON_OPTS+=("--stream-request") +[[ "$OUTPUT_BUCKET_FILEPATH" ]] && BASE_PYTHON_OPTS+=("--output-bucket-filepath" "$OUTPUT_BUCKET_FILEPATH") + for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do echo "Benchmarking request rate: ${request_rate}" - # TODO: Check if profile already exists, if so then skip timestamp=$(date +"%Y-%m-%d_%H-%M-%S") output_file="latency-profile-${timestamp}.txt" - if [ ${request_rate} == 0 ]; then + + if [ "$request_rate" == "0" ]; then request_rate="inf" num_prompts=$MAX_NUM_PROMPTS else num_prompts=$(awk "BEGIN {print int($request_rate * $BENCHMARK_TIME_SECONDS)}") fi - - echo "TOTAL prompts: $num_prompts" - - # Build the python command options - PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS --pm-namespace=$PM_NAMESPACE --pm-job=$PM_JOB" - - if [[ "$TRAFFIC_SPLIT" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --traffic-split=$TRAFFIC_SPLIT" - fi - if [[ "$OUTPUT_BUCKET" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --output-bucket=$OUTPUT_BUCKET" - fi - - if [[ "$SCRAPE_SERVER_METRICS" = "true" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --scrape-server-metrics" - fi - if [[ "$SAVE_AGGREGATED_RESULT" = "true" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --save-aggregated-result" - fi - if [[ "$STREAM_REQUEST" = "true" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --stream-request" - fi - if [[ "$OUTPUT_BUCKET_FILEPATH" ]]; then - PYTHON_OPTS="$PYTHON_OPTS --output-bucket-filepath $OUTPUT_BUCKET_FILEPATH" - fi + echo "TOTAL prompts: $num_prompts" + PYTHON_OPTS=("${BASE_PYTHON_OPTS[@]}" "--request-rate=$request_rate" "--num-prompts=$num_prompts") - $PYTHON $PYTHON_OPTS > $output_file - cat $output_file - sleep 30 # wait 30 seconds before next run to ensure metrics isolation + $PYTHON "${PYTHON_OPTS[@]}" > "$output_file" + cat "$output_file" + sleep 30 done export LPG_FINISHED="true" From 1f65d6322b3455c15e1fc7a2fae657194413bdad Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 17:52:54 +0000 Subject: [PATCH 10/28] configurable sleep time --- latency_throughput_curve.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 0eb160d..5eaeeb8 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -34,6 +34,8 @@ BASE_PYTHON_OPTS=( [[ "$STREAM_REQUEST" = "true" ]] && BASE_PYTHON_OPTS+=("--stream-request") [[ "$OUTPUT_BUCKET_FILEPATH" ]] && BASE_PYTHON_OPTS+=("--output-bucket-filepath" "$OUTPUT_BUCKET_FILEPATH") +SLEEP_TIME=${SLEEP_TIME:-0} + for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do echo "Benchmarking request rate: ${request_rate}" timestamp=$(date +"%Y-%m-%d_%H-%M-%S") @@ -51,7 +53,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do $PYTHON "${PYTHON_OPTS[@]}" > "$output_file" cat "$output_file" - sleep 30 + sleep $SLEEP_TIME done export LPG_FINISHED="true" From 614de11c74aa71521d1072b0ae6d45b520fe81e2 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 20:43:54 +0000 Subject: [PATCH 11/28] extra log --- latency_throughput_curve.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 5eaeeb8..71b088e 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -53,6 +53,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do $PYTHON "${PYTHON_OPTS[@]}" > "$output_file" cat "$output_file" + echo "Sleeping for $SLEEP_TIME seconds..." sleep $SLEEP_TIME done From 6a5196eec30f4911d72334795616cbf82fa67696 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:41:17 +0000 Subject: [PATCH 12/28] revert latency_throughput_curve.sh --- latency_throughput_curve.sh | 81 ++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 71b088e..6e9d172 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -1,47 +1,34 @@ #!/bin/bash +# Copyright 2024 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. set -o xtrace export IP=$IP - huggingface-cli login --token "$HF_TOKEN" --add-to-git-credential - if [[ "$PROMPT_DATASET" = "sharegpt" ]]; then PROMPT_DATASET_FILE="ShareGPT_V3_unfiltered_cleaned_split.json" fi PYTHON="python3" -BASE_PYTHON_OPTS=( - "benchmark_serving.py" - "--save-json-results" - "--host=$IP" - "--port=$PORT" - "--dataset=$PROMPT_DATASET_FILE" - "--tokenizer=$TOKENIZER" - "--backend=$BACKEND" - "--max-input-length=$INPUT_LENGTH" - "--max-output-length=$OUTPUT_LENGTH" - "--file-prefix=$FILE_PREFIX" - "--models=$MODELS" - "--pm-namespace=$PM_NAMESPACE" - "--pm-job=$PM_JOB" -) - -[[ "$TRAFFIC_SPLIT" ]] && BASE_PYTHON_OPTS+=("--traffic-split=$TRAFFIC_SPLIT") -[[ "$OUTPUT_BUCKET" ]] && BASE_PYTHON_OPTS+=("--output-bucket=$OUTPUT_BUCKET") -[[ "$SCRAPE_SERVER_METRICS" = "true" ]] && BASE_PYTHON_OPTS+=("--scrape-server-metrics") -[[ "$SAVE_AGGREGATED_RESULT" = "true" ]] && BASE_PYTHON_OPTS+=("--save-aggregated-result") -[[ "$STREAM_REQUEST" = "true" ]] && BASE_PYTHON_OPTS+=("--stream-request") -[[ "$OUTPUT_BUCKET_FILEPATH" ]] && BASE_PYTHON_OPTS+=("--output-bucket-filepath" "$OUTPUT_BUCKET_FILEPATH") - -SLEEP_TIME=${SLEEP_TIME:-0} - +PYTHON_OPTS="benchmark_serving.py " for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do echo "Benchmarking request rate: ${request_rate}" + # TODO: Check if profile already exists, if so then skip timestamp=$(date +"%Y-%m-%d_%H-%M-%S") output_file="latency-profile-${timestamp}.txt" - - if [ "$request_rate" == "0" ]; then + if [ ${request_rate} == 0 ]; then request_rate="inf" num_prompts=$MAX_NUM_PROMPTS else @@ -49,13 +36,35 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do fi echo "TOTAL prompts: $num_prompts" - PYTHON_OPTS=("${BASE_PYTHON_OPTS[@]}" "--request-rate=$request_rate" "--num-prompts=$num_prompts") - - $PYTHON "${PYTHON_OPTS[@]}" > "$output_file" - cat "$output_file" - echo "Sleeping for $SLEEP_TIME seconds..." - sleep $SLEEP_TIME + + # Build the python command options + PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS --pm-namespace=$PM_NAMESPACE --pm-job=$PM_JOB" + + if [[ "$TRAFFIC_SPLIT" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --traffic-split=$TRAFFIC_SPLIT" + fi + + if [[ "$OUTPUT_BUCKET" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --output-bucket=$OUTPUT_BUCKET" + fi + + if [[ "$SCRAPE_SERVER_METRICS" = "true" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --scrape-server-metrics" + fi + if [[ "$SAVE_AGGREGATED_RESULT" = "true" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --save-aggregated-result" + fi + if [[ "$STREAM_REQUEST" = "true" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --stream-request" + fi + if [[ "$OUTPUT_BUCKET_FILEPATH" ]]; then + PYTHON_OPTS="$PYTHON_OPTS --output-bucket-filepath $OUTPUT_BUCKET_FILEPATH" + fi + + $PYTHON $PYTHON_OPTS > $output_file + cat $output_file + sleep 30 # wait 30 seconds before next run to ensure metrics isolation done export LPG_FINISHED="true" -sleep infinity +sleep infinity \ No newline at end of file From 099f7300b9dbfd706869c0be281986e53aaf36d8 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:42:26 +0000 Subject: [PATCH 13/28] more reversions --- latency_throughput_curve.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 6e9d172..b155c2e 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -16,7 +16,9 @@ set -o xtrace export IP=$IP + huggingface-cli login --token "$HF_TOKEN" --add-to-git-credential + if [[ "$PROMPT_DATASET" = "sharegpt" ]]; then PROMPT_DATASET_FILE="ShareGPT_V3_unfiltered_cleaned_split.json" fi @@ -67,4 +69,4 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do done export LPG_FINISHED="true" -sleep infinity \ No newline at end of file +sleep infinity From a3196686f618357dd88a237227037a4ed64d7a8e Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:43:02 +0000 Subject: [PATCH 14/28] more reversions --- latency_throughput_curve.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index b155c2e..6b0a6c0 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -38,7 +38,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do fi echo "TOTAL prompts: $num_prompts" - + # Build the python command options PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS --pm-namespace=$PM_NAMESPACE --pm-job=$PM_JOB" From 945cd1b01d1d6b874ebaeafc7685189ad76c285d Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:43:27 +0000 Subject: [PATCH 15/28] more reversions --- latency_throughput_curve.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 6b0a6c0..425e751 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -38,7 +38,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do fi echo "TOTAL prompts: $num_prompts" - + # Build the python command options PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS --pm-namespace=$PM_NAMESPACE --pm-job=$PM_JOB" @@ -62,7 +62,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do if [[ "$OUTPUT_BUCKET_FILEPATH" ]]; then PYTHON_OPTS="$PYTHON_OPTS --output-bucket-filepath $OUTPUT_BUCKET_FILEPATH" fi - + $PYTHON $PYTHON_OPTS > $output_file cat $output_file sleep 30 # wait 30 seconds before next run to ensure metrics isolation From 87794d22b96c9af9d516d7811df32f9f45eddc9b Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:43:49 +0000 Subject: [PATCH 16/28] more reversions --- latency_throughput_curve.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index 425e751..eec45f2 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -36,9 +36,9 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do else num_prompts=$(awk "BEGIN {print int($request_rate * $BENCHMARK_TIME_SECONDS)}") fi - + echo "TOTAL prompts: $num_prompts" - + # Build the python command options PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS --pm-namespace=$PM_NAMESPACE --pm-job=$PM_JOB" @@ -62,7 +62,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do if [[ "$OUTPUT_BUCKET_FILEPATH" ]]; then PYTHON_OPTS="$PYTHON_OPTS --output-bucket-filepath $OUTPUT_BUCKET_FILEPATH" fi - + $PYTHON $PYTHON_OPTS > $output_file cat $output_file sleep 30 # wait 30 seconds before next run to ensure metrics isolation From 39fdba705c2a8bd4c7a54279ed7ad0e8d1a41129 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:44:05 +0000 Subject: [PATCH 17/28] more reversions --- latency_throughput_curve.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/latency_throughput_curve.sh b/latency_throughput_curve.sh index eec45f2..c3398fc 100755 --- a/latency_throughput_curve.sh +++ b/latency_throughput_curve.sh @@ -62,7 +62,7 @@ for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do if [[ "$OUTPUT_BUCKET_FILEPATH" ]]; then PYTHON_OPTS="$PYTHON_OPTS --output-bucket-filepath $OUTPUT_BUCKET_FILEPATH" fi - + $PYTHON $PYTHON_OPTS > $output_file cat $output_file sleep 30 # wait 30 seconds before next run to ensure metrics isolation From b1dbefb0c14e30d585878dad79a9b83a92c60728 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:44:33 +0000 Subject: [PATCH 18/28] log level debug -> info --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index db5a9a3..ec7ab8a 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -1056,7 +1056,7 @@ def parse_traffic_split(arg): parser.add_argument("--pm-job", type=str, default="vllm-podmonitoring", help="name of the pod monitoring object, ignored if scrape-server-metrics is false") cmd_args = parser.parse_args() - level = logging.DEBUG + level = logging.INFO logger = logging.getLogger(__name__) logger.setLevel(level) handler = logging.StreamHandler() # This sends output to the console From 817f172c5f60dea9658045ad3da014e41d11a078 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 21:57:23 +0000 Subject: [PATCH 19/28] remove log line --- benchmark_serving.py | 1 - 1 file changed, 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 4e41483..dac5fbe 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -463,7 +463,6 @@ async def benchmark( task = asyncio.create_task(run_single_request(args, clientSession, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model)) tasks.append(task) prompts_sent += 1 - print("send all requests") results = await asyncio.gather(*tasks) overall_results = {"latencies": [], "ttfts": [], "itls": [], "tpots": [], "errors": init_errors_map()} From 67932bbce8828f505cc4f4b0822ebce98dca9006 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 22:33:18 +0000 Subject: [PATCH 20/28] readd traceconfig --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index dac5fbe..cb6b251 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -454,7 +454,7 @@ async def benchmark( benchmark_start_time = time.time() tasks: List[asyncio.Task] = [] prompts_sent = 0 - async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=28000,),timeout=None,) as clientSession: + async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=28000,),timeout=None, trace_configs=[trace_config]) as clientSession: async for request in generate_next_request(input_requests, args.request_rate): if prompts_sent >= args.num_prompts: break From 218b2d4b7899f06444b5fe9b347b0f1d278142e7 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 27 Mar 2025 22:42:50 +0000 Subject: [PATCH 21/28] added active_connections_metric --- benchmark_serving.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/benchmark_serving.py b/benchmark_serving.py index cb6b251..740d630 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -54,13 +54,16 @@ tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)') ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request') active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') +active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections') # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): active_requests_metric.inc() + active_connections_metric.set(len(session.connector._acquired)) async def on_request_end(session, trace_config_ctx, params): active_requests_metric.dec() + active_connections_metric.set(len(session.connector._acquired)) trace_config = aiohttp.TraceConfig() trace_config.on_request_start.append(on_request_start) From b1beefa406d14fa8bcf4c9e7aa4a32588b17b7b7 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:05:03 +0000 Subject: [PATCH 22/28] revert ignore_eos --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 740d630..78a59f9 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -292,7 +292,7 @@ async def send_request( "temperature": 0.0 if use_beam_search else 1.0, "top_p": 1.0, "max_tokens": output_len, - "ignore_eos": True, + "ignore_eos": False, "stream": False, } elif backend == "tgi": From 08c2954002ffa7912cafc2c11ce121029f37b0ad Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:44:41 +0000 Subject: [PATCH 23/28] added log --- benchmark_serving.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 78a59f9..6222141 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -46,6 +46,7 @@ MIN_SEQ_LEN = 4 NEW_TEXT_KEY = "\nOutput:\n" PROMETHEUS_PORT = 9090 +CONNECTIONS_LIMIT = 28000 # Prometheus Metrics prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)]) @@ -56,10 +57,18 @@ active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections') +# Should only print an `exhausted connections` warning once per run to not pollute the logs +logged_exhausted_ports = False + # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): + global logged_exhausted_ports # Explicitly reference the global variable active_requests_metric.inc() active_connections_metric.set(len(session.connector._acquired)) + if not logged_exhausted_ports and len(session.connector._acquired) == CONNECTIONS_LIMIT: + print("Warning: Connection limit reached. Some Prometheus metrics may be missing or inaccurate due to exhausted ports.") + logged_exhausted_ports = True + async def on_request_end(session, trace_config_ctx, params): active_requests_metric.dec() @@ -457,7 +466,7 @@ async def benchmark( benchmark_start_time = time.time() tasks: List[asyncio.Task] = [] prompts_sent = 0 - async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=28000,),timeout=None, trace_configs=[trace_config]) as clientSession: + async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=CONNECTIONS_LIMIT,),timeout=None, trace_configs=[trace_config]) as clientSession: async for request in generate_next_request(input_requests, args.request_rate): if prompts_sent >= args.num_prompts: break From bcf6cc70b9f36c9a39437b28f51077e6bd707851 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:47:50 +0000 Subject: [PATCH 24/28] fix log --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 6222141..2efcbdd 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -66,7 +66,7 @@ async def on_request_start(session, trace_config_ctx, params): active_requests_metric.inc() active_connections_metric.set(len(session.connector._acquired)) if not logged_exhausted_ports and len(session.connector._acquired) == CONNECTIONS_LIMIT: - print("Warning: Connection limit reached. Some Prometheus metrics may be missing or inaccurate due to exhausted ports.") + print("Warning: Connection limit reached. Server metrics may be missing or inaccurate") logged_exhausted_ports = True From be3a2d9239e3253e31448404425543b7c5a7af91 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:50:47 +0000 Subject: [PATCH 25/28] fix comment --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 2efcbdd..e5c59df 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -57,7 +57,7 @@ active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed') active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections') -# Should only print an `exhausted connections` warning once per run to not pollute the logs +# Exhaused connections warning should only be printed once per run logged_exhausted_ports = False # Add trace config for monitoring in flight requests From 927d36695438df9bd3c2b750bbae417d9d607423 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:51:12 +0000 Subject: [PATCH 26/28] remove comment --- benchmark_serving.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index e5c59df..8ff102d 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -62,7 +62,7 @@ # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): - global logged_exhausted_ports # Explicitly reference the global variable + global logged_exhausted_ports active_requests_metric.inc() active_connections_metric.set(len(session.connector._acquired)) if not logged_exhausted_ports and len(session.connector._acquired) == CONNECTIONS_LIMIT: From 53fa0e5cbb6f85bdec1a6a361e496157a9d647b1 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:55:41 +0000 Subject: [PATCH 27/28] omit server metrics if connection limit reached --- benchmark_serving.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index 8ff102d..e3dec58 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -66,7 +66,7 @@ async def on_request_start(session, trace_config_ctx, params): active_requests_metric.inc() active_connections_metric.set(len(session.connector._acquired)) if not logged_exhausted_ports and len(session.connector._acquired) == CONNECTIONS_LIMIT: - print("Warning: Connection limit reached. Server metrics may be missing or inaccurate") + print("Warning: Connection limit reached. Server metrics will not be omitted due to innacruacy") logged_exhausted_ports = True @@ -840,7 +840,7 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re } server_metrics = {} - if args.scrape_server_metrics: + if args.scrape_server_metrics and : server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.pm_namespace, args.pm_job) if args.save_json_results: save_json_results(args, benchmark_result, server_metrics, model, errors) From fdf1024e38fcd0ac3bd3ef5a77b90128b8806656 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Fri, 28 Mar 2025 17:56:32 +0000 Subject: [PATCH 28/28] omit server metrics if connection limit reached --- benchmark_serving.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/benchmark_serving.py b/benchmark_serving.py index e3dec58..44f0849 100644 --- a/benchmark_serving.py +++ b/benchmark_serving.py @@ -58,16 +58,16 @@ active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections') # Exhaused connections warning should only be printed once per run -logged_exhausted_ports = False +connection_limit_reached = False # Add trace config for monitoring in flight requests async def on_request_start(session, trace_config_ctx, params): - global logged_exhausted_ports + global connection_limit_reached active_requests_metric.inc() active_connections_metric.set(len(session.connector._acquired)) - if not logged_exhausted_ports and len(session.connector._acquired) == CONNECTIONS_LIMIT: - print("Warning: Connection limit reached. Server metrics will not be omitted due to innacruacy") - logged_exhausted_ports = True + if not connection_limit_reached and len(session.connector._acquired) == CONNECTIONS_LIMIT: + print("Warning: Connection limit reached. Omitting server metrics due to inaccuracy") + connection_limit_reached = True async def on_request_end(session, trace_config_ctx, params): @@ -840,7 +840,8 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re } server_metrics = {} - if args.scrape_server_metrics and : + global connection_limit_reached + if args.scrape_server_metrics and not connection_limit_reached: server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.pm_namespace, args.pm_job) if args.save_json_results: save_json_results(args, benchmark_result, server_metrics, model, errors)