Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions vsb/cmdline_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,16 @@ def add_vsb_cmdline_args(
default=100,
help="Number of requests to generate for the synthetic workload. For synthetic proportional "
"workloads, this is the number of requests (including upserts) to run after the initial "
"population. Default is %(default)s.",
"population. Ignored when --synthetic_duration is specified. Default is %(default)s.",
)
synthetic_group.add_argument(
"--synthetic_duration",
type=float,
default=None,
help="Duration in seconds for the Run phase of synthetic workloads. "
"When specified, requests are generated continuously until the duration expires. "
"Mutually exclusive with --synthetic_requests. Not supported with synthetic-runbook. "
"Default is 300 (5 minutes) when used.",
)
synthetic_group.add_argument(
"--synthetic_dimensions",
Expand Down Expand Up @@ -521,9 +530,22 @@ def validate_parsed_args(
pass
match args.workload:
case "synthetic" | "synthetic-proportional" | "synthetic-runbook":
# When --synthetic_duration is specified, it takes precedence over
# --synthetic_requests (which has a default of 100).
if args.synthetic_duration is not None:
args.synthetic_requests = None
# --synthetic_duration is not supported with synthetic-runbook
if (
args.synthetic_duration is not None
and args.workload == "synthetic-runbook"
):
parser.error(
"--synthetic_duration is not supported with the synthetic-runbook workload. "
"Please use --synthetic_requests instead."
)

required = (
"synthetic_records",
"synthetic_requests",
"synthetic_dimensions",
"synthetic_metric",
"synthetic_top_k",
Expand Down
65 changes: 47 additions & 18 deletions vsb/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ def __init__(self, environment):
f"Initialising RunUser id:{self.user_id}, target request/sec:{target_throughput}"
)
self.query_iter = None
self._duration = self.workload.synthetic_duration()
self._deadline = None

@task
def request(self):
Expand All @@ -273,6 +275,19 @@ def do_run(self):
self.query_iter = self.workload.get_query_iter(
self.users_total, self.user_id, batch_size
)
if self._duration is not None:
self._deadline = time.time() + self._duration

# Duration mode: check if time has expired
if self._deadline is not None and time.time() >= self._deadline:
logger.debug(
f"User id:{self.user_id} completed Run phase (duration expired)"
)
self.environment.runner.send_message(
"update_progress", {"user": self.user_id, "phase": "run"}
)
self.state = RunUser.State.Done
return

tenant: str = None
request: QueryRequest = None
Expand Down Expand Up @@ -793,23 +808,37 @@ def get_recall_pct(p):
+ f"[magenta]Recall: {recall_str}"
)

# If --synthetic-no-aggregate-stats is set, the cumulative request
# count is stored in the stats[workload.name] object. We just use
# the total request count for the entire runbook as the total,
# although it breaks convention with non-runbook workloads.
if (
isinstance(
env.workload_sequence,
vsb.workloads.synthetic_workload.synthetic_workload.SyntheticRunbook,
# Determine progress bar mode: time-based or count-based
duration = workload.synthetic_duration()
if duration is not None:
# Duration mode: show elapsed time vs total duration
elapsed = time.time() - search_stats.start_time
vsb.progress.update(
self.progress_task_id,
completed=int(min(elapsed, duration)),
total=int(duration),
extra_info=metrics_str,
)
and not self.no_aggregate_stats
):
total = env.workload_sequence.request_count()
else:
total = self.request_count
vsb.progress.update(
self.progress_task_id,
completed=cumulative_num_requests,
total=total,
extra_info=metrics_str,
)
# Count mode: show completed requests vs total
# If --synthetic-no-aggregate-stats is set, the cumulative
# request count is stored in the stats[workload.name]
# object. We just use the total request count for the
# entire runbook as the total, although it breaks
# convention with non-runbook workloads.
if (
isinstance(
env.workload_sequence,
vsb.workloads.synthetic_workload.synthetic_workload.SyntheticRunbook,
)
and not self.no_aggregate_stats
):
total = env.workload_sequence.request_count()
else:
total = self.request_count
vsb.progress.update(
self.progress_task_id,
completed=cumulative_num_requests,
total=total,
extra_info=metrics_str,
)
7 changes: 7 additions & 0 deletions vsb/workloads/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ def recall_available(self) -> bool:
"""
return True

def synthetic_duration(self) -> float | None:
"""
The duration in seconds for the Run phase when using time-based mode,
or None if using request-count mode.
"""
return None


class VectorWorkloadSequence(ABC):
@abstractmethod
Expand Down
97 changes: 76 additions & 21 deletions vsb/workloads/synthetic_workload/synthetic_workload.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, name: str, options):
self.queries = None
self._record_count = options.synthetic_records
self._request_count = options.synthetic_requests
self._synthetic_duration = getattr(options, "synthetic_duration", None)
self._num_workers = options.expect_workers
self._num_users = options.num_users

Expand Down Expand Up @@ -104,8 +105,26 @@ def get_query_iter(
f"this shouldn't happen."
)
self.queries = self.setup_queries()
# Worker queries are worker-local - that is, each worker will only
# have the max potential queries for its users.

if self._synthetic_duration is not None:
# Duration mode: cycle through all queries indefinitely.
# RunUser will stop consuming when the deadline expires.
total_queries = self.queries.shape[0]

def make_cycling_query_iter():
while True:
for index in range(total_queries):
query = {
"values": self.queries["values"].iat[index],
"top_k": self.queries["top_k"].iat[index],
"neighbors": self.queries["neighbors"].iat[index],
}
yield "", SearchRequest(**query)

return make_cycling_query_iter()

# Count mode: worker queries are worker-local - that is, each worker
# will only have the max potential queries for its users.
quotient, remainder = divmod(self._request_count, num_users)
chunks = [quotient + (1 if r < remainder else 0) for r in range(num_users)]
user_q, user_r = divmod(self._num_users, self._num_workers)
Expand Down Expand Up @@ -152,8 +171,13 @@ def record_count(self) -> int:
return self._record_count

def request_count(self) -> int:
    """Number of requests in the Run phase.

    Duration (time-based) mode has no fixed request budget, so 0 is
    reported; otherwise the configured request count is returned.
    """
    return 0 if self._synthetic_duration is not None else self._request_count

def synthetic_duration(self) -> float | None:
return self._synthetic_duration


class SyntheticWorkload(InMemoryWorkload, ABC):
"""A workload in which records and queries are generated pseudo-randomly."""
Expand Down Expand Up @@ -212,13 +236,14 @@ def get_random_vector(self) -> Vector:
def get_random_query_idx(self, num_idxs: int) -> int:
# Pick a random record from our records to use as a query,
# based on the query distribution.
num_records = self._record_count
match self._query_distribution:
case "uniform":
return self.rng.integers(0, self._request_count, num_idxs)
return self.rng.integers(0, num_records, num_idxs)
case "zipfian":
idxs = []
while len(idxs) < num_idxs:
if (offset := self.rng.zipf(1.1)) < self._request_count:
if (offset := self.rng.zipf(1.1)) < num_records:
idxs.append(offset)
return idxs
case _:
Expand All @@ -240,20 +265,28 @@ def setup_records(self):
def setup_queries(self):
# Pseudo-randomly generate the full RecordList of queries
# Query will be generated with the same distribution as records
max_users_per_worker = int(np.ceil(self._num_users / self._num_workers))
# Give each worker a number of queries proportional to the maximum number of users
# any worker can have.
max_queries_per_worker = (
int(np.ceil(self._request_count * max_users_per_worker / self._num_users))
+ self._num_users
)
if self._synthetic_duration is not None:
# Duration mode: generate a fixed pool of queries to cycle through
query_pool_size = min(self._record_count, 10000)
else:
max_users_per_worker = int(np.ceil(self._num_users / self._num_workers))
# Give each worker a number of queries proportional to the maximum
# number of users any worker can have.
query_pool_size = (
int(
np.ceil(
self._request_count * max_users_per_worker / self._num_users
)
)
+ self._num_users
)
self.queries = pandas.DataFrame(
{
"values": [
self.records["values"].iat[i]
for i in self.get_random_query_idx(max_queries_per_worker)
for i in self.get_random_query_idx(query_pool_size)
],
"top_k": np.full(max_queries_per_worker, self._top_k),
"top_k": np.full(query_pool_size, self._top_k),
}
)

Expand Down Expand Up @@ -597,6 +630,7 @@ def __init__(
super().__init__(name, options)
self._record_count = options.synthetic_records
self._request_count = options.synthetic_requests
self._synthetic_duration = getattr(options, "synthetic_duration", None)
self._dimensions = options.synthetic_dimensions
self._metric = DistanceMetric(options.synthetic_metric)
self._metadata_gen = self.parse_synthetic_metadata_template(
Expand Down Expand Up @@ -747,14 +781,22 @@ def make_record_iter(num_records, insert_index):
def get_query_iter(
self, num_users: int, user_id: int, batch_size: int
) -> Iterator[tuple[str, QueryRequest]]:
user_n_queries = self._request_count // num_users + (
user_id < self._request_count % num_users
)
user_n_records = self._record_count // num_users + (
user_id < self._record_count % num_users
)
# User-unique upsert id range to avoid conflicts
insert_index = self._record_count + user_id * (user_n_queries + 1)

if self._synthetic_duration is not None:
# Duration mode: generate requests indefinitely.
# RunUser will stop consuming when the deadline expires.
user_n_queries = None
insert_index = self._record_count + user_id * 1000000
else:
user_n_queries = self._request_count // num_users + (
user_id < self._request_count % num_users
)
# User-unique upsert id range to avoid conflicts
insert_index = self._record_count + user_id * (user_n_queries + 1)

# User-unique delete/fetch id range to avoid conflicts
original_index_start = self._record_count // num_users * user_id + (
min(self._record_count % num_users, user_id)
Expand All @@ -770,9 +812,13 @@ def get_query_iter(
def make_query_iter(num_queries, insert_index, available_indexes):
# Generate queries in batches. These batches will be homogenous, but a
# single query iter may contain multiple types of queries.
for query_num in range(0, num_queries, self._batch_size):
# In case num_queries is not a multiple of batch_size
curr_batch_size = min(self._batch_size, num_queries - query_num)
query_num = 0
while num_queries is None or query_num < num_queries:
curr_batch_size = (
self._batch_size
if num_queries is None
else min(self._batch_size, num_queries - query_num)
)
upsert_batch_size = min(curr_batch_size, batch_size)
# Choose a random request type based on proportions, and
# do _batch_size requests of that type
Expand Down Expand Up @@ -871,9 +917,18 @@ def make_query_iter(num_queries, insert_index, available_indexes):
fetch_ids = [str(available_indexes[i][0]) for i in idxs]

yield "", FetchRequest(ids=fetch_ids)
query_num += self._batch_size

return make_query_iter(user_n_queries, insert_index, available_indexes)

def request_count(self) -> int:
    """Return the configured request count, or 0 in duration mode.

    When a synthetic duration is set, the Run phase is open-ended and
    there is no fixed request budget to report.
    """
    if self._synthetic_duration is None:
        return self._request_count
    return 0

def synthetic_duration(self) -> float | None:
return self._synthetic_duration

def recall_available(self) -> bool:
    """Recall metrics are not available for this workload."""
    return False

Expand Down