diff --git a/vsb/cmdline_args.py b/vsb/cmdline_args.py
index 5428d5e..849de23 100644
--- a/vsb/cmdline_args.py
+++ b/vsb/cmdline_args.py
@@ -153,7 +153,15 @@ def add_vsb_cmdline_args(
         default=100,
         help="Number of requests to generate for the synthetic workload. For synthetic proportional "
         "workloads, this is the number of requests (including upserts) to run after the initial "
-        "population. Default is %(default)s.",
+        "population. Ignored when --synthetic_duration is specified. Default is %(default)s.",
+    )
+    synthetic_group.add_argument(
+        "--synthetic_duration",
+        type=float,
+        default=None,
+        help="Duration in seconds for the Run phase of synthetic workloads. "
+        "When specified, requests are generated continuously until the duration expires, "
+        "and --synthetic_requests is ignored. Not supported with synthetic-runbook.",
     )
     synthetic_group.add_argument(
         "--synthetic_dimensions",
@@ -521,9 +530,18 @@ def validate_parsed_args(
             pass
     match args.workload:
         case "synthetic" | "synthetic-proportional" | "synthetic-runbook":
+            if args.synthetic_duration is not None:
+                # Time-based mode is not supported with synthetic-runbook.
+                if args.workload == "synthetic-runbook":
+                    parser.error(
+                        "--synthetic_duration is not supported with the synthetic-runbook workload. "
+                        "Please use --synthetic_requests instead."
+                    )
+                # --synthetic_duration takes precedence over
+                # --synthetic_requests (which has a default of 100).
+                args.synthetic_requests = None
             required = (
                 "synthetic_records",
-                "synthetic_requests",
                 "synthetic_dimensions",
                 "synthetic_metric",
                 "synthetic_top_k",
diff --git a/vsb/users.py b/vsb/users.py
index 24ee12b..11455fe 100644
--- a/vsb/users.py
+++ b/vsb/users.py
@@ -250,6 +250,8 @@ def __init__(self, environment):
             f"Initialising RunUser id:{self.user_id}, target request/sec:{target_throughput}"
         )
         self.query_iter = None
+        self._duration = self.workload.synthetic_duration()
+        self._deadline = None

     @task
     def request(self):
@@ -273,6 +275,19 @@ def do_run(self):
             self.query_iter = self.workload.get_query_iter(
                 self.users_total, self.user_id, batch_size
             )
+            if self._duration is not None:
+                self._deadline = time.time() + self._duration
+
+        # Duration mode: stop once the wall-clock deadline has passed.
+        if self._deadline is not None and time.time() >= self._deadline:
+            logger.debug(
+                f"User id:{self.user_id} completed Run phase (duration expired)"
+            )
+            self.environment.runner.send_message(
+                "update_progress", {"user": self.user_id, "phase": "run"}
+            )
+            self.state = RunUser.State.Done
+            return

         tenant: str = None
         request: QueryRequest = None
@@ -793,23 +808,36 @@ def get_recall_pct(p):
                 + f"[magenta]Recall: {recall_str}"
             )

-        # If --synthetic-no-aggregate-stats is set, the cumulative request
-        # count is stored in the stats[workload.name] object. We just use
-        # the total request count for the entire runbook as the total,
-        # although it breaks convention with non-runbook workloads.
- if ( - isinstance( - env.workload_sequence, - vsb.workloads.synthetic_workload.synthetic_workload.SyntheticRunbook, + # Determine progress bar mode: time-based or count-based + duration = workload.synthetic_duration() + if duration is not None: + # Duration mode: show elapsed time vs total duration + elapsed = time.time() - search_stats.start_time + vsb.progress.update( + self.progress_task_id, + completed=int(min(elapsed, duration)), + total=int(duration), + extra_info=metrics_str, ) - and not self.no_aggregate_stats - ): - total = env.workload_sequence.request_count() else: - total = self.request_count - vsb.progress.update( - self.progress_task_id, - completed=cumulative_num_requests, - total=total, - extra_info=metrics_str, - ) + # Count mode: show completed requests vs total + # If --synthetic-no-aggregate-stats is set, the cumulative + # request count is stored in the stats[workload.name] + # object. We just use the total request count for the + # entire runbook as the total, although it breaks + # convention with non-runbook workloads. + if ( + isinstance( + env.workload_sequence, + vsb.workloads.synthetic_workload.synthetic_workload.SyntheticRunbook, + ) + and not self.no_aggregate_stats + ): + total = env.workload_sequence.request_count() + else: + total = self.request_count + vsb.progress.update( + self.progress_task_id, + completed=cumulative_num_requests, + total=total, + extra_info=metrics_str, + ) diff --git a/vsb/workloads/base.py b/vsb/workloads/base.py index b609ce0..6b7d07a 100644 --- a/vsb/workloads/base.py +++ b/vsb/workloads/base.py @@ -101,6 +101,13 @@ def recall_available(self) -> bool: """ return True + def synthetic_duration(self) -> float | None: + """ + The duration in seconds for the Run phase when using time-based mode, + or None if using request-count mode. + """ + return None + class VectorWorkloadSequence(ABC): @abstractmethod diff --git a/vsb/workloads/synthetic_workload/synthetic_workload.py b/vsb/workloads/synthetic_workload/synthetic_workload.py index 9cfc183..ec89dca 100644 --- a/vsb/workloads/synthetic_workload/synthetic_workload.py +++ b/vsb/workloads/synthetic_workload/synthetic_workload.py @@ -47,6 +47,7 @@ def __init__(self, name: str, options): self.queries = None self._record_count = options.synthetic_records self._request_count = options.synthetic_requests + self._synthetic_duration = getattr(options, "synthetic_duration", None) self._num_workers = options.expect_workers self._num_users = options.num_users @@ -104,8 +105,26 @@ def get_query_iter( f"this shouldn't happen." ) self.queries = self.setup_queries() - # Worker queries are worker-local - that is, each worker will only - # have the max potential queries for its users. + + if self._synthetic_duration is not None: + # Duration mode: cycle through all queries indefinitely. + # RunUser will stop consuming when the deadline expires. + total_queries = self.queries.shape[0] + + def make_cycling_query_iter(): + while True: + for index in range(total_queries): + query = { + "values": self.queries["values"].iat[index], + "top_k": self.queries["top_k"].iat[index], + "neighbors": self.queries["neighbors"].iat[index], + } + yield "", SearchRequest(**query) + + return make_cycling_query_iter() + + # Count mode: worker queries are worker-local - that is, each worker + # will only have the max potential queries for its users. 
         quotient, remainder = divmod(self._request_count, num_users)
         chunks = [quotient + (1 if r < remainder else 0) for r in range(num_users)]
         user_q, user_r = divmod(self._num_users, self._num_workers)
@@ -152,8 +173,13 @@ def record_count(self) -> int:
         return self._record_count

     def request_count(self) -> int:
+        if self._synthetic_duration is not None:
+            return 0
         return self._request_count

+    def synthetic_duration(self) -> float | None:
+        return self._synthetic_duration
+

 class SyntheticWorkload(InMemoryWorkload, ABC):
     """A workload in which records and queries are generated pseudo-randomly."""
@@ -212,13 +238,14 @@ def get_random_vector(self) -> Vector:
     def get_random_query_idx(self, num_idxs: int) -> int:
         # Pick a random record from our records to use as a query,
         # based on the query distribution.
+        num_records = self._record_count
         match self._query_distribution:
             case "uniform":
-                return self.rng.integers(0, self._request_count, num_idxs)
+                return self.rng.integers(0, num_records, num_idxs)
             case "zipfian":
                 idxs = []
                 while len(idxs) < num_idxs:
-                    if (offset := self.rng.zipf(1.1)) < self._request_count:
+                    if (offset := self.rng.zipf(1.1)) < num_records:
                         idxs.append(offset)
                 return idxs
             case _:
@@ -240,20 +267,28 @@ def setup_records(self):

     def setup_queries(self):
         # Pseudo-randomly generate the full RecordList of queries
         # Query will be generated with the same distribution as records
-        max_users_per_worker = int(np.ceil(self._num_users / self._num_workers))
-        # Give each worker a number of queries proportional to the maximum number of users
-        # any worker can have.
-        max_queries_per_worker = (
-            int(np.ceil(self._request_count * max_users_per_worker / self._num_users))
-            + self._num_users
-        )
+        if self._synthetic_duration is not None:
+            # Duration mode: generate a fixed pool of queries to cycle through.
+            query_pool_size = min(self._record_count, 10000)
+        else:
+            max_users_per_worker = int(np.ceil(self._num_users / self._num_workers))
+            # Give each worker a number of queries proportional to the maximum
+            # number of users any worker can have.
+            query_pool_size = (
+                int(
+                    np.ceil(
+                        self._request_count * max_users_per_worker / self._num_users
+                    )
+                )
+                + self._num_users
+            )
         self.queries = pandas.DataFrame(
             {
                 "values": [
                     self.records["values"].iat[i]
-                    for i in self.get_random_query_idx(max_queries_per_worker)
+                    for i in self.get_random_query_idx(query_pool_size)
                 ],
-                "top_k": np.full(max_queries_per_worker, self._top_k),
+                "top_k": np.full(query_pool_size, self._top_k),
             }
         )
@@ -597,6 +632,7 @@ def __init__(
         super().__init__(name, options)
         self._record_count = options.synthetic_records
         self._request_count = options.synthetic_requests
+        self._synthetic_duration = getattr(options, "synthetic_duration", None)
         self._dimensions = options.synthetic_dimensions
         self._metric = DistanceMetric(options.synthetic_metric)
         self._metadata_gen = self.parse_synthetic_metadata_template(
@@ -747,14 +783,24 @@ def make_record_iter(num_records, insert_index):

     def get_query_iter(
         self, num_users: int, user_id: int, batch_size: int
     ) -> Iterator[tuple[str, QueryRequest]]:
-        user_n_queries = self._request_count // num_users + (
-            user_id < self._request_count % num_users
-        )
         user_n_records = self._record_count // num_users + (
             user_id < self._record_count % num_users
         )
+
+        if self._synthetic_duration is not None:
+            # Duration mode: generate requests indefinitely.
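+            # (make_query_iter below treats num_queries=None as "no limit".)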
+            # RunUser will stop consuming when the deadline expires.
+            user_n_queries = None
+            # User-unique upsert id stride; assumes < 1M upserts per user.
+            insert_index = self._record_count + user_id * 1000000
+        else:
+            user_n_queries = self._request_count // num_users + (
+                user_id < self._request_count % num_users
+            )
+            # User-unique upsert id range to avoid conflicts
+            insert_index = self._record_count + user_id * (user_n_queries + 1)
+
         # User-unique delete/fetch id range to avoid conflicts
         original_index_start = self._record_count // num_users * user_id + (
             min(self._record_count % num_users, user_id)
@@ -770,9 +816,16 @@ def get_query_iter(
         def make_query_iter(num_queries, insert_index, available_indexes):
             # Generate queries in batches. These batches will be homogenous, but a
             # single query iter may contain multiple types of queries.
-            for query_num in range(0, num_queries, self._batch_size):
-                # In case num_queries is not a multiple of batch_size
-                curr_batch_size = min(self._batch_size, num_queries - query_num)
+            query_num = 0
+            while num_queries is None or query_num < num_queries:
+                # In count mode the final batch may be smaller than
+                # _batch_size; in duration mode (num_queries is None)
+                # every batch is full-sized.
+                curr_batch_size = (
+                    self._batch_size
+                    if num_queries is None
+                    else min(self._batch_size, num_queries - query_num)
+                )
                 upsert_batch_size = min(curr_batch_size, batch_size)
                 # Choose a random request type based on proportions, and
                 # do _batch_size requests of that type
@@ -871,9 +921,18 @@ def make_query_iter(num_queries, insert_index, available_indexes):
                     fetch_ids = [str(available_indexes[i][0]) for i in idxs]
                     yield "", FetchRequest(ids=fetch_ids)
+                query_num += self._batch_size

         return make_query_iter(user_n_queries, insert_index, available_indexes)

+    def request_count(self) -> int:
+        if self._synthetic_duration is not None:
+            return 0
+        return self._request_count
+
+    def synthetic_duration(self) -> float | None:
+        return self._synthetic_duration
+
     def recall_available(self) -> bool:
         return False
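
The pattern duration mode relies on is an iterator that never exhausts, drained by a consumer that checks a wall-clock deadline before each request. Below is a minimal, self-contained sketch of that pattern; run_phase and send are illustrative stand-in names, and itertools.cycle stands in for the patch's make_cycling_query_iter (it likewise never raises StopIteration), so this models RunUser.do_run rather than reproducing the Locust task:

import itertools
import time


def run_phase(queries, duration_secs, send):
    # Drain a cycling query iterator until the wall-clock deadline expires;
    # the deadline check mirrors the State.Done transition in RunUser.do_run.
    deadline = time.time() + duration_secs
    issued = 0
    for query in itertools.cycle(queries):
        if time.time() >= deadline:
            break
        send(query)
        issued += 1
    return issued


# e.g. issued = run_phase(["q1", "q2", "q3"], 0.1, send=lambda q: None)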