From a139b1f9b20a75687a4f254cce2b8704cdc2671a Mon Sep 17 00:00:00 2001 From: zuczkows Date: Wed, 4 Jun 2025 10:45:55 +0200 Subject: [PATCH 1/4] Add Event object for synchronization and more debug logs for ws connection --- livechat/utils/ws_client.py | 161 ++++++++++++++++++++++++++++++++---- 1 file changed, 146 insertions(+), 15 deletions(-) diff --git a/livechat/utils/ws_client.py b/livechat/utils/ws_client.py index 13d0c90..4c1ce2b 100644 --- a/livechat/utils/ws_client.py +++ b/livechat/utils/ws_client.py @@ -7,6 +7,7 @@ import random import ssl import threading +import time from time import sleep from typing import List, NoReturn, Union @@ -31,7 +32,13 @@ def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str): def on_error(ws_client: WebSocketApp, error: Exception): - logger.error(f'websocket error occurred: {str(error)}') + error_details = { + 'error_type': type(error).__name__, + 'error_message': str(error), + 'url': getattr(ws_client, 'url', 'unknown'), + 'keep_running': getattr(ws_client, 'keep_running', 'unknown') + } + logger.error(f'websocket error occurred: {error_details}') class WebsocketClient(WebSocketApp): @@ -67,6 +74,7 @@ def open(self, logger.warning( 'Cannot open new websocket connection, already connected.') return + self.response_timeout = response_timeout run_forever_kwargs = { 'sslopt': { @@ -76,12 +84,64 @@ def open(self, 'ping_timeout': ping_timeout, 'ping_interval': ping_interval, } + if keep_alive: - ping_thread = threading.Thread(target=self.run_forever, - kwargs=run_forever_kwargs) - ping_thread.start() - self._wait_till_sock_connected(ws_conn_timeout) + logger.debug(f'Starting WebSocket connection to {self.url}') + + # Use threading.Event for better synchronization + connection_event = threading.Event() + connection_error = threading.Event() + + # Store original callbacks + original_on_open = getattr(self, 'on_open', None) + original_on_error = getattr(self, 'on_error', None) + + # Create enhanced callbacks for connection tracking + def on_open_with_event(ws): + logger.info('WebSocket connection opened') + connection_event.set() + if original_on_open: + original_on_open(ws) + + def on_error_with_event(ws, error): + logger.error(f'WebSocket connection error: {error}') + connection_error.set() + if original_on_error: + original_on_error(ws, error) + + # Set enhanced callbacks + self.on_open = on_open_with_event + self.on_error = on_error_with_event + + try: + ping_thread = threading.Thread(target=self.run_forever, + kwargs=run_forever_kwargs) + ping_thread.daemon = True # Make thread daemon to prevent hanging + ping_thread.start() + + # Wait for either connection success or error + if connection_event.wait(timeout=ws_conn_timeout): + logger.debug('WebSocket connection established via event') + elif connection_error.is_set(): + raise TimeoutError( + 'WebSocket connection failed due to error') + else: + # Fallback to original polling method + logger.debug( + 'Event-based connection detection failed, falling back to polling' + ) + self._wait_till_sock_connected(ws_conn_timeout) + + except Exception as e: + logger.error(f'Failed to establish WebSocket connection: {e}') + raise + finally: + # Restore original callbacks + self.on_open = original_on_open + self.on_error = original_on_error + return + self.run_forever(**run_forever_kwargs) def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: @@ -96,14 +156,24 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: RtmResponse: RTM response structure (`request_id`, `action`, `type`, `success` and `payload` properties) ''' + # Validate connection before sending + if not self.is_connected(): + raise WebSocketConnectionClosedException( + 'Connection is already closed.') + request_id = str(random.randint(1, 9999999999)) request.update({'request_id': request_id}) request_json = json.dumps(request, indent=4) logger.info(f'\nREQUEST:\n{request_json}') - if not self.sock or self.sock.send(request_json, opcode) == 0: - raise WebSocketConnectionClosedException( - 'Connection is already closed.') + try: + send_result = self.sock.send(request_json, opcode) + if send_result == 0: + raise WebSocketConnectionClosedException( + 'Failed to send data - connection closed.') + except Exception as e: + logger.error(f'Failed to send WebSocket message: {e}') + raise WebSocketConnectionClosedException(f'Connection error: {e}') def await_message(stop_event: threading.Event) -> dict: while not stop_event.is_set(): @@ -130,16 +200,77 @@ def await_message(stop_event: threading.Event) -> dict: return RtmResponse(response) + def is_connected(self) -> bool: + ''' Check if WebSocket connection is active and healthy. ''' + try: + return (self.sock is not None and hasattr(self.sock, 'connected') + and self.sock.connected + and getattr(self, 'keep_running', False)) + except Exception: + return False + def _wait_till_sock_connected(self, timeout: Union[float, int] = 10) -> NoReturn: ''' Polls until `self.sock` is connected. Args: timeout (float): timeout value in seconds, default 10. ''' - if timeout < 0: - raise TimeoutError('Timed out waiting for WebSocket to open.') + start_time = time.time() + poll_interval = 0.05 + + logger.debug(f'Waiting for WebSocket connection (timeout: {timeout}s)') + + while time.time() - start_time < timeout: + try: + if self.sock and hasattr(self.sock, + 'connected') and self.sock.connected: + logger.debug( + 'WebSocket connection established successfully') + return + + # Check if socket exists but connection failed + if self.sock and hasattr( + self.sock, 'connected') and not self.sock.connected: + # Give it a bit more time for connection to establish + pass + + except AttributeError: + # Socket not yet created, continue waiting + pass + except Exception as e: + logger.warning( + f'Unexpected error while checking WebSocket connection: {e}' + ) + + sleep(poll_interval) + + connection_status = 'unknown' + if hasattr(self, 'sock') and self.sock: + if hasattr(self.sock, 'connected'): + connection_status = f'connected={self.sock.connected}' + else: + connection_status = 'sock exists but no connected attribute' + else: + connection_status = 'sock is None' + + error_msg = f'Timed out waiting for WebSocket to open after {timeout}s. Connection status: {connection_status}' + logger.error(error_msg) + raise TimeoutError(error_msg) + + def close(self, code: int = 1000, reason: str = 'Normal closure') -> None: + ''' Close WebSocket connection gracefully. ''' + logger.debug( + f'Closing WebSocket connection (code: {code}, reason: {reason})') try: - assert self.sock.connected - return - except (AttributeError, AssertionError): - sleep(0.1) - return self._wait_till_sock_connected(timeout=timeout - 0.1) + if self.sock and hasattr(self.sock, 'close'): + self.sock.close(code, reason) + self.keep_running = False + except Exception as e: + logger.warning(f'Error during WebSocket close: {e}') + + def __del__(self): + ''' Cleanup when object is destroyed. ''' + try: + if hasattr(self, 'sock') and self.sock: + self.close() + except Exception: + pass # Ignore errors during cleanup From b11154ad950120241b1530b5eb98673e8c2fed7d Mon Sep 17 00:00:00 2001 From: zuczkows Date: Wed, 4 Jun 2025 14:35:03 +0200 Subject: [PATCH 2/4] fixup! Add Event object for synchronization and more debug logs for ws connection --- livechat/utils/ws_client.py | 100 ++++++++---------------------------- 1 file changed, 22 insertions(+), 78 deletions(-) diff --git a/livechat/utils/ws_client.py b/livechat/utils/ws_client.py index 4c1ce2b..2b1cf0d 100644 --- a/livechat/utils/ws_client.py +++ b/livechat/utils/ws_client.py @@ -7,7 +7,6 @@ import random import ssl import threading -import time from time import sleep from typing import List, NoReturn, Union @@ -24,11 +23,11 @@ def on_message(ws_client: WebSocketApp, message: str): def on_close(ws_client: WebSocketApp, close_status_code: int, close_msg: str): - logger.info('websocket closed:') + logger.info('websocket closed') if close_status_code or close_msg: - logger.info('close status code: ' + str(close_status_code)) - logger.info('close message: ' + str(close_msg)) + logger.info(f'close status code: {close_status_code}') + logger.info(f'close message: {close_msg}') def on_error(ws_client: WebSocketApp, error: Exception): @@ -36,7 +35,6 @@ def on_error(ws_client: WebSocketApp, error: Exception): 'error_type': type(error).__name__, 'error_message': str(error), 'url': getattr(ws_client, 'url', 'unknown'), - 'keep_running': getattr(ws_client, 'keep_running', 'unknown') } logger.error(f'websocket error occurred: {error_details}') @@ -88,55 +86,57 @@ def open(self, if keep_alive: logger.debug(f'Starting WebSocket connection to {self.url}') - # Use threading.Event for better synchronization connection_event = threading.Event() connection_error = threading.Event() - # Store original callbacks + error_info = {'message': None} + original_on_open = getattr(self, 'on_open', None) original_on_error = getattr(self, 'on_error', None) - # Create enhanced callbacks for connection tracking def on_open_with_event(ws): - logger.info('WebSocket connection opened') connection_event.set() if original_on_open: original_on_open(ws) def on_error_with_event(ws, error): - logger.error(f'WebSocket connection error: {error}') + logger.error(f'WebSocket new message error occurred: {error}') + error_info['message'] = str(error) connection_error.set() if original_on_error: original_on_error(ws, error) - # Set enhanced callbacks self.on_open = on_open_with_event self.on_error = on_error_with_event try: ping_thread = threading.Thread(target=self.run_forever, kwargs=run_forever_kwargs) - ping_thread.daemon = True # Make thread daemon to prevent hanging + ping_thread.daemon = True ping_thread.start() - # Wait for either connection success or error if connection_event.wait(timeout=ws_conn_timeout): - logger.debug('WebSocket connection established via event') + logger.debug( + 'WebSocket connection established successfully') elif connection_error.is_set(): + error_msg = error_info[ + 'message'] or 'Unknown connection error' + logger.error(f'WebSocket connection failed: {error_msg}') raise TimeoutError( - 'WebSocket connection failed due to error') + f'WebSocket connection failed due to error: {error_msg}' + ) else: - # Fallback to original polling method - logger.debug( - 'Event-based connection detection failed, falling back to polling' + logger.error( + 'WebSocket connection timeout - no response within timeout period' + ) + raise TimeoutError( + f'WebSocket connection timeout after {ws_conn_timeout}s' ) - self._wait_till_sock_connected(ws_conn_timeout) except Exception as e: logger.error(f'Failed to establish WebSocket connection: {e}') raise finally: - # Restore original callbacks self.on_open = original_on_open self.on_error = original_on_error @@ -156,7 +156,6 @@ def send(self, request: dict, opcode=ABNF.OPCODE_TEXT) -> dict: RtmResponse: RTM response structure (`request_id`, `action`, `type`, `success` and `payload` properties) ''' - # Validate connection before sending if not self.is_connected(): raise WebSocketConnectionClosedException( 'Connection is already closed.') @@ -209,68 +208,13 @@ def is_connected(self) -> bool: except Exception: return False - def _wait_till_sock_connected(self, - timeout: Union[float, int] = 10) -> NoReturn: - ''' Polls until `self.sock` is connected. - Args: - timeout (float): timeout value in seconds, default 10. ''' - start_time = time.time() - poll_interval = 0.05 - - logger.debug(f'Waiting for WebSocket connection (timeout: {timeout}s)') - - while time.time() - start_time < timeout: - try: - if self.sock and hasattr(self.sock, - 'connected') and self.sock.connected: - logger.debug( - 'WebSocket connection established successfully') - return - - # Check if socket exists but connection failed - if self.sock and hasattr( - self.sock, 'connected') and not self.sock.connected: - # Give it a bit more time for connection to establish - pass - - except AttributeError: - # Socket not yet created, continue waiting - pass - except Exception as e: - logger.warning( - f'Unexpected error while checking WebSocket connection: {e}' - ) - - sleep(poll_interval) - - connection_status = 'unknown' - if hasattr(self, 'sock') and self.sock: - if hasattr(self.sock, 'connected'): - connection_status = f'connected={self.sock.connected}' - else: - connection_status = 'sock exists but no connected attribute' - else: - connection_status = 'sock is None' - - error_msg = f'Timed out waiting for WebSocket to open after {timeout}s. Connection status: {connection_status}' - logger.error(error_msg) - raise TimeoutError(error_msg) - def close(self, code: int = 1000, reason: str = 'Normal closure') -> None: ''' Close WebSocket connection gracefully. ''' - logger.debug( + logger.info( f'Closing WebSocket connection (code: {code}, reason: {reason})') try: - if self.sock and hasattr(self.sock, 'close'): + if self.sock: self.sock.close(code, reason) self.keep_running = False except Exception as e: logger.warning(f'Error during WebSocket close: {e}') - - def __del__(self): - ''' Cleanup when object is destroyed. ''' - try: - if hasattr(self, 'sock') and self.sock: - self.close() - except Exception: - pass # Ignore errors during cleanup From 8e9b2bc9a7713baca0e89656e8d40640da460cbb Mon Sep 17 00:00:00 2001 From: zuczkows Date: Thu, 5 Jun 2025 10:09:03 +0200 Subject: [PATCH 3/4] Add ws handshake debug log --- livechat/utils/ws_client.py | 72 +++++++++++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 6 deletions(-) diff --git a/livechat/utils/ws_client.py b/livechat/utils/ws_client.py index 2b1cf0d..9895061 100644 --- a/livechat/utils/ws_client.py +++ b/livechat/utils/ws_client.py @@ -36,7 +36,7 @@ def on_error(ws_client: WebSocketApp, error: Exception): 'error_message': str(error), 'url': getattr(ws_client, 'url', 'unknown'), } - logger.error(f'websocket error occurred: {error_details}') + logger.error(f'WebSocket error occurred: {error_details}') class WebsocketClient(WebSocketApp): @@ -84,10 +84,18 @@ def open(self, } if keep_alive: - logger.debug(f'Starting WebSocket connection to {self.url}') + logger.debug( + f'Starting WebSocket connection to:\n{self.url}\nwith header:\n{self.header}' + ) connection_event = threading.Event() connection_error = threading.Event() + handshake_info = { + 'status': None, + 'headers': None, + 'error': None, + 'url': self.url + } error_info = {'message': None} @@ -95,13 +103,47 @@ def open(self, original_on_error = getattr(self, 'on_error', None) def on_open_with_event(ws): + try: + if hasattr(ws.sock, 'handshake_response'): + handshake_info[ + 'status'] = ws.sock.handshake_response.status + handshake_info[ + 'headers'] = ws.sock.handshake_response.headers + logger.debug( + f'WebSocket handshake successful - Status: {handshake_info["status"]}' + ) + logger.debug( + f'WebSocket handshake headers: {handshake_info["headers"]}' + ) + else: + logger.debug( + 'WebSocket handshake completed but no response details available' + ) + except Exception as e: + logger.warning(f'Could not capture handshake details: {e}') + connection_event.set() if original_on_open: original_on_open(ws) def on_error_with_event(ws, error): - logger.error(f'WebSocket new message error occurred: {error}') - error_info['message'] = str(error) + error_type = type(error).__name__ + error_msg = str(error) + + handshake_info['status'] = ws.sock.handshake_response.status + handshake_info['headers'] = ws.sock.handshake_response.headers + + handshake_info['error'] = { + 'type': error_type, + 'message': error_msg, + 'during_handshake': True + } + + logger.error( + f'WebSocket error during connection: {error_type}: {error_msg}' + ) + + error_info['message'] = error_msg connection_error.set() if original_on_error: original_on_error(ws, error) @@ -117,11 +159,16 @@ def on_error_with_event(ws, error): if connection_event.wait(timeout=ws_conn_timeout): logger.debug( - 'WebSocket connection established successfully') + f'WebSocket connection established successfully.\nHandshake status: {handshake_info["status"]}' + ) elif connection_error.is_set(): error_msg = error_info[ 'message'] or 'Unknown connection error' logger.error(f'WebSocket connection failed: {error_msg}') + logger.error( + f'Handshake info:\n {json.dumps(handshake_info, indent=4)}' + ) + raise TimeoutError( f'WebSocket connection failed due to error: {error_msg}' ) @@ -129,8 +176,21 @@ def on_error_with_event(ws, error): logger.error( 'WebSocket connection timeout - no response within timeout period' ) + if self.sock and hasattr(self.sock, 'handshake_response'): + handshake_info[ + 'status'] = self.sock.handshake_response.status + handshake_info[ + 'headers'] = self.sock.handshake_response.headers + + logger.error( + f'Timeout details: {ws_conn_timeout}s waiting for connection to {handshake_info["url"]}' + ) + logger.error( + f'Handshake info:\n {json.dumps(handshake_info, indent=4)}' + ) + raise TimeoutError( - f'WebSocket connection timeout after {ws_conn_timeout}s' + f'WebSocket handshake timeout after {ws_conn_timeout}s - server did not respond to HTTP upgrade request' ) except Exception as e: From c8f042f2979eeaa4394fbb0798584a1af9f90ebd Mon Sep 17 00:00:00 2001 From: zuczkows Date: Thu, 5 Jun 2025 10:58:00 +0200 Subject: [PATCH 4/4] fixup! Add ws handshake debug log --- livechat/utils/ws_client.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/livechat/utils/ws_client.py b/livechat/utils/ws_client.py index 9895061..ea9e47b 100644 --- a/livechat/utils/ws_client.py +++ b/livechat/utils/ws_client.py @@ -130,8 +130,15 @@ def on_error_with_event(ws, error): error_type = type(error).__name__ error_msg = str(error) - handshake_info['status'] = ws.sock.handshake_response.status - handshake_info['headers'] = ws.sock.handshake_response.headers + try: + if hasattr(ws.sock, 'handshake_response' + ) and ws.sock.handshake_response: + handshake_info[ + 'status'] = ws.sock.handshake_response.status + handshake_info[ + 'headers'] = ws.sock.handshake_response.headers + except Exception: + pass handshake_info['error'] = { 'type': error_type, @@ -176,11 +183,18 @@ def on_error_with_event(ws, error): logger.error( 'WebSocket connection timeout - no response within timeout period' ) - if self.sock and hasattr(self.sock, 'handshake_response'): - handshake_info[ - 'status'] = self.sock.handshake_response.status - handshake_info[ - 'headers'] = self.sock.handshake_response.headers + if self.sock: + if hasattr(self.sock, 'handshake_response'): + handshake_info[ + 'status'] = self.sock.handshake_response.status + handshake_info[ + 'headers'] = self.sock.handshake_response.headers + else: + handshake_info['status'] = 'unknown' + handshake_info['headers'] = 'unknown' + else: + handshake_info['status'] = 'no socket' + handshake_info['headers'] = 'no socket' logger.error( f'Timeout details: {ws_conn_timeout}s waiting for connection to {handshake_info["url"]}'