From 0d4b5834adfee835277db4edfd1547c678de9a62 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 8 Aug 2024 17:53:02 +0000 Subject: [PATCH 1/4] first commit --- jetstream/core/metrics/prometheus.py | 9 +++++++++ jetstream/core/orchestrator.py | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/jetstream/core/metrics/prometheus.py b/jetstream/core/metrics/prometheus.py index dc8a00e9..06807e0d 100644 --- a/jetstream/core/metrics/prometheus.py +++ b/jetstream/core/metrics/prometheus.py @@ -255,3 +255,12 @@ def get_request_output_length(self): def get_request_success_count_metric(self): return self._request_success_count.labels(id=self._id) + + _total_tokens_in_current_batch = Gauge( + name="jetstream_total_tokens_in_current_batch", + documentation="Total number of tokens in the decode batch", + labelnames=["id", "idx"], + ) + + def get_total_tokens_in_current_batch_metric(self, idx: int): + return self._total_tokens_in_current_batch.labels(id=self._id, idx=idx) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index cefabd05..b9f43cc4 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -815,6 +815,7 @@ def _detokenize_thread(self, idx: int): # is a result tokens, and we can't annotate the tuple. result_tokens = result_tokens.convert_to_numpy() + total_tokens_in_batch = 0 for slot, request in my_live_requests.items(): if request is not None: results, complete = token_utils.process_result_tokens( @@ -826,6 +827,9 @@ def _detokenize_thread(self, idx: int): complete=request.complete, ) request.complete = complete + total_tokens_in_batch += result_tokens.get_result_at_slot( + slot + ).lengths # Return some output samples. 
request.enqueue_samples(results) if request.complete.all(): @@ -873,6 +877,9 @@ def _detokenize_thread(self, idx: int): generate_timestep_added, (time.time() - start_detokenize_time) * 10**3, ) + self._metrics_collector.get_total_tokens_in_current_batch_metric().set( + total_tokens_in_batch + ) else: # We want to update a slot with the new channel. slot, active_request = data From 9748a00c43416f6df983a9ceabf724638d960aaa Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 8 Aug 2024 17:57:36 +0000 Subject: [PATCH 2/4] missing idx --- jetstream/core/orchestrator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index b9f43cc4..0f00d955 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -877,7 +877,7 @@ def _detokenize_thread(self, idx: int): generate_timestep_added, (time.time() - start_detokenize_time) * 10**3, ) - self._metrics_collector.get_total_tokens_in_current_batch_metric().set( + self._metrics_collector.get_total_tokens_in_current_batch_metric(idx=idx).set( total_tokens_in_batch ) else: From 0b109cee81b73b447e993ac26000b27ada3198f4 Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 8 Aug 2024 18:00:31 +0000 Subject: [PATCH 3/4] temporary missing if statement --- jetstream/core/orchestrator.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index 0f00d955..dc637b52 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -877,9 +877,10 @@ def _detokenize_thread(self, idx: int): generate_timestep_added, (time.time() - start_detokenize_time) * 10**3, ) - self._metrics_collector.get_total_tokens_in_current_batch_metric(idx=idx).set( - total_tokens_in_batch - ) + if self._metrics_collector: + self._metrics_collector.get_total_tokens_in_current_batch_metric(idx=idx).set( + total_tokens_in_batch + ) else: # We want to update a 
slot with the new channel. slot, active_request = data From e7190ce891af8f5b24fe3b5758495aa5ad3c49af Mon Sep 17 00:00:00 2001 From: Brendan Slabe Date: Thu, 8 Aug 2024 18:05:51 +0000 Subject: [PATCH 4/4] fmt --- jetstream/core/orchestrator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/jetstream/core/orchestrator.py b/jetstream/core/orchestrator.py index dc637b52..f54b3c79 100644 --- a/jetstream/core/orchestrator.py +++ b/jetstream/core/orchestrator.py @@ -878,9 +878,9 @@ def _detokenize_thread(self, idx: int): (time.time() - start_detokenize_time) * 10**3, ) if self._metrics_collector: - self._metrics_collector.get_total_tokens_in_current_batch_metric(idx=idx).set( - total_tokens_in_batch - ) + self._metrics_collector.get_total_tokens_in_current_batch_metric( + idx=idx + ).set(total_tokens_in_batch) else: # We want to update a slot with the new channel. slot, active_request = data