From 78d4f87d30907575eb77fa5b0754f13f3e3fb2ed Mon Sep 17 00:00:00 2001 From: dolfies Date: Sun, 22 Jun 2025 21:14:22 -0400 Subject: [PATCH 1/4] Fix websocket close code __contains__ check (#586) --- curl_cffi/requests/websockets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/curl_cffi/requests/websockets.py b/curl_cffi/requests/websockets.py index 28373d24..5cac73a0 100644 --- a/curl_cffi/requests/websockets.py +++ b/curl_cffi/requests/websockets.py @@ -139,7 +139,7 @@ def _unpack_close_frame(frame: bytes) -> tuple[int, str]: "Invalid close frame", WsCloseCode.PROTOCOL_ERROR ) from e else: - if code < 3000 and (code not in WsCloseCode or code == 1005): + if code < 3000 and (code not in WsCloseCode._value2member_map_ or code == 1005): raise WebSocketError( "Invalid close code", WsCloseCode.PROTOCOL_ERROR ) From b5efdc93ee6148e7138d89a12f2a049e85573cf1 Mon Sep 17 00:00:00 2001 From: dolfies Date: Mon, 23 Jun 2025 17:56:52 -0400 Subject: [PATCH 2/4] Fix a number of errors when closing an actively used async websocket --- curl_cffi/_asyncio_selector.py | 4 +- curl_cffi/requests/websockets.py | 75 +++++++++++++++++++------------- 2 files changed, 47 insertions(+), 32 deletions(-) diff --git a/curl_cffi/_asyncio_selector.py b/curl_cffi/_asyncio_selector.py index 13b06a47..df180939 100644 --- a/curl_cffi/_asyncio_selector.py +++ b/curl_cffi/_asyncio_selector.py @@ -209,8 +209,8 @@ def _run_select(self) -> None: rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0) if rs: ws = [] - else: - raise + # If we're here, the socket was probably closed + # Do not re-raise else: raise diff --git a/curl_cffi/requests/websockets.py b/curl_cffi/requests/websockets.py index 5cac73a0..c328dcbf 100644 --- a/curl_cffi/requests/websockets.py +++ b/curl_cffi/requests/websockets.py @@ -567,12 +567,13 @@ async def recv_fragment( Args: timeout: how many seconds to wait before giving up. """ - if self.closed: - raise WebSocketClosed("WebSocket is closed") if self._recv_lock.locked(): raise TypeError("Concurrent call to recv_fragment() is not allowed") async with self._recv_lock: + if self.closed: + raise WebSocketClosed("WebSocket is closed") + try: chunk, frame = await asyncio.wait_for( self.loop.run_in_executor(None, self.curl.ws_recv), timeout @@ -581,9 +582,10 @@ async def recv_fragment( raise WebSocketTimeout("WebSocket recv_fragment() timed out") from e if frame.flags & CurlWsFlag.CLOSE: try: - code, message = self._close_code, self._close_reason = ( - self._unpack_close_frame(chunk) - ) + code, message = ( + self._close_code, + self._close_reason, + ) = self._unpack_close_frame(chunk) except WebSocketError as e: # Follow the spec to close the connection # Errors do not respect autoclose @@ -606,30 +608,43 @@ async def recv(self, *, timeout: Optional[float] = None) -> tuple[bytes, int]: timeout: how many seconds to wait before giving up. """ loop = self.loop - chunks = [] - flags = 0 - sock_fd = await loop.run_in_executor( - None, self.curl.getinfo, CurlInfo.ACTIVESOCKET - ) - if sock_fd == CURL_SOCKET_BAD: - raise WebSocketError( - "Invalid active socket", CurlECode.NO_CONNECTION_AVAILABLE + if self.closed: + raise WebSocketClosed("WebSocket is closed") + + async def _inner_recv() -> tuple[bytes, int]: + chunks = [] + flags = 0 + + sock_fd = await loop.run_in_executor( + None, self.curl.getinfo, CurlInfo.ACTIVESOCKET ) - while True: - try: - chunk, frame = await self.recv_fragment(timeout=timeout) - flags = frame.flags - chunks.append(chunk) - if frame.bytesleft == 0 and flags & CurlWsFlag.CONT == 0: - break - except CurlError as e: - if e.code == CurlECode.AGAIN: - await aselect(sock_fd, loop=loop, timeout=timeout) - else: - raise + if sock_fd == CURL_SOCKET_BAD: + raise WebSocketError( + "Invalid active socket", CurlECode.NO_CONNECTION_AVAILABLE + ) - return b"".join(chunks), flags + while True: + try: + chunk, frame = await self.recv_fragment(timeout=timeout) + flags = frame.flags + chunks.append(chunk) + if frame.bytesleft == 0 and flags & CurlWsFlag.CONT == 0: + break + except CurlError as e: + if e.code == CurlECode.AGAIN: + # We don't use the timeout here because it deadlocks if + # the socket is closed while recv() is waiting + await aselect(sock_fd, loop=loop, timeout=0.5) + else: + raise + + return b"".join(chunks), flags + + if timeout: + return await asyncio.wait_for(_inner_recv(), timeout=timeout) + else: + return await _inner_recv() async def recv_str(self, *, timeout: Optional[float] = None) -> str: """Receive a text frame. @@ -663,15 +678,15 @@ async def send( payload: data to send. flags: flags for the frame. """ - if self.closed: - raise WebSocketClosed("WebSocket is closed") - # curl expects bytes if isinstance(payload, str): payload = payload.encode() - # TODO: Why does concurrently sending fail async with self._send_lock: + # We must check the closed state after the last asyncio tick (i.e. the above async with call) + # as a race condition arises where the websocket is not yet closed until we're inside here + if self.closed: + raise WebSocketClosed("WebSocket is closed") return await self.loop.run_in_executor( None, self.curl.ws_send, payload, flags ) From f5ec1e90038e4c581ef0d4c9448cb896de174e7b Mon Sep 17 00:00:00 2001 From: dolfies Date: Mon, 23 Jun 2025 18:04:23 -0400 Subject: [PATCH 3/4] 2 more instances --- curl_cffi/requests/websockets.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/curl_cffi/requests/websockets.py b/curl_cffi/requests/websockets.py index c328dcbf..39d5bab7 100644 --- a/curl_cffi/requests/websockets.py +++ b/curl_cffi/requests/websockets.py @@ -139,7 +139,9 @@ def _unpack_close_frame(frame: bytes) -> tuple[int, str]: "Invalid close frame", WsCloseCode.PROTOCOL_ERROR ) from e else: - if code < 3000 and (code not in WsCloseCode._value2member_map_ or code == 1005): + if code < 3000 and ( + code not in WsCloseCode._value2member_map_ or code == 1005 + ): raise WebSocketError( "Invalid close code", WsCloseCode.PROTOCOL_ERROR ) @@ -571,6 +573,8 @@ async def recv_fragment( raise TypeError("Concurrent call to recv_fragment() is not allowed") async with self._recv_lock: + # We must check the closed state after the last asyncio tick (i.e. the above async with call) + # as a race condition arises where the websocket is not yet closed until we're inside here if self.closed: raise WebSocketClosed("WebSocket is closed") @@ -609,13 +613,15 @@ async def recv(self, *, timeout: Optional[float] = None) -> tuple[bytes, int]: """ loop = self.loop - if self.closed: - raise WebSocketClosed("WebSocket is closed") - async def _inner_recv() -> tuple[bytes, int]: chunks = [] flags = 0 + # We must check the closed state after the last asyncio tick (i.e. the above async with call) + # as a race condition arises where the websocket is not yet closed until we're inside here + if self.closed: + raise WebSocketClosed("WebSocket is closed") + sock_fd = await loop.run_in_executor( None, self.curl.getinfo, CurlInfo.ACTIVESOCKET ) From 8fb61dde5e61bba25fb0da4aa028fc8b0ee2a0ac Mon Sep 17 00:00:00 2001 From: dolfies Date: Mon, 23 Jun 2025 18:17:59 -0400 Subject: [PATCH 4/4] Remove unrelated formatting changes --- curl_cffi/requests/websockets.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/curl_cffi/requests/websockets.py b/curl_cffi/requests/websockets.py index 39d5bab7..645017a1 100644 --- a/curl_cffi/requests/websockets.py +++ b/curl_cffi/requests/websockets.py @@ -139,9 +139,7 @@ def _unpack_close_frame(frame: bytes) -> tuple[int, str]: "Invalid close frame", WsCloseCode.PROTOCOL_ERROR ) from e else: - if code < 3000 and ( - code not in WsCloseCode._value2member_map_ or code == 1005 - ): + if code < 3000 and (code not in WsCloseCode._value2member_map_ or code == 1005): raise WebSocketError( "Invalid close code", WsCloseCode.PROTOCOL_ERROR ) @@ -586,10 +584,9 @@ async def recv_fragment( raise WebSocketTimeout("WebSocket recv_fragment() timed out") from e if frame.flags & CurlWsFlag.CLOSE: try: - code, message = ( - self._close_code, - self._close_reason, - ) = self._unpack_close_frame(chunk) + code, message = self._close_code, self._close_reason = ( + self._unpack_close_frame(chunk) + ) except WebSocketError as e: # Follow the spec to close the connection # Errors do not respect autoclose