Skip to content

chore(llmobs): dac strip io from OpenAI #13791

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 1 addition & 82 deletions ddtrace/contrib/internal/openai/_endpoint_hooks.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The base _EndpointHook class has its own _record_request method which does some request specific tagging on the APM span. Are we ok leaving all of those tags on the APM span? For other providers, I do not think we have any of this information (besides the model and provider), so it would be more consistent to remove this tagging; however, is the idea to keep it because we do not have this information on the LLMObs span?

I also noticed that it seems like we tag the provider as "openai.request.client" here which seems inconsistent with other integrations where we refer to this as the provider.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from ddtrace.contrib.internal.openai.utils import _is_generator
from ddtrace.contrib.internal.openai.utils import _loop_handler
from ddtrace.contrib.internal.openai.utils import _process_finished_stream
from ddtrace.contrib.internal.openai.utils import _tag_tool_calls
from ddtrace.internal.utils.version import parse_version


Expand Down Expand Up @@ -166,19 +165,7 @@ class _CompletionHook(_BaseCompletionHook):
"model",
"engine",
"suffix",
"max_tokens",
"temperature",
"top_p",
"n",
"stream",
"logprobs",
"echo",
Comment on lines 166 to 168
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mind describing why we are leaving these parameters only? I guess echo seems to be related to audio models only which seems fine to leave but what about engine and suffix? I am a bit confused as to what engine is referring to as I do not see it on the list of request arguments in the Open AI API docs.

"stop",
"presence_penalty",
"frequency_penalty",
"best_of",
"logit_bias",
"user",
)
_response_attrs = ("created", "id", "model")
ENDPOINT_NAME = "completions"
Expand All @@ -187,12 +174,6 @@ class _CompletionHook(_BaseCompletionHook):

def _record_request(self, pin, integration, instance, span, args, kwargs):
super()._record_request(pin, integration, instance, span, args, kwargs)
Comment on lines 175 to 176
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need these lines? If we remove this, won't the base class's _record_request method be called automatically (possibly other examples below).

if integration.is_pc_sampled_span(span):
prompt = kwargs.get("prompt", "")
if isinstance(prompt, str):
prompt = [prompt]
for idx, p in enumerate(prompt):
span.set_tag_str("openai.request.prompt.%d" % idx, integration.trunc(str(p)))

def _record_response(self, pin, integration, span, args, kwargs, resp, error):
resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
Expand All @@ -204,11 +185,6 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="completion")
if not resp:
return
for choice in resp.choices:
span.set_tag_str("openai.response.choices.%d.finish_reason" % choice.index, str(choice.finish_reason))
if integration.is_pc_sampled_span(span):
span.set_tag_str("openai.response.choices.%d.text" % choice.index, integration.trunc(choice.text))
integration.record_usage(span, resp.usage)
return resp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it just me or is the logic here extremely convoluted 🤣 There are two separate conditional checks for if not resp. I know this isn't in scope of this PR but I wonder if we can refactor this a bit to make this more readable while we're already working on this part of the code! Lmk if you think it makes sense to do this in a different PR though.



Expand All @@ -221,16 +197,6 @@ class _ChatCompletionHook(_BaseCompletionHook):
_request_kwarg_params = (
"model",
"engine",
"temperature",
"top_p",
"n",
"stream",
"stop",
"max_tokens",
"presence_penalty",
"frequency_penalty",
"logit_bias",
"user",
)
_response_attrs = ("created", "id", "model")
ENDPOINT_NAME = "chat/completions"
Expand All @@ -239,18 +205,6 @@ class _ChatCompletionHook(_BaseCompletionHook):

def _record_request(self, pin, integration, instance, span, args, kwargs):
super()._record_request(pin, integration, instance, span, args, kwargs)
for idx, m in enumerate(kwargs.get("messages", [])):
role = getattr(m, "role", "")
name = getattr(m, "name", "")
content = getattr(m, "content", "")
if isinstance(m, dict):
content = m.get("content", "")
role = m.get("role", "")
name = m.get("name", "")
if integration.is_pc_sampled_span(span):
span.set_tag_str("openai.request.messages.%d.content" % idx, integration.trunc(str(content)))
span.set_tag_str("openai.request.messages.%d.role" % idx, str(role))
span.set_tag_str("openai.request.messages.%d.name" % idx, str(name))
if parse_version(OPENAI_VERSION) >= (1, 26) and kwargs.get("stream"):
stream_options = kwargs.get("stream_options", {})
if not isinstance(stream_options, dict):
Expand All @@ -270,21 +224,6 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
if kwargs.get("stream") and error is None:
return self._handle_streamed_response(integration, span, kwargs, resp, operation_type="chat")
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="chat")
for choice in resp.choices:
idx = choice.index
finish_reason = getattr(choice, "finish_reason", None)
message = choice.message
span.set_tag_str("openai.response.choices.%d.finish_reason" % idx, str(finish_reason))
span.set_tag_str("openai.response.choices.%d.message.role" % idx, choice.message.role)
if integration.is_pc_sampled_span(span):
span.set_tag_str(
"openai.response.choices.%d.message.content" % idx, integration.trunc(message.content or "")
)
if getattr(message, "function_call", None):
_tag_tool_calls(integration, span, [message.function_call], idx)
if getattr(message, "tool_calls", None):
_tag_tool_calls(integration, span, message.tool_calls, idx)
integration.record_usage(span, resp.usage)
return resp


Expand All @@ -294,7 +233,7 @@ class _ChatCompletionWithRawResponseHook(_ChatCompletionHook):

class _EmbeddingHook(_EndpointHook):
_request_arg_params = ("api_key", "api_base", "api_type", "request_id", "api_version", "organization")
_request_kwarg_params = ("model", "engine", "user")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we remove user here?

_request_kwarg_params = ("model", "engine")
_response_attrs = ("model",)
ENDPOINT_NAME = "embeddings"
HTTP_METHOD_TYPE = "POST"
Expand All @@ -306,21 +245,12 @@ def _record_request(self, pin, integration, instance, span, args, kwargs):
manually set them in _pre_response().
"""
super()._record_request(pin, integration, instance, span, args, kwargs)
embedding_input = kwargs.get("input", "")
if integration.is_pc_sampled_span(span):
if isinstance(embedding_input, str) or isinstance(embedding_input[0], int):
embedding_input = [embedding_input]
for idx, inp in enumerate(embedding_input):
span.set_tag_str("openai.request.input.%d" % idx, integration.trunc(str(inp)))

def _record_response(self, pin, integration, span, args, kwargs, resp, error):
resp = super()._record_response(pin, integration, span, args, kwargs, resp, error)
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="embedding")
if not resp:
return
span.set_metric("openai.response.embeddings_count", len(resp.data))
span.set_metric("openai.response.embedding-length", len(resp.data[0].embedding))
integration.record_usage(span, resp.usage)
return resp


Expand Down Expand Up @@ -729,20 +659,10 @@ class _ResponseHook(_BaseCompletionHook):
_request_kwarg_params = (
"model",
"include",
"instructions",
"max_output_tokens",
"metadata",
"parallel_tool_calls",
"previous_response_id",
"reasoning",
"service_tier",
"store",
"stream",
"temperature",
"text",
"tool_choice",
"tools",
"top_p",
"truncation",
"user",
)
Expand All @@ -759,5 +679,4 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
if kwargs.get("stream") and error is None:
return self._handle_streamed_response(integration, span, kwargs, resp, operation_type="response")
integration.llmobs_set_tags(span, args=[], kwargs=kwargs, response=resp, operation="response")
integration.record_usage(span, resp.usage)
return resp
68 changes: 13 additions & 55 deletions ddtrace/contrib/internal/openai/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,39 +301,17 @@ def _process_finished_stream(integration, span, kwargs, streamed_chunks, operati
formatted_completions = [
openai_construct_message_from_streamed_chunks(choice) for choice in streamed_chunks
]
if integration.is_pc_sampled_span(span) and not operation_type == "response":
_tag_streamed_completions(integration, span, formatted_completions)
_set_token_metrics_from_streamed_response(span, formatted_completions, prompts, request_messages, kwargs)
_set_token_metrics_from_streamed_response(
span, integration, formatted_completions, prompts, request_messages, kwargs
)
integration.llmobs_set_tags(
span, args=[], kwargs=kwargs, response=formatted_completions, operation=operation_type
)
except Exception:
log.warning("Error processing streamed completion/chat response.", exc_info=True)


def _tag_streamed_completions(integration, span, completions_or_messages=None):
"""Tagging logic for streamed completions and chat completions."""
for idx, choice in enumerate(completions_or_messages):
text = choice.get("text", "")
if text:
span.set_tag_str("openai.response.choices.%d.text" % idx, integration.trunc(str(text)))
message_role = choice.get("role", "")
if message_role:
span.set_tag_str("openai.response.choices.%d.message.role" % idx, str(message_role))
message_content = choice.get("content", "")
if message_content:
span.set_tag_str(
"openai.response.choices.%d.message.content" % idx, integration.trunc(str(message_content))
)
tool_calls = choice.get("tool_calls", [])
if tool_calls:
_tag_tool_calls(integration, span, tool_calls, idx)
finish_reason = choice.get("finish_reason", "")
if finish_reason:
span.set_tag_str("openai.response.choices.%d.finish_reason" % idx, str(finish_reason))


def _set_token_metrics_from_streamed_response(span, response, prompts, messages, kwargs):
def _set_token_metrics_from_streamed_response(span, integration, response, prompts, messages, kwargs):
"""Set token span metrics on streamed chat/completion/response.
If token usage is not available in the response, compute/estimate the token counts.
"""
Expand All @@ -355,11 +333,15 @@ def _set_token_metrics_from_streamed_response(span, response, prompts, messages,
estimated, prompt_tokens = _compute_prompt_tokens(model_name, prompts, messages)
estimated, completion_tokens = _compute_completion_tokens(response, model_name)
total_tokens = prompt_tokens + completion_tokens
span.set_metric("openai.response.usage.prompt_tokens", prompt_tokens)
span.set_metric("openai.request.prompt_tokens_estimated", int(estimated))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The estimated variable is no longer being used, was this intentional? is there any downstream impact of this? Might we worth checking with @Yun-Kim who might know more about what this is used for if anything!

span.set_metric("openai.response.usage.completion_tokens", completion_tokens)
span.set_metric("openai.response.completion_tokens_estimated", int(estimated))
span.set_metric("openai.response.usage.total_tokens", total_tokens)

integration.llmobs_record_usage(
span,
{
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
},
)


def _compute_prompt_tokens(model_name, prompts=None, messages=None):
Expand Down Expand Up @@ -392,27 +374,3 @@ def _compute_completion_tokens(completions_or_messages, model_name):
estimated, completion_tokens = _compute_token_count(content, model_name)
num_completion_tokens += completion_tokens
return estimated, num_completion_tokens


def _tag_tool_calls(integration, span, tool_calls, choice_idx):
# type: (...) -> None
"""
Tagging logic if function_call or tool_calls are provided in the chat response.
Notes:
- since function calls are deprecated and will be replaced with tool calls, apply the same tagging logic/schema.
- streamed responses are processed and collected as dictionaries rather than objects,
so we need to handle both ways of accessing values.
"""
for idy, tool_call in enumerate(tool_calls):
if hasattr(tool_call, "function"):
# tool_call is further nested in a "function" object
tool_call = tool_call.function
function_arguments = _get_attr(tool_call, "arguments", "")
function_name = _get_attr(tool_call, "name", "")
span.set_tag_str(
"openai.response.choices.%d.message.tool_calls.%d.arguments" % (choice_idx, idy),
integration.trunc(str(function_arguments)),
)
span.set_tag_str(
"openai.response.choices.%d.message.tool_calls.%d.name" % (choice_idx, idy), str(function_name)
)
34 changes: 24 additions & 10 deletions ddtrace/llmobs/_integrations/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TOTAL_TOKENS_METRIC_KEY
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
from ddtrace.llmobs._integrations.utils import openai_set_meta_tags_from_chat
from ddtrace.llmobs._integrations.utils import openai_set_meta_tags_from_completion
from ddtrace.llmobs._integrations.utils import openai_set_meta_tags_from_response
Expand Down Expand Up @@ -96,14 +95,29 @@ def _is_provider(span, provider):
return False
return provider.lower() in base_url.lower()

def record_usage(self, span: Span, usage: Dict[str, Any]) -> None:
def llmobs_record_usage(self, span: Span, usage: Dict[str, Any]) -> None:
if not usage:
return
for token_type in ("prompt", "completion", "output", "input", "total"):
num_tokens = getattr(usage, token_type + "_tokens", None)
if not num_tokens:
continue
span.set_metric("openai.response.usage.%s_tokens" % token_type, num_tokens)

prompt_tokens = _get_attr(usage, "prompt_tokens", 0)
completion_tokens = _get_attr(usage, "completion_tokens", 0)
input_tokens = _get_attr(usage, "input_tokens", 0)
output_tokens = _get_attr(usage, "output_tokens", 0)

input_tokens = prompt_tokens or input_tokens
output_tokens = completion_tokens or output_tokens

token_metrics = {
INPUT_TOKENS_METRIC_KEY: input_tokens,
OUTPUT_TOKENS_METRIC_KEY: output_tokens,
TOTAL_TOKENS_METRIC_KEY: input_tokens + output_tokens,
}

span._set_ctx_items(
{
METRICS: token_metrics,
}
)

def _llmobs_set_tags(
self,
Expand Down Expand Up @@ -133,7 +147,7 @@ def _llmobs_set_tags(
elif operation == "response":
openai_set_meta_tags_from_response(span, kwargs, response)
update_proxy_workflow_input_output_value(span, span_kind)
metrics = self._extract_llmobs_metrics_tags(span, response, span_kind)
metrics = self._extract_llmobs_metrics_tags(span, response, span_kind) or span._get_ctx_item(METRICS)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you confirm my understanding -- if the response is streamed, we expect the metrics to be on the span context and if the response is not streamed, then we need to extract the token usage from the response itself?

span._set_ctx_items(
{SPAN_KIND: span_kind, MODEL_NAME: model_name or "", MODEL_PROVIDER: model_provider, METRICS: metrics}
)
Expand Down Expand Up @@ -164,7 +178,7 @@ def _llmobs_set_meta_tags_from_embedding(span: Span, kwargs: Dict[str, Any], res
span._set_ctx_item(OUTPUT_VALUE, "[{} embedding(s) returned]".format(len(resp.data)))

@staticmethod
def _extract_llmobs_metrics_tags(span: Span, resp: Any, span_kind: str) -> Dict[str, Any]:
def _extract_llmobs_metrics_tags(span: Span, resp: Any, span_kind: str) -> Optional[Dict[str, Any]]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 Code Quality Violation

do not use Any, use a concrete type (...read more)

Use the Any type very carefully. Most of the time, the Any type is used because we do not know exactly what type is being used. If you want to specify that a value can be of any type, use object instead of Any.

Learn More

View in Datadog  Leave us feedback  Documentation

"""Extract metrics from a chat/completion and set them as a temporary "_ml_obs.metrics" tag."""
token_usage = _get_attr(resp, "usage", None)
if token_usage is not None and span_kind != "workflow":
Expand All @@ -181,7 +195,7 @@ def _extract_llmobs_metrics_tags(span: Span, resp: Any, span_kind: str) -> Dict[
OUTPUT_TOKENS_METRIC_KEY: output_tokens,
TOTAL_TOKENS_METRIC_KEY: input_tokens + output_tokens,
}
return get_llmobs_metrics_tags("openai", span)
return None

def _get_base_url(self, **kwargs: Dict[str, Any]) -> Optional[str]:
instance = kwargs.get("instance")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
other:
- |
openai: Removes the IO data from the APM spans for OpenAI LLM requests and responses, which is duplicated in the LLM Observability span.
Loading
Loading