Skip to content

Commit 6d9d252

Browse files
committed
Add environment variable for genai upload hook queue size
1 parent 8297dde commit 6d9d252

File tree

5 files changed

+231
-98
lines changed

5 files changed

+231
-98
lines changed

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/__init__.py

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from __future__ import annotations
1616

17+
import logging
1718
from os import environ
1819

1920
from opentelemetry.util.genai.completion_hook import (
@@ -22,21 +23,65 @@
2223
)
2324
from opentelemetry.util.genai.environment_variables import (
2425
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
26+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT,
27+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE,
2528
)
2629

30+
_logger = logging.getLogger(__name__)
31+
2732

2833
def upload_completion_hook() -> CompletionHook:
34+
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
35+
if not base_path:
36+
_logger.warning(
37+
"%s is required but not set, using no-op instead",
38+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
39+
)
40+
return _NoOpCompletionHook()
41+
2942
# If fsspec is not installed the hook will be a no-op.
3043
try:
3144
# pylint: disable=import-outside-toplevel
3245
from opentelemetry.util.genai._upload.completion_hook import ( # noqa: PLC0415
46+
_DEFAULT_FORMAT,
47+
_DEFAULT_MAX_QUEUE_SIZE,
48+
_FORMATS,
3349
UploadCompletionHook,
3450
)
3551
except ImportError:
3652
return _NoOpCompletionHook()
3753

38-
base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
39-
if not base_path:
40-
return _NoOpCompletionHook()
54+
environ_max_queue_size = environ.get(
55+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE,
56+
_DEFAULT_MAX_QUEUE_SIZE,
57+
)
58+
try:
59+
environ_max_queue_size = int(environ_max_queue_size)
60+
except ValueError:
61+
_logger.warning(
62+
"%s is not a valid integer for %s. Defaulting to %s.",
63+
environ_max_queue_size,
64+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE,
65+
_DEFAULT_MAX_QUEUE_SIZE,
66+
)
67+
environ_max_queue_size = _DEFAULT_MAX_QUEUE_SIZE
68+
69+
environ_format = environ.get(
70+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, _DEFAULT_FORMAT
71+
).lower()
72+
73+
if environ_format not in _FORMATS:
74+
_logger.warning(
75+
"%s is not a valid option for %s, should be one of %s. Defaulting to %s.",
76+
environ_format,
77+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT,
78+
_FORMATS,
79+
_DEFAULT_FORMAT,
80+
)
81+
environ_format = _DEFAULT_FORMAT
4182

42-
return UploadCompletionHook(base_path=base_path)
83+
return UploadCompletionHook(
84+
base_path=base_path,
85+
max_queue_size=environ_max_queue_size,
86+
upload_format=environ_format,
87+
)

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from contextlib import ExitStack
2525
from dataclasses import asdict, dataclass
2626
from functools import partial
27-
from os import environ
2827
from time import time
2928
from typing import Any, Callable, Final, Literal
3029
from uuid import uuid4
@@ -36,9 +35,6 @@
3635
from opentelemetry.trace import Span
3736
from opentelemetry.util.genai import types
3837
from opentelemetry.util.genai.completion_hook import CompletionHook
39-
from opentelemetry.util.genai.environment_variables import (
40-
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT,
41-
)
4238
from opentelemetry.util.genai.utils import gen_ai_json_dump
4339

4440
GEN_AI_INPUT_MESSAGES_REF: Final = (
@@ -52,6 +48,9 @@
5248
)
5349

5450
_MESSAGE_INDEX_KEY = "index"
51+
_DEFAULT_MAX_QUEUE_SIZE = 20
52+
_DEFAULT_FORMAT = "json"
53+
5554

5655
Format = Literal["json", "jsonl"]
5756
_FORMATS: tuple[Format, ...] = ("json", "jsonl")
@@ -105,36 +104,26 @@ def __init__(
105104
self,
106105
*,
107106
base_path: str,
108-
max_size: int = 20,
109-
upload_format: Format | None = None,
107+
max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
108+
upload_format: Format = _DEFAULT_FORMAT,
110109
lru_cache_max_size: int = 1024,
111110
) -> None:
112-
self._max_size = max_size
111+
self._max_queue_size = max_queue_size
113112
self._fs, base_path = fsspec.url_to_fs(base_path)
114113
self._base_path = self._fs.unstrip_protocol(base_path)
115114
self.lru_dict: OrderedDict[str, bool] = OrderedDict()
116115
self.lru_cache_max_size = lru_cache_max_size
117116

118-
if upload_format not in _FORMATS + (None,):
117+
if upload_format not in _FORMATS:
119118
raise ValueError(
120119
f"Invalid {upload_format=}. Must be one of {_FORMATS}"
121120
)
122-
123-
if upload_format is None:
124-
environ_format = environ.get(
125-
OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT, "json"
126-
).lower()
127-
if environ_format not in _FORMATS:
128-
upload_format = "json"
129-
else:
130-
upload_format = environ_format
131-
132-
self._format: Final[Literal["json", "jsonl"]] = upload_format
121+
self._format = upload_format
133122

134123
# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
135124
# limits the number of queued tasks. If the queue is full, data will be dropped.
136-
self._executor = ThreadPoolExecutor(max_workers=max_size)
137-
self._semaphore = threading.BoundedSemaphore(max_size)
125+
self._executor = ThreadPoolExecutor(max_workers=self._max_queue_size)
126+
self._semaphore = threading.BoundedSemaphore(self._max_queue_size)
138127

139128
def _submit_all(self, upload_data: UploadData) -> None:
140129
def done(future: Future[None]) -> None:
@@ -326,7 +315,7 @@ def shutdown(self, *, timeout_sec: float = 10.0) -> None:
326315

327316
# Wait for all tasks to finish to flush the queue
328317
with ExitStack() as stack:
329-
for _ in range(self._max_size):
318+
for _ in range(self._max_queue_size):
330319
remaining = deadline - time()
331320
if not self._semaphore.acquire(timeout=remaining): # pylint: disable=consider-using-with
332321
# Couldn't finish flushing all uploads before timeout

util/opentelemetry-util-genai/src/opentelemetry/util/genai/environment_variables.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,13 @@
5050
The format to use when uploading prompt and response data. Must be one of ``json`` or
5151
``jsonl``. Defaults to ``json``.
5252
"""
53+
54+
OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE = (
55+
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE"
56+
)
57+
"""
58+
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_MAX_QUEUE_SIZE
59+
60+
The maximum number of concurrent uploads to queue. New uploads will be dropped if the queue is
61+
full. Defaults to 20.
62+
"""

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@
1313
# limitations under the License.
1414

1515

16-
# pylint: disable=import-outside-toplevel,no-name-in-module
17-
import importlib
16+
# pylint: disable=no-name-in-module
1817
import logging
19-
import sys
2018
import threading
2119
import time
2220
from contextlib import contextmanager
@@ -33,42 +31,10 @@
3331
from opentelemetry.util.genai._upload.completion_hook import (
3432
UploadCompletionHook,
3533
)
36-
from opentelemetry.util.genai.completion_hook import (
37-
_NoOpCompletionHook,
38-
load_completion_hook,
39-
)
4034

4135
# Use MemoryFileSystem for testing
4236
# https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.memory.MemoryFileSystem
4337
BASE_PATH = "memory://"
44-
45-
46-
@patch.dict(
47-
"os.environ",
48-
{
49-
"OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK": "upload",
50-
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH": BASE_PATH,
51-
},
52-
clear=True,
53-
)
54-
class TestUploadEntryPoint(TestCase):
55-
def test_upload_entry_point(self):
56-
self.assertIsInstance(load_completion_hook(), UploadCompletionHook)
57-
58-
def test_upload_entry_point_no_fsspec(self):
59-
"""Tests that a no-op uploader is used when fsspec is not installed"""
60-
61-
from opentelemetry.util.genai import _upload # noqa: PLC0415
62-
63-
# Simulate fsspec imports failing
64-
with patch.dict(
65-
sys.modules,
66-
{"opentelemetry.util.genai._upload.completion_hook": None},
67-
):
68-
importlib.reload(_upload)
69-
self.assertIsInstance(load_completion_hook(), _NoOpCompletionHook)
70-
71-
7238
MAXSIZE = 5
7339
FAKE_INPUTS = [
7440
types.InputMessage(
@@ -125,7 +91,7 @@ def setUp(self):
12591
self.mock_fs.exists.return_value = False
12692

12793
self.hook = UploadCompletionHook(
128-
base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5
94+
base_path=BASE_PATH, max_queue_size=MAXSIZE, lru_cache_max_size=5
12995
)
13096

13197
def tearDown(self) -> None:
@@ -305,42 +271,6 @@ def test_upload_format_sets_content_type(self):
305271
ANY, "w", content_type=expect_content_type
306272
)
307273

308-
def test_parse_upload_format_envvar(self):
309-
for envvar_value, expect in (
310-
("", "json"),
311-
("json", "json"),
312-
("invalid", "json"),
313-
("jsonl", "jsonl"),
314-
("jSoNl", "jsonl"),
315-
):
316-
with patch.dict(
317-
"os.environ",
318-
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": envvar_value},
319-
clear=True,
320-
):
321-
hook = UploadCompletionHook(base_path=BASE_PATH)
322-
self.addCleanup(hook.shutdown)
323-
self.assertEqual(
324-
hook._format,
325-
expect,
326-
f"expected upload format {expect=} with {envvar_value=} got {hook._format}",
327-
)
328-
329-
with patch.dict(
330-
"os.environ",
331-
{"OTEL_INSTRUMENTATION_GENAI_UPLOAD_FORMAT": "json"},
332-
clear=True,
333-
):
334-
hook = UploadCompletionHook(
335-
base_path=BASE_PATH, upload_format="jsonl"
336-
)
337-
self.addCleanup(hook.shutdown)
338-
self.assertEqual(
339-
hook._format,
340-
"jsonl",
341-
"upload_format kwarg should take precedence",
342-
)
343-
344274
def test_upload_after_shutdown_logs(self):
345275
self.hook.shutdown()
346276
with self.assertLogs(level=logging.INFO) as logs:

0 commit comments

Comments
 (0)