3 changes: 2 additions & 1 deletion install/requirements_py3.11.txt
@@ -25,4 +25,5 @@ pymilvus
clickhouse_connect
pyvespa
mysql-connector-python
packaging
packaging
hdrhistogram>=0.10.1
1 change: 1 addition & 0 deletions pyproject.toml
@@ -41,6 +41,7 @@ dependencies = [
"scikit-learn",
"pymilvus", # with pandas, numpy, ujson
"ujson",
"hdrhistogram>=0.10.1",
]
dynamic = ["version"]

68 changes: 50 additions & 18 deletions vectordb_bench/backend/runner/mp_runner.py
@@ -8,6 +8,7 @@
from multiprocessing.queues import Queue

import numpy as np
from hdrh.histogram import HdrHistogram

from vectordb_bench.backend.filter import Filter, non_filter

@@ -18,6 +19,12 @@
NUM_PER_BATCH = config.NUM_PER_BATCH
log = logging.getLogger(__name__)

# HDR Histogram constants
HDR_HISTOGRAM_MIN_US = 1
HDR_HISTOGRAM_MAX_US = 60_000_000 # 60 seconds
HDR_HISTOGRAM_SIGNIFICANT_DIGITS = 3 # ±0.1% accuracy
US_TO_SECONDS = 1_000_000


class MultiProcessingSearchRunner:
"""multiprocessing search runner
@@ -237,23 +244,34 @@ def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, li

qps = round(all_success_count / cost, 4)

# Collect and calculate latencies
all_latencies = []
for r in res:
if len(r) > 2 and r[2]: # Has latency data
all_latencies.extend(r[2])
# Aggregate latency stats from worker processes
latency_stats_list = [
r[2] for r in res
if r[2] and r[2].get('count', 0) > 0
]

# Calculate percentiles
if all_latencies:
latency_p99 = np.percentile(all_latencies, 99)
latency_p95 = np.percentile(all_latencies, 95)
latency_avg = np.mean(all_latencies)
if latency_stats_list:
total_query_count = sum(stats['count'] for stats in latency_stats_list)

if total_query_count > 0:
# Use max for conservative percentile estimate
latency_p99 = max(stats['p99'] for stats in latency_stats_list)
latency_p95 = max(stats['p95'] for stats in latency_stats_list)

# Weighted average
latency_avg = sum(
stats['avg'] * stats['count']
for stats in latency_stats_list
) / total_query_count
else:
latency_p99 = 0
latency_p95 = 0
latency_avg = 0
else:
latency_p99 = 0
latency_p95 = 0
latency_avg = 0

# Store in lists
conc_num_list.append(conc)
conc_qps_list.append(qps)
conc_latency_p99_list.append(latency_p99)
@@ -291,12 +309,12 @@ def _run_by_dur(self, duration: int) -> tuple[float, float, list, list, list, li
conc_latency_avg_list,
)

def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int, list]:
def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition) -> tuple[int, int, dict]:
"""
Returns:
int: successful requests count
int: failed requests count
list: latencies of successful requests
dict: latency statistics with p99, p95, avg, count (computed via HDR Histogram)
"""
# sync all process
q.put(1)
@@ -307,16 +325,23 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
self.db.prepare_filter(self.filters)
num, idx = len(test_data), random.randint(0, len(test_data) - 1)

# Memory-efficient latency tracking
histogram = HdrHistogram(
HDR_HISTOGRAM_MIN_US,
HDR_HISTOGRAM_MAX_US,
HDR_HISTOGRAM_SIGNIFICANT_DIGITS
)

start_time = time.perf_counter()
success_count = 0
failed_cnt = 0
latencies = []
while time.perf_counter() < start_time + dur:
s = time.perf_counter()
try:
self.db.search_embedding(test_data[idx], self.k)
success_count += 1
latencies.append(time.perf_counter() - s)
latency_us = int((time.perf_counter() - s) * US_TO_SECONDS)
histogram.record_value(min(latency_us, HDR_HISTOGRAM_MAX_US))
except Exception as e:
failed_cnt += 1
# reduce log
@@ -330,8 +355,7 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con

if success_count % 500 == 0:
log.debug(
f"({mp.current_process().name:16}) search_count: {success_count}, "
f"latest_latency={time.perf_counter()-s}",
f"({mp.current_process().name:16}) search_count: {success_count}",
)

total_dur = round(time.perf_counter() - start_time, 4)
@@ -341,4 +365,12 @@ def search_by_dur(self, dur: int, test_data: list[list[float]], q: mp.Queue, con
f"qps (successful) in this process: {round(success_count / total_dur, 4):3}",
)

return success_count, failed_cnt, latencies
# Pre-computed stats to avoid large data transfer
latency_stats = {
'p99': histogram.get_value_at_percentile(99) / US_TO_SECONDS,
'p95': histogram.get_value_at_percentile(95) / US_TO_SECONDS,
'avg': histogram.get_mean_value() / US_TO_SECONDS,
'count': histogram.get_total_count(),
}

return success_count, failed_cnt, latency_stats
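Commentary (not part of the diff): each worker now ships back a four-field stats dict rather than a raw latency list, and `_run_by_dur` combines them as shown above. A sketch of that aggregation with hypothetical per-worker numbers:

```python
# Hypothetical per-worker results, in the shape search_by_dur now returns.
worker_stats = [
    {"p99": 0.045, "p95": 0.030, "avg": 0.012, "count": 10_000},
    {"p99": 0.060, "p95": 0.041, "avg": 0.015, "count": 8_000},
]

total_query_count = sum(s["count"] for s in worker_stats)  # 18_000

# Taking the max across workers is a conservative (upper-bound) estimate of the
# global percentile; an exact value would require merging the histograms themselves.
latency_p99 = max(s["p99"] for s in worker_stats)  # 0.060
latency_p95 = max(s["p95"] for s in worker_stats)  # 0.041

# The mean combines exactly when weighted by each worker's query count.
latency_avg = sum(s["avg"] * s["count"] for s in worker_stats) / total_query_count  # ~0.0133
```

The trade-off mirrors the comments in the diff: the aggregated percentiles are slightly pessimistic, but only four numbers per worker cross the process boundary instead of one float per query.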
@@ -14,10 +14,10 @@ def drawConcurrentPerformanceSection(container, case_data, case_name: str):
return

container.markdown("---")
container.subheader("🔍 Concurrent Performance Detail")
container.subheader("Concurrent Search Performance")
container.markdown(
"Detailed Latency → QPS relationship at each stage. "
"Shows how search performance changes with different load levels."
"Displays how latency and QPS vary with concurrency."
)

# View mode selector
@@ -60,7 +60,7 @@ def drawSingleStageView(container, case_data, case_name: str):
# Check if this stage has concurrent data
if not case_data["st_conc_qps_list_list"][stage_idx]:
container.warning(
f"⚠️ No concurrent performance data for {stage}% stage.\n\n"
f"No concurrent search data for {stage}% stage.\n\n"
f"**Reason:** Concurrent tests were skipped because there wasn't enough time "
f"between stages (< 10s per concurrency level).\n\n"
f"**Tip:** Use a larger dataset or slower insert rate to get data for all stages."
@@ -244,7 +244,7 @@ def drawComparisonChart(container, case_data, selected_stages, metric_name, case
# Check if any data was plotted
if stages_plotted == 0:
container.warning(
f"⚠️ None of the selected stages have concurrent performance data.\n\n"
f"None of the selected stages have concurrent search data.\n\n"
f"**Skipped stages:** {', '.join([f'{s}%' for s in stages_skipped])}\n\n"
f"**Reason:** Concurrent tests were skipped because there wasn't enough time "
f"between stages (< 10s per concurrency level).\n\n"
@@ -255,7 +255,7 @@ def drawComparisonChart(container, case_data, selected_stages, metric_name, case
# Show warning for skipped stages
if stages_skipped:
container.info(
f"ℹ️ Skipped stages with no concurrent data: {', '.join([f'{s}%' for s in stages_skipped])}"
f"Stages without data: {', '.join([f'{s}%' for s in stages_skipped])}"
)

fig.update_layout(
@@ -276,8 +276,6 @@

# Add insight
container.info(
"💡 **Insight**: As data grows (from early to late stages), "
"you can observe how both QPS capacity and latency change. "
"This helps predict production performance as your database scales."
"**Insight:** Compare curves across stages to understand how performance scales with data growth."
)