-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnodebench.py
More file actions
261 lines (234 loc) · 10.6 KB
/
nodebench.py
File metadata and controls
261 lines (234 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# nodebench.py – Phase 1: Local LAN Benchmarking
import time
import requests
import concurrent.futures
import socket
from tqdm import tqdm
import numpy as np
import argparse
import uuid
def make_endpoints_from_root(root_url):
    """Build the embedding and chat-completion API URLs from a server root URL.

    Trailing slashes on *root_url* are stripped before the paths are appended.
    """
    base = root_url.rstrip("/")
    endpoints = {}
    endpoints["embedding"] = base + "/v1/embeddings"
    endpoints["completions"] = base + "/v1/chat/completions"
    return endpoints
def time_embedding(endpoint, text):
    """POST *text* to the embedding endpoint and time the round trip.

    Returns a dict with keys:
      "duration"  - wall-clock seconds for the request (0 on failure),
      "tokens"    - prompt token count reported by the server (0 on failure),
      "embedding" - the embedding vector, or None on failure.
    """
    body = {"model": "default", "input": text}
    begin = time.perf_counter()
    try:
        resp = requests.post(endpoint, json=body, timeout=120)
        duration = time.perf_counter() - begin
        status = resp.status_code
        resp_json = resp.json()
        embedding = None
        if "data" in resp_json and resp_json["data"]:
            embedding = resp_json["data"][0]["embedding"]
        tokens = resp_json.get("usage", {}).get("prompt_tokens", 0)
        # Surface suspicious results (no vector, or an implausible zero timing).
        if embedding is None or duration == 0:
            print(f"[DEBUG] time_embedding: status={status}, duration={duration}, embedding=None, response={resp_json}")
    except Exception as e:
        # Best-effort: a failed request is recorded as a zeroed-out result
        # so callers can filter it instead of crashing the whole run.
        print(f"[ERROR] Exception in time_embedding: {e}")
        duration = 0
        embedding = None
        tokens = 0
    return {"duration": duration, "tokens": tokens, "embedding": embedding}
def generate_prompts(completions_url, n=10):
    """Ask the chat-completions endpoint for *n* generated stories to use as benchmark prompts.

    Each request carries a fresh UUID so the model produces distinct stories.
    Failed generations are skipped, so fewer than *n* prompts may be returned.
    """
    stories = []
    for idx in range(n):
        request_text = (
            f"Request ID: {uuid.uuid4()}\n"
            f"Generate a unique, diverse story, between 2-6 paragraphs long. "
            f"Each paragraph should be at least 1024 tokens long (aim for 750+ words). "
            f"Output only the story, no explanations, no bullet points, no numbers, no headings or titles."
        )
        body = {
            "model": "default",
            "messages": [{"role": "user", "content": request_text}],
            "max_tokens": 3000,
            "n": 1,
        }
        try:
            resp = requests.post(completions_url, json=body, timeout=120)
            content = resp.json()["choices"][0]["message"]["content"]
            stories.append(content.strip())
        except Exception as e:
            # Skip this prompt and keep going; caller handles an empty list.
            print(f"[ERROR] Could not generate prompt {idx+1}: {e}")
    return stories
def device_label(endpoint):
    """Best-effort human-readable label for *endpoint*: reverse-DNS name of its host,
    falling back to the endpoint string itself when the lookup fails.
    """
    # Extract the host from e.g. "https://host:port/..." -> "host".
    host = endpoint.split("//")[-1].split(":")[0]
    try:
        return socket.gethostbyaddr(host)[0]
    except OSError:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. socket errors (herror/gaierror) are OSError subclasses.
        return endpoint
def embedding_fingerprint_stats(embeddings):
    """Summarize a batch of embedding vectors for the repeated prompt.

    Returns a dict with the mean/std of the per-vector L2 norms, the mean of
    the per-vector means and stds, and min/max/mean of the per-vector count of
    unique values (rounded to 6 decimals to suppress float noise).
    """
    arr = np.asarray(embeddings, dtype=float)
    l2_norms = np.linalg.norm(arr, axis=1)
    row_means = np.mean(arr, axis=1)
    row_stds = np.std(arr, axis=1)
    # BUG FIX: dropped the dead `if vec is not None` filter - rows of a numeric
    # ndarray are never None, so the condition could never exclude anything.
    unique_counts = [len(np.unique(np.round(vec, 6))) for vec in arr]
    return {
        "l2_norm_mean": float(np.mean(l2_norms)),
        "l2_norm_std": float(np.std(l2_norms)),
        "mean_mean": float(np.mean(row_means)),
        "std_mean": float(np.mean(row_stds)),
        "unique_count_mean": float(np.mean(unique_counts)),
        "unique_count_min": int(np.min(unique_counts)),
        "unique_count_max": int(np.max(unique_counts)),
    }
def cosine_similarity_stats(vectors):
    """Compute min, max, mean, std cosine similarity over all unordered pairs.

    Returns None when fewer than two vectors are provided. Accepts any
    sequence of equal-length numeric vectors (list of lists or ndarray).
    NOTE(review): assumes no all-zero vectors; a zero norm would divide by zero.
    """
    # BUG FIX: `if not vectors` raises "truth value of an array is ambiguous"
    # when an ndarray is passed; use an explicit None/length check instead.
    if vectors is None or len(vectors) < 2:
        return None
    arr = np.asarray(vectors, dtype=float)
    normed = arr / np.linalg.norm(arr, axis=1, keepdims=True)
    sims = np.dot(normed, normed.T)
    # Take only the strictly-upper triangle: each unordered pair once, no self-pairs.
    idx = np.triu_indices_from(sims, k=1)
    values = sims[idx]
    return {
        "min": float(np.min(values)),
        "max": float(np.max(values)),
        "mean": float(np.mean(values)),
        "std": float(np.std(values)),
        "num_pairs": int(len(values)),
    }
def run_tests(endpoint, repeats, concurrency, prompts):
    """Run the throughput/latency, concurrency and repeatability tests.

    Returns (results, repeatability_passed, cosine_stats, repeat_vectors):
      results             - list of time_embedding() result dicts,
      repeatability_passed - False if repeated embeddings of prompt 0 diverge,
      cosine_stats        - pairwise cosine stats over prompt 0's vectors (or None),
      repeat_vectors      - the collected prompt-0 embedding vectors.
    """
    results = []
    repeatability_passed = True
    print("\nStarting Throughput and Latency Test...")
    with tqdm(total=repeats * len(prompts), desc="Throughput/Latency", ncols=100) as pbar:
        repeat_vectors = []
        for _ in range(repeats):
            for i, text in enumerate(prompts):
                res = time_embedding(endpoint, text)
                results.append(res)
                # Collect prompt 0's vector on every repeat for the repeatability check.
                if i == 0 and res["embedding"] is not None:
                    repeat_vectors.append(res["embedding"])
                pbar.update(1)
    print("Throughput and Latency Test Complete.\n")
    print("Starting Concurrency Test...")
    with tqdm(total=repeats * concurrency, desc="Concurrency", ncols=100) as pbar:
        for _ in range(repeats):
            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as ex:
                futures = [ex.submit(time_embedding, endpoint, prompts[0]) for _ in range(concurrency)]
                for f in futures:
                    results.append(f.result())
                    # BUG FIX: advance once per completed request; the bar's total
                    # is repeats * concurrency but it was only updated once per
                    # repeat, so it never reached 100%.
                    pbar.update(1)
    print("Concurrency Test Complete.\n")
    # Repeatability: identical input should yield (near-)identical vectors.
    cosine_stats = cosine_similarity_stats(repeat_vectors)
    if cosine_stats and cosine_stats["min"] < 1 - 1e-5:
        repeatability_passed = False
    return results, repeatability_passed, cosine_stats, repeat_vectors
def score(results, concurrency, repeatability_passed):
    """Aggregate raw benchmark results into component scores and a total.

    Scores: throughput (avg tokens/sec), latency (1000 / avg seconds),
    concurrency (median/p90 ratio of the final burst, 0-100) and
    repeatability (pass/fail, 100 or 0). Failed results (duration 0) are
    excluded from the averages.
    """
    durations = [r["duration"] for r in results if r["duration"] > 0]
    # BUG FIX: tokens/sec must be computed per result. The old code filtered
    # tokens and durations into two independent lists and zipped them, which
    # pairs a token count with the wrong duration whenever any single result
    # has tokens == 0 but duration > 0 (or vice versa).
    throughput_scores = [
        r["tokens"] / r["duration"]
        for r in results
        if r["duration"] > 0 and r["tokens"] > 0
    ]
    avg_throughput = sum(throughput_scores) / len(throughput_scores) if throughput_scores else 0.0
    avg_latency = sum(durations) / len(durations) if durations else 0.0
    throughput_score = int(round(avg_throughput))
    latency_score = int(round(1000 / avg_latency)) if avg_latency > 0 else 0
    # Concurrency: median vs p90 duration of the final burst group (last N results).
    burst_times = [r["duration"] for r in results[-concurrency:]]
    if len(burst_times) >= 2:
        burst_times.sort()
        p90 = burst_times[int(len(burst_times) * 0.9)]
        median = burst_times[len(burst_times) // 2]
        if p90 == 0:
            print("[WARNING] p90 concurrency burst time is zero. Concurrency score set to 0. Check endpoint and results.")
            concurrency_score = 0
        else:
            # Close to 100 when the tail (p90) is no worse than the median.
            concurrency_score = int(round(min(median / p90, 1.0) * 100))
    else:
        concurrency_score = 100
    repeatability_score = 100 if repeatability_passed else 0
    total = throughput_score + concurrency_score + latency_score + repeatability_score
    return {
        "throughput": throughput_score,
        "concurrency": concurrency_score,
        "latency": latency_score,
        "repeatability": repeatability_score,
        "total": total,
        "avg_tok_per_sec": avg_throughput,
        "avg_latency": avg_latency
    }
def device_label(endpoint):
    """Best-effort human-readable label for *endpoint*: reverse-DNS name of its host,
    falling back to the endpoint string itself when the lookup fails.

    NOTE(review): this function is defined several times in this file; the last
    definition wins at import time. Consider removing the redundant copies.
    """
    host = endpoint.split("//")[-1].split(":")[0]
    try:
        return socket.gethostbyaddr(host)[0]
    except OSError:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. socket errors (herror/gaierror) are OSError subclasses.
        return endpoint
def completions_endpoint_from_embeddings(embedding_url):
    """Derive the chat-completions URL from an embeddings URL.

    Swaps the trailing '/embeddings' path segment for '/chat/completions'.
    Raises ValueError when *embedding_url* does not end with '/embeddings'.
    """
    suffix = '/embeddings'
    if not embedding_url.endswith(suffix):
        raise ValueError("Embedding endpoint does not end with '/embeddings'")
    base, _, _ = embedding_url.rpartition(suffix)
    return base + '/chat/completions'
def device_label(endpoint):
    """Best-effort human-readable label for *endpoint*: reverse-DNS name of its host,
    falling back to the endpoint string itself when the lookup fails.

    NOTE(review): this function is defined several times in this file; the last
    definition wins at import time. Consider removing the redundant copies.
    """
    host = endpoint.split("//")[-1].split(":")[0]
    try:
        return socket.gethostbyaddr(host)[0]
    except OSError:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. socket errors (herror/gaierror) are OSError subclasses.
        return endpoint
def run_benchmark(endpoint, prompts, repeats, concurrency):
    """Run the full test suite against one embedding endpoint and aggregate results.

    Returns (scores, repeatability_passed, cosine_stats, fingerprint), where
    fingerprint is None when no repeat vectors were collected.
    """
    results, repeatability_passed, cosine_stats, repeat_vectors = run_tests(
        endpoint, repeats, concurrency, prompts
    )
    scores = score(results, concurrency, repeatability_passed)
    fingerprint = None
    if repeat_vectors:
        fingerprint = embedding_fingerprint_stats(repeat_vectors)
    return scores, repeatability_passed, cosine_stats, fingerprint
def compare_scores(scores_list, labels):
    """Print each node's scores with percentage deltas against the first (baseline) node.

    All metrics are higher-is-better except 'avg_latency' (raw seconds), whose
    delta is inverted so a positive percentage always means "better than baseline".
    """
    print("\n===== NodeBench Multi-Node Results =====\n")
    metrics = [k for k in scores_list[0].keys() if k != 'total'] + ['total']
    baseline = scores_list[0]
    baseline_label = labels[0]
    for idx, (label, scores) in enumerate(zip(labels, scores_list)):
        print(f"[{label}]")
        for metric in metrics:
            val = scores[metric]
            # Consistent formatting for the one float metric in every branch.
            shown = f"{val:.3f}" if metric == "avg_tok_per_sec" else f"{val}"
            if idx == 0:
                print(f" {metric.capitalize():<14}: {shown}")
            else:
                base_val = baseline[metric]
                if base_val == 0:
                    pct = "n/a"
                else:
                    # BUG FIX: 'latency' here is a *score* (1000/avg_latency,
                    # higher is better), so it must not be sign-inverted; only
                    # the raw 'avg_latency' seconds are lower-is-better.
                    if metric == 'avg_latency':
                        pct_val = 100 * (base_val - val) / base_val
                    else:
                        pct_val = 100 * (val - base_val) / base_val
                    pct = f"{pct_val:+.1f}%"
                print(f" {metric.capitalize():<14}: {shown} (Δ vs {baseline_label}: {pct})")
        print()
def main():
    """CLI entry point: parse arguments, generate one shared prompt set from the
    first node, benchmark every node on those prompts, then print a comparison.
    """
    parser = argparse.ArgumentParser(description="NodeBench - Multi-Node Embedding Benchmark")
    parser.add_argument('--urls', required=True, help='Comma-separated list of root URLs (e.g. https://foo.domains,https://bar.domains)')
    parser.add_argument('--repeats', type=int, default=3, help='Number of repeats per prompt (default: 3)')
    parser.add_argument('--concurrency', type=int, default=5, help='Concurrency (default: 5)')
    # BUG FIX: help text said "default: 5" but the actual default is 10.
    parser.add_argument('--n_prompts', type=int, default=10, help='Number of prompts to generate (default: 10)')
    args = parser.parse_args()
    root_urls = [e.strip() for e in args.urls.split(",") if e.strip()]
    if not root_urls:
        print("No root URLs provided.")
        return
    repeats = args.repeats
    concurrency = args.concurrency
    n_prompts = args.n_prompts
    # Generate prompts ONCE (from the first endpoint) so every node is measured
    # on identical inputs and the comparison is fair.
    endpoints = make_endpoints_from_root(root_urls[0])
    prompts = generate_prompts(endpoints["completions"], n=n_prompts)
    if not prompts:
        print("[FATAL] Could not generate prompts. Exiting.")
        return
    print("\nPrompts used for all endpoints:")
    for i, p in enumerate(prompts):
        print(f"{i+1}. {p}")
    node_labels = []
    node_scores = []
    for i, root_url in enumerate(root_urls):
        endpoints = make_endpoints_from_root(root_url)
        # Nodes are labelled A, B, C, ... in the order their URLs were given.
        label = f"Node {chr(ord('A') + i)}"
        print(f"\n[{label}] Running benchmark...")
        scores, repeatability_passed, cosine_stats, fingerprint = run_benchmark(endpoints["embedding"], prompts, repeats, concurrency)
        node_labels.append(label)
        node_scores.append(scores)
        print(f"[{label}] Score: {scores['total']}, Repeatability: {repeatability_passed}")
    compare_scores(node_scores, node_labels)
if __name__ == "__main__":
    main()