15 changes: 14 additions & 1 deletion docs/my-website/docs/pass_through/anthropic_completion.md
@@ -7,7 +7,7 @@ Pass-through endpoints for Anthropic - call provider-specific endpoint, in nativ

| Feature | Supported | Notes |
|-------|-------|-------|
| Cost Tracking | ✅ | supports all models on `/messages` endpoint |
| Cost Tracking | ✅ | supports all models on `/messages` and `/v1/messages/batches` endpoints |
| Logging | ✅ | works across all integrations |
| End-user Tracking | ✅ | disable prometheus tracking via `litellm.disable_end_user_cost_tracking_prometheus_only`|
| Streaming | ✅ | |
@@ -263,6 +263,19 @@ curl https://api.anthropic.com/v1/messages/batches \
}'
```

:::note Configuration Required for Batch Cost Tracking
For batch passthrough cost tracking to work properly, you need to define the Anthropic model in your `proxy_config.yaml`:

```yaml
model_list:
- model_name: claude-sonnet-4-5-20250929 # or any alias
litellm_params:
model: anthropic/claude-sonnet-4-5-20250929
api_key: os.environ/ANTHROPIC_API_KEY
```

This ensures the polling mechanism can correctly identify the provider and retrieve batch status for cost calculation.
:::
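
After the model is registered, you can sanity-check the setup by polling the batch through the proxy. A minimal Python sketch, assuming the proxy's default `/anthropic` passthrough prefix at `http://0.0.0.0:4000` and a LiteLLM virtual key; the batch id is a placeholder:

```python
# Hypothetical check: poll an Anthropic message batch through the LiteLLM proxy
# passthrough so batch cost tracking can attribute spend to the model defined above.
import httpx

PROXY_BASE = "http://0.0.0.0:4000/anthropic"    # assumption: default passthrough prefix
LITELLM_KEY = "sk-1234"                         # assumption: your proxy virtual key
BATCH_ID = "msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d"  # placeholder batch id

resp = httpx.get(
    f"{PROXY_BASE}/v1/messages/batches/{BATCH_ID}",
    headers={"x-api-key": LITELLM_KEY},
)
resp.raise_for_status()
print(resp.json().get("processing_status"))
```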

## Advanced

14 changes: 7 additions & 7 deletions litellm/batches/batch_utils.py
@@ -14,7 +14,7 @@

async def calculate_batch_cost_and_usage(
file_content_dictionary: List[dict],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"],
model_name: Optional[str] = None,
) -> Tuple[float, Usage, List[str]]:
"""
@@ -37,7 +37,7 @@ async def calculate_batch_cost_and_usage(

async def _handle_completed_batch(
batch: Batch,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"],
model_name: Optional[str] = None,
) -> Tuple[float, Usage, List[str]]:
"""Helper function to process a completed batch and handle logging"""
@@ -84,7 +84,7 @@ def _get_batch_models_from_file_content(

def _batch_cost_calculator(
file_content_dictionary: List[dict],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
model_name: Optional[str] = None,
) -> float:
"""
@@ -186,7 +186,7 @@ def calculate_vertex_ai_batch_cost_and_usage(

async def _get_batch_output_file_content_as_dictionary(
batch: Batch,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
) -> List[dict]:
"""
Get the batch output file content as a list of dictionaries
@@ -225,7 +225,7 @@ def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]:

def _get_batch_job_cost_from_file_content(
file_content_dictionary: List[dict],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
) -> float:
"""
Get the cost of a batch job from the file content
@@ -253,7 +253,7 @@ def _get_batch_job_cost_from_file_content(

def _get_batch_job_total_usage_from_file_content(
file_content_dictionary: List[dict],
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
model_name: Optional[str] = None,
) -> Usage:
"""
@@ -332,4 +332,4 @@ def _batch_response_was_successful(batch_job_output_file: dict) -> bool:
Check if the batch job response status == 200
"""
_response: dict = batch_job_output_file.get("response", None) or {}
return _response.get("status_code", None) == 200
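
These changes only widen the provider `Literal`; a hypothetical sketch of exercising the new `"anthropic"` value, assuming the rows follow the OpenAI-style batch output shape these helpers parse (real rows come from `_get_batch_output_file_content_as_dictionary()`):

```python
import asyncio

from litellm.batches.batch_utils import calculate_batch_cost_and_usage

# Illustrative row only; real rows are read from the provider's batch output file.
rows = [
    {
        "custom_id": "request-1",
        "response": {
            "status_code": 200,
            "body": {
                "model": "claude-sonnet-4-5-20250929",
                "object": "chat.completion",
                "choices": [{"index": 0, "message": {"role": "assistant", "content": "ok"}}],
                "usage": {"prompt_tokens": 12, "completion_tokens": 5, "total_tokens": 17},
            },
        },
    }
]

total_cost, usage, batch_models = asyncio.run(
    calculate_batch_cost_and_usage(
        file_content_dictionary=rows,
        custom_llm_provider="anthropic",  # now accepted by the Literal
        model_name="claude-sonnet-4-5-20250929",
    )
)
print(total_cost, usage.total_tokens, batch_models)
```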
31 changes: 27 additions & 4 deletions litellm/batches/main.py
@@ -22,6 +22,7 @@
import litellm
from litellm._logging import verbose_logger
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.anthropic.batches.handler import AnthropicBatchesHandler
from litellm.llms.azure.batches.handler import AzureBatchesAPI
from litellm.llms.bedrock.batches.handler import BedrockBatchesHandler
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -53,6 +54,7 @@
openai_batches_instance = OpenAIBatchesAPI()
azure_batches_instance = AzureBatchesAPI()
vertex_ai_batches_instance = VertexAIBatchPrediction(gcs_bucket_name="")
anthropic_batches_instance = AnthropicBatchesHandler()
base_llm_http_handler = BaseLLMHTTPHandler()
#################################################

@@ -355,7 +357,7 @@ def create_batch(
@client
async def aretrieve_batch(
batch_id: str,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
metadata: Optional[Dict[str, str]] = None,
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
@@ -401,7 +403,7 @@ def _handle_retrieve_batch_providers_without_provider_config(
litellm_params: dict,
_retrieve_batch_request: RetrieveBatchRequest,
_is_async: bool,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
):
api_base: Optional[str] = None
if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS:
@@ -498,6 +500,27 @@ def _handle_retrieve_batch_providers_without_provider_config(
timeout=timeout,
max_retries=optional_params.max_retries,
)
elif custom_llm_provider == "anthropic":
api_base = (
optional_params.api_base
or litellm.api_base
or get_secret_str("ANTHROPIC_API_BASE")
)
api_key = (
optional_params.api_key
or litellm.api_key
or litellm.azure_key
or get_secret_str("ANTHROPIC_API_KEY")
)

response = anthropic_batches_instance.retrieve_batch(
_is_async=_is_async,
batch_id=batch_id,
api_base=api_base,
api_key=api_key,
timeout=timeout,
max_retries=optional_params.max_retries,
)
else:
raise litellm.exceptions.BadRequestError(
message="LiteLLM doesn't support {} for 'create_batch'. Only 'openai' is supported.".format(
@@ -517,7 +540,7 @@ def _handle_retrieve_batch_providers_without_provider_config(
@client
def retrieve_batch(
batch_id: str,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
metadata: Optional[Dict[str, str]] = None,
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
@@ -608,7 +631,7 @@ def retrieve_batch(
api_key=optional_params.api_key,
logging_obj=litellm_logging_obj
or LiteLLMLoggingObj(
model=model or "bedrock/unknown",
model=model or f"{custom_llm_provider}/unknown",
messages=[],
stream=False,
call_type="batch_retrieve",
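
A hedged usage sketch of the new branch, not taken from this PR: the batch id is a placeholder, and credentials are resolved from `ANTHROPIC_API_BASE`/`ANTHROPIC_API_KEY` exactly as in the code above:

```python
import litellm

# Retrieve an Anthropic message batch through the OpenAI-compatible batches surface.
batch = litellm.retrieve_batch(
    batch_id="msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",  # placeholder id
    custom_llm_provider="anthropic",
)
print(batch.status)  # LiteLLMBatch, normalized to the OpenAI Batch schema
```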
18 changes: 16 additions & 2 deletions litellm/files/main.py
@@ -17,6 +17,7 @@
from litellm import get_secret_str
from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.anthropic.files.handler import AnthropicFilesHandler
from litellm.llms.azure.files.handler import AzureOpenAIFilesAPI
from litellm.llms.bedrock.files.handler import BedrockFilesHandler
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -49,6 +50,7 @@
azure_files_instance = AzureOpenAIFilesAPI()
vertex_ai_files_instance = VertexAIFilesHandler()
bedrock_files_instance = BedrockFilesHandler()
anthropic_files_instance = AnthropicFilesHandler()
#################################################


@@ -757,7 +759,7 @@ def file_list(
@client
async def afile_content(
file_id: str,
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
**kwargs,
@@ -802,7 +804,7 @@ def file_content(
file_id: str,
model: Optional[str] = None,
custom_llm_provider: Optional[
Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"], str]
Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"], str]
] = None,
extra_headers: Optional[Dict[str, str]] = None,
extra_body: Optional[Dict[str, str]] = None,
@@ -849,6 +851,18 @@ def file_content(

_is_async = kwargs.pop("afile_content", False) is True

# Check if this is an Anthropic batch results request
if custom_llm_provider == "anthropic":
response = anthropic_files_instance.file_content(
_is_async=_is_async,
file_content_request=_file_content_request,
api_base=optional_params.api_base,
api_key=optional_params.api_key,
timeout=timeout,
max_retries=optional_params.max_retries,
)
return response

if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS:
# for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
api_base = (
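
A hedged sketch of the new early return: for Anthropic, `file_content` is the surface the batch cost poller uses to pull the results rows. The id below is a placeholder, assumed to be the batch id (Anthropic exposes batch results by batch id rather than a separate file id):

```python
import litellm

# Fetch Anthropic batch results via the OpenAI-style files surface; the returned
# payload feeds the batch cost calculation.
results = litellm.file_content(
    file_id="msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",  # placeholder; assumed to be the batch id
    custom_llm_provider="anthropic",
)
print(results)
```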
5 changes: 5 additions & 0 deletions litellm/llms/anthropic/batches/__init__.py
@@ -0,0 +1,5 @@
from .handler import AnthropicBatchesHandler
from .transformation import AnthropicBatchesConfig

__all__ = ["AnthropicBatchesHandler", "AnthropicBatchesConfig"]

168 changes: 168 additions & 0 deletions litellm/llms/anthropic/batches/handler.py
@@ -0,0 +1,168 @@
"""
Anthropic Batches API Handler
"""

import asyncio
from typing import TYPE_CHECKING, Any, Coroutine, Optional, Union

import httpx

from litellm.llms.custom_httpx.http_handler import (
get_async_httpx_client,
)
from litellm.types.utils import LiteLLMBatch, LlmProviders

if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
else:
LiteLLMLoggingObj = Any

from ..common_utils import AnthropicModelInfo
from .transformation import AnthropicBatchesConfig


class AnthropicBatchesHandler:
"""
Handler for Anthropic Message Batches API.

Supports:
- retrieve_batch() - Retrieve batch status and information
"""

def __init__(self):
self.anthropic_model_info = AnthropicModelInfo()
self.provider_config = AnthropicBatchesConfig()

async def aretrieve_batch(
self,
batch_id: str,
api_base: Optional[str],
api_key: Optional[str],
timeout: Union[float, httpx.Timeout],
max_retries: Optional[int],
logging_obj: Optional[LiteLLMLoggingObj] = None,
) -> LiteLLMBatch:
"""
Async: Retrieve a batch from Anthropic.

Args:
batch_id: The batch ID to retrieve
api_base: Anthropic API base URL
api_key: Anthropic API key
timeout: Request timeout
max_retries: Max retry attempts (unused for now)
logging_obj: Optional logging object

Returns:
LiteLLMBatch: Batch information in OpenAI format
"""
# Resolve API credentials
api_base = api_base or self.anthropic_model_info.get_api_base(api_base)
api_key = api_key or self.anthropic_model_info.get_api_key()

if not api_key:
raise ValueError("Missing Anthropic API Key")

# Create a minimal logging object if not provided
if logging_obj is None:
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObjClass
logging_obj = LiteLLMLoggingObjClass(
model="anthropic/unknown",
messages=[],
stream=False,
call_type="batch_retrieve",
start_time=None,
litellm_call_id=f"batch_retrieve_{batch_id}",
function_id="batch_retrieve",
)

# Get the complete URL for batch retrieval
retrieve_url = self.provider_config.get_retrieve_batch_url(
api_base=api_base,
batch_id=batch_id,
optional_params={},
litellm_params={},
)

# Validate environment and get headers
headers = self.provider_config.validate_environment(
headers={},
model="",
messages=[],
optional_params={},
litellm_params={},
api_key=api_key,
api_base=api_base,
)

logging_obj.pre_call(
input=batch_id,
api_key=api_key,
additional_args={
"api_base": retrieve_url,
"headers": headers,
"complete_input_dict": {},
},
)
# Make the request
async_client = get_async_httpx_client(llm_provider=LlmProviders.ANTHROPIC)
response = await async_client.get(
url=retrieve_url,
headers=headers
)
response.raise_for_status()

# Transform response to LiteLLM format
return self.provider_config.transform_retrieve_batch_response(
model=None,
raw_response=response,
logging_obj=logging_obj,
litellm_params={},
)

def retrieve_batch(
self,
_is_async: bool,
batch_id: str,
api_base: Optional[str],
api_key: Optional[str],
timeout: Union[float, httpx.Timeout],
max_retries: Optional[int],
logging_obj: Optional[LiteLLMLoggingObj] = None,
) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
"""
Retrieve a batch from Anthropic.

Args:
_is_async: Whether to run asynchronously
batch_id: The batch ID to retrieve
api_base: Anthropic API base URL
api_key: Anthropic API key
timeout: Request timeout
max_retries: Max retry attempts (unused for now)
logging_obj: Optional logging object

Returns:
LiteLLMBatch or Coroutine: Batch information in OpenAI format
"""
if _is_async:
return self.aretrieve_batch(
batch_id=batch_id,
api_base=api_base,
api_key=api_key,
timeout=timeout,
max_retries=max_retries,
logging_obj=logging_obj,
)
else:
return asyncio.run(
self.aretrieve_batch(
batch_id=batch_id,
api_base=api_base,
api_key=api_key,
timeout=timeout,
max_retries=max_retries,
logging_obj=logging_obj,
)
)
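
For completeness, a hedged sketch of driving the handler directly rather than through `litellm.retrieve_batch`; credentials fall back to `ANTHROPIC_API_BASE`/`ANTHROPIC_API_KEY` via `AnthropicModelInfo`, and the batch id is a placeholder:

```python
import asyncio

from litellm.llms.anthropic.batches.handler import AnthropicBatchesHandler

handler = AnthropicBatchesHandler()

batch = asyncio.run(
    handler.aretrieve_batch(
        batch_id="msgbatch_01HkcTjaV5uDC8jWR4ZsDV8d",  # placeholder id
        api_base=None,      # resolved from ANTHROPIC_API_BASE / the default base URL
        api_key=None,       # resolved from ANTHROPIC_API_KEY
        timeout=600.0,
        max_retries=None,
    )
)
print(batch.status)
```

Note that the synchronous `retrieve_batch` path wraps the coroutine in `asyncio.run`, so it cannot be invoked from inside an already-running event loop; async callers should pass `_is_async=True` (or call `aretrieve_batch` directly) instead.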
