diff --git a/src/anthropic/_streaming.py b/src/anthropic/_streaming.py index d5fd14c5..908cdaac 100644 --- a/src/anthropic/_streaming.py +++ b/src/anthropic/_streaming.py @@ -81,20 +81,6 @@ def __stream__(self) -> Iterator[_T]: if sse.event == "completion": yield process_data(data=sse.json(), cast_to=cast_to, response=response) - if ( - sse.event == "message_start" - or sse.event == "message_delta" - or sse.event == "message_stop" - or sse.event == "content_block_start" - or sse.event == "content_block_delta" - or sse.event == "content_block_stop" - ): - data = sse.json() - if is_dict(data) and "type" not in data: - data["type"] = sse.event - - yield process_data(data=data, cast_to=cast_to, response=response) - if sse.event == "ping": continue @@ -113,6 +99,13 @@ def __stream__(self) -> Iterator[_T]: response=self.response, ) + # Process any other event for forward compatibility + data = sse.json() + if is_dict(data) and "type" not in data: + data["type"] = sse.event + + yield process_data(data=data, cast_to=cast_to, response=response) + # As we might not fully consume the response stream, we need to close it explicitly response.close() @@ -198,20 +191,6 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.event == "completion": yield process_data(data=sse.json(), cast_to=cast_to, response=response) - if ( - sse.event == "message_start" - or sse.event == "message_delta" - or sse.event == "message_stop" - or sse.event == "content_block_start" - or sse.event == "content_block_delta" - or sse.event == "content_block_stop" - ): - data = sse.json() - if is_dict(data) and "type" not in data: - data["type"] = sse.event - - yield process_data(data=data, cast_to=cast_to, response=response) - if sse.event == "ping": continue @@ -230,6 +209,13 @@ async def __stream__(self) -> AsyncIterator[_T]: response=self.response, ) + # Process any other event for forward compatibility + data = sse.json() + if is_dict(data) and "type" not in data: + data["type"] = sse.event + + yield process_data(data=data, cast_to=cast_to, response=response) + # As we might not fully consume the response stream, we need to close it explicitly await response.aclose()