From 9816dd8c2c2011887e6cce6e43d2d3aa4c44e22b Mon Sep 17 00:00:00 2001 From: zariiii9003 <52598363+zariiii9003@users.noreply.github.com> Date: Wed, 18 Jun 2025 16:56:55 +0200 Subject: [PATCH] improve can.io type annotations --- .github/workflows/ci.yml | 2 +- can/_entry_points.py | 2 +- can/io/asc.py | 4 - can/io/blf.py | 46 +++---- can/io/canutils.py | 11 +- can/io/csv.py | 7 +- can/io/generic.py | 273 +++++++++++++++++++++++++++++---------- can/io/logger.py | 59 +++++---- can/io/player.py | 29 ++--- can/io/printer.py | 27 ++-- can/io/sqlite.py | 49 +++---- can/io/trc.py | 22 ++-- can/listener.py | 4 +- can/typechecking.py | 30 ++--- can/util.py | 4 +- doc/conf.py | 7 + doc/file_io.rst | 2 +- doc/internal-api.rst | 10 +- doc/notifier.rst | 3 +- test/logformats_test.py | 14 +- 20 files changed, 369 insertions(+), 236 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 588d9a96b..ec5c1bbac 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,7 +76,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: "3.13" - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/can/_entry_points.py b/can/_entry_points.py index e8ce92d7c..6320b797b 100644 --- a/can/_entry_points.py +++ b/can/_entry_points.py @@ -30,5 +30,5 @@ def read_entry_points(group: str) -> list[_EntryPoint]: def read_entry_points(group: str) -> list[_EntryPoint]: return [ _EntryPoint(ep.name, *ep.value.split(":", maxsplit=1)) - for ep in entry_points().get(group, []) + for ep in entry_points().get(group, []) # pylint: disable=no-member ] diff --git a/can/io/asc.py b/can/io/asc.py index 0bea823fd..e917953ff 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -39,8 +39,6 @@ class ASCReader(TextIOMessageReader): bus statistics, J1939 Transport Protocol messages) is ignored. """ - file: TextIO - def __init__( self, file: Union[StringPathLike, TextIO], @@ -322,8 +320,6 @@ class ASCWriter(TextIOMessageWriter): It the first message does not have a timestamp, it is set to zero. """ - file: TextIO - FORMAT_MESSAGE = "{channel} {id:<15} {dir:<4} {dtype} {data}" FORMAT_MESSAGE_FD = " ".join( [ diff --git a/can/io/blf.py b/can/io/blf.py index 6a1231fcc..2c9050d54 100644 --- a/can/io/blf.py +++ b/can/io/blf.py @@ -17,14 +17,14 @@ import struct import time import zlib -from collections.abc import Generator +from collections.abc import Generator, Iterator from decimal import Decimal from typing import Any, BinaryIO, Optional, Union, cast from ..message import Message from ..typechecking import StringPathLike from ..util import channel2int, dlc2len, len2dlc -from .generic import BinaryIOMessageReader, FileIOMessageWriter +from .generic import BinaryIOMessageReader, BinaryIOMessageWriter TSystemTime = tuple[int, int, int, int, int, int, int, int] @@ -104,7 +104,7 @@ class BLFParseError(Exception): TIME_ONE_NANS_FACTOR = Decimal("1e-9") -def timestamp_to_systemtime(timestamp: float) -> TSystemTime: +def timestamp_to_systemtime(timestamp: Optional[float]) -> TSystemTime: if timestamp is None or timestamp < 631152000: # Probably not a Unix timestamp return 0, 0, 0, 0, 0, 0, 0, 0 @@ -146,8 +146,6 @@ class BLFReader(BinaryIOMessageReader): silently ignored. 
""" - file: BinaryIO - def __init__( self, file: Union[StringPathLike, BinaryIO], @@ -206,7 +204,7 @@ def __iter__(self) -> Generator[Message, None, None]: yield from self._parse_container(data) self.stop() - def _parse_container(self, data): + def _parse_container(self, data: bytes) -> Iterator[Message]: if self._tail: data = b"".join((self._tail, data)) try: @@ -217,7 +215,7 @@ def _parse_container(self, data): # Save the remaining data that could not be processed self._tail = data[self._pos :] - def _parse_data(self, data): + def _parse_data(self, data: bytes) -> Iterator[Message]: """Optimized inner loop by making local copies of global variables and class members and hardcoding some values.""" unpack_obj_header_base = OBJ_HEADER_BASE_STRUCT.unpack_from @@ -375,13 +373,11 @@ def _parse_data(self, data): pos = next_pos -class BLFWriter(FileIOMessageWriter): +class BLFWriter(BinaryIOMessageWriter): """ Logs CAN data to a Binary Logging File compatible with Vector's tools. """ - file: BinaryIO - #: Max log container size of uncompressed data max_container_size = 128 * 1024 @@ -412,14 +408,12 @@ def __init__( Z_DEFAULT_COMPRESSION represents a default compromise between speed and compression (currently equivalent to level 6). """ - mode = "rb+" if append else "wb" try: - super().__init__(file, mode=mode) + super().__init__(file, mode="rb+" if append else "wb") except FileNotFoundError: # Trying to append to a non-existing file, create a new one append = False - mode = "wb" - super().__init__(file, mode=mode) + super().__init__(file, mode="wb") assert self.file is not None self.channel = channel self.compression_level = compression_level @@ -452,7 +446,7 @@ def __init__( # Write a default header which will be updated when stopped self._write_header(FILE_HEADER_SIZE) - def _write_header(self, filesize): + def _write_header(self, filesize: int) -> None: header = [b"LOGG", FILE_HEADER_SIZE, self.application_id, 0, 0, 0, 2, 6, 8, 1] # The meaning of "count of objects read" is unknown header.extend([filesize, self.uncompressed_size, self.object_count, 0]) @@ -462,7 +456,7 @@ def _write_header(self, filesize): # Pad to header size self.file.write(b"\x00" * (FILE_HEADER_SIZE - FILE_HEADER_STRUCT.size)) - def on_message_received(self, msg): + def on_message_received(self, msg: Message) -> None: channel = channel2int(msg.channel) if channel is None: channel = self.channel @@ -514,7 +508,7 @@ def on_message_received(self, msg): data = CAN_MSG_STRUCT.pack(channel, flags, msg.dlc, arb_id, can_data) self._add_object(CAN_MESSAGE, data, msg.timestamp) - def log_event(self, text, timestamp=None): + def log_event(self, text: str, timestamp: Optional[float] = None) -> None: """Add an arbitrary message to the log file as a global marker. 
:param str text: @@ -525,17 +519,19 @@ def log_event(self, text, timestamp=None): """ try: # Only works on Windows - text = text.encode("mbcs") + encoded = text.encode("mbcs") except LookupError: - text = text.encode("ascii") + encoded = text.encode("ascii") comment = b"Added by python-can" marker = b"python-can" data = GLOBAL_MARKER_STRUCT.pack( - 0, 0xFFFFFF, 0xFF3300, 0, len(text), len(marker), len(comment) + 0, 0xFFFFFF, 0xFF3300, 0, len(encoded), len(marker), len(comment) ) - self._add_object(GLOBAL_MARKER, data + text + marker + comment, timestamp) + self._add_object(GLOBAL_MARKER, data + encoded + marker + comment, timestamp) - def _add_object(self, obj_type, data, timestamp=None): + def _add_object( + self, obj_type: int, data: bytes, timestamp: Optional[float] = None + ) -> None: if timestamp is None: timestamp = self.stop_timestamp or time.time() if self.start_timestamp is None: @@ -564,7 +560,7 @@ def _add_object(self, obj_type, data, timestamp=None): if self._buffer_size >= self.max_container_size: self._flush() - def _flush(self): + def _flush(self) -> None: """Compresses and writes data in the buffer to file.""" if self.file.closed: return @@ -578,7 +574,7 @@ def _flush(self): self._buffer = [tail] self._buffer_size = len(tail) if not self.compression_level: - data = uncompressed_data + data: "Union[bytes, memoryview[int]]" = uncompressed_data # noqa: UP037 method = NO_COMPRESSION else: data = zlib.compress(uncompressed_data, self.compression_level) @@ -601,7 +597,7 @@ def file_size(self) -> int: """Return an estimate of the current file size in bytes.""" return self.file.tell() + self._buffer_size - def stop(self): + def stop(self) -> None: """Stops logging and closes the file.""" self._flush() if self.file.seekable(): diff --git a/can/io/canutils.py b/can/io/canutils.py index e83c21926..78d081637 100644 --- a/can/io/canutils.py +++ b/can/io/canutils.py @@ -6,7 +6,7 @@ import logging from collections.abc import Generator -from typing import Any, TextIO, Union +from typing import Any, Optional, TextIO, Union from can.message import Message @@ -34,8 +34,6 @@ class CanutilsLogReader(TextIOMessageReader): ``(0.0) vcan0 001#8d00100100820100`` """ - file: TextIO - def __init__( self, file: Union[StringPathLike, TextIO], @@ -148,13 +146,12 @@ def __init__( :param bool append: if set to `True` messages are appended to the file, else the file is truncated """ - mode = "a" if append else "w" - super().__init__(file, mode=mode) + super().__init__(file, mode="a" if append else "w") self.channel = channel - self.last_timestamp = None + self.last_timestamp: Optional[float] = None - def on_message_received(self, msg): + def on_message_received(self, msg: Message) -> None: # this is the case for the very first message: if self.last_timestamp is None: self.last_timestamp = msg.timestamp or 0.0 diff --git a/can/io/csv.py b/can/io/csv.py index dcc7996f7..865ef9af0 100644 --- a/can/io/csv.py +++ b/can/io/csv.py @@ -28,8 +28,6 @@ class CSVReader(TextIOMessageReader): Any line separator is accepted. """ - file: TextIO - def __init__( self, file: Union[StringPathLike, TextIO], @@ -89,8 +87,6 @@ class CSVWriter(TextIOMessageWriter): Each line is terminated with a platform specific line separator. 
""" - file: TextIO - def __init__( self, file: Union[StringPathLike, TextIO], @@ -106,8 +102,7 @@ def __init__( the file is truncated and starts with a newly written header line """ - mode = "a" if append else "w" - super().__init__(file, mode=mode) + super().__init__(file, mode="a" if append else "w") # Write a header row if not append: diff --git a/can/io/generic.py b/can/io/generic.py index 82523c3cd..21fc3e8e8 100644 --- a/can/io/generic.py +++ b/can/io/generic.py @@ -1,129 +1,260 @@ -"""Contains generic base classes for file IO.""" +"""This module provides abstract base classes for CAN message reading and writing operations +to various file formats. + +.. note:: + All classes in this module are abstract and should be subclassed to implement + specific file format handling. +""" -import gzip import locale -from abc import ABCMeta +import os +from abc import ABC, abstractmethod from collections.abc import Iterable from contextlib import AbstractContextManager +from io import BufferedIOBase, TextIOWrapper +from pathlib import Path from types import TracebackType from typing import ( + TYPE_CHECKING, Any, BinaryIO, + Generic, Literal, Optional, TextIO, + TypeVar, Union, - cast, ) from typing_extensions import Self -from .. import typechecking from ..listener import Listener from ..message import Message +from ..typechecking import FileLike, StringPathLike +if TYPE_CHECKING: + from _typeshed import ( + OpenBinaryModeReading, + OpenBinaryModeUpdating, + OpenBinaryModeWriting, + OpenTextModeReading, + OpenTextModeUpdating, + OpenTextModeWriting, + ) -class BaseIOHandler(AbstractContextManager): - """A generic file handler that can be used for reading and writing. - Can be used as a context manager. +#: type parameter used in generic classes :class:`MessageReader` and :class:`MessageWriter` +_IoTypeVar = TypeVar("_IoTypeVar", bound=FileLike) - :attr file: - the file-like object that is kept internally, or `None` if none - was opened - """ - file: Optional[typechecking.FileLike] +class MessageWriter(AbstractContextManager["MessageWriter"], Listener, ABC): + """Abstract base class for all CAN message writers. - def __init__( - self, - file: Optional[typechecking.AcceptedIOType], - mode: str = "rt", - **kwargs: Any, - ) -> None: - """ - :param file: a path-like object to open a file, a file-like object - to be used as a file or `None` to not use a file at all - :param mode: the mode that should be used to open the file, see - :func:`open`, ignored if *file* is `None` - """ - if file is None or (hasattr(file, "read") and hasattr(file, "write")): - # file is None or some file-like object - self.file = cast("Optional[typechecking.FileLike]", file) - else: - encoding: Optional[str] = ( - None - if "b" in mode - else kwargs.get("encoding", locale.getpreferredencoding(False)) - ) - # pylint: disable=consider-using-with - # file is some path-like object - self.file = cast( - "typechecking.FileLike", open(file, mode, encoding=encoding) - ) + This class serves as a foundation for implementing different message writer formats. + It combines context manager capabilities with the message listener interface. 
- # for multiple inheritance - super().__init__() + :param file: Path-like object or string representing the output file location + :param kwargs: Additional keyword arguments for specific writer implementations + """ + + @abstractmethod + def __init__(self, file: StringPathLike, **kwargs: Any) -> None: + pass + + @abstractmethod + def stop(self) -> None: + """Stop handling messages and cleanup any resources.""" def __enter__(self) -> Self: + """Enter the context manager.""" return self def __exit__( self, exc_type: Optional[type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], ) -> Literal[False]: + """Exit the context manager and ensure proper cleanup.""" self.stop() return False + +class SizedMessageWriter(MessageWriter, ABC): + """Abstract base class for message writers that can report their file size. + + This class extends :class:`MessageWriter` with the ability to determine the size + of the output file. + """ + + @abstractmethod + def file_size(self) -> int: + """Get the current size of the output file in bytes. + + :return: The size of the file in bytes + :rtype: int + """ + + +class FileIOMessageWriter(SizedMessageWriter, Generic[_IoTypeVar]): + """Base class for writers that operate on file descriptors. + + This class provides common functionality for writers that work with file objects. + + :param file: A path-like object or file object to write to + :param kwargs: Additional keyword arguments for specific writer implementations + + :ivar file: The file object being written to + """ + + file: _IoTypeVar + + @abstractmethod + def __init__(self, file: Union[StringPathLike, _IoTypeVar], **kwargs: Any) -> None: + pass + def stop(self) -> None: - """Closes the underlying file-like object and flushes it, if it was opened in write mode.""" - if self.file is not None: - # this also implies a flush() - self.file.close() + """Close the file and stop writing.""" + self.file.close() + def file_size(self) -> int: + """Get the current file size.""" + return self.file.tell() -class MessageWriter(BaseIOHandler, Listener, metaclass=ABCMeta): - """The base class for all writers.""" - file: Optional[typechecking.FileLike] +class TextIOMessageWriter(FileIOMessageWriter[Union[TextIO, TextIOWrapper]], ABC): + """Text-based message writer implementation. + :param file: Text file to write to + :param mode: File open mode for text operations + :param kwargs: Additional arguments like encoding + """ + + def __init__( + self, + file: Union[StringPathLike, TextIO, TextIOWrapper], + mode: "Union[OpenTextModeUpdating, OpenTextModeWriting]" = "w", + **kwargs: Any, + ) -> None: + if isinstance(file, (str, os.PathLike)): + encoding: str = kwargs.get("encoding", locale.getpreferredencoding(False)) + # pylint: disable=consider-using-with + self.file = Path(file).open(mode=mode, encoding=encoding) + else: + self.file = file -class FileIOMessageWriter(MessageWriter, metaclass=ABCMeta): - """A specialized base class for all writers with file descriptors.""" - file: typechecking.FileLike +class BinaryIOMessageWriter(FileIOMessageWriter[Union[BinaryIO, BufferedIOBase]], ABC): + """Binary file message writer implementation. 
+ + :param file: Binary file to write to + :param mode: File open mode for binary operations + :param kwargs: Additional implementation specific arguments + """ def __init__( - self, file: typechecking.AcceptedIOType, mode: str = "wt", **kwargs: Any + self, + file: Union[StringPathLike, BinaryIO, BufferedIOBase], + mode: "Union[OpenBinaryModeUpdating, OpenBinaryModeWriting]" = "wb", + **kwargs: Any, ) -> None: - # Not possible with the type signature, but be verbose for user-friendliness - if file is None: - raise ValueError("The given file cannot be None") + if isinstance(file, (str, os.PathLike)): + # pylint: disable=consider-using-with,unspecified-encoding + self.file = Path(file).open(mode=mode) + else: + self.file = file - super().__init__(file, mode, **kwargs) - def file_size(self) -> int: - """Return an estimate of the current file size in bytes.""" - return self.file.tell() +class MessageReader(AbstractContextManager["MessageReader"], Iterable[Message], ABC): + """Abstract base class for all CAN message readers. + + This class serves as a foundation for implementing different message reader formats. + It combines context manager capabilities with iteration interface. + + :param file: Path-like object or string representing the input file location + :param kwargs: Additional keyword arguments for specific reader implementations + """ + + @abstractmethod + def __init__(self, file: StringPathLike, **kwargs: Any) -> None: + pass + + @abstractmethod + def stop(self) -> None: + """Stop reading messages and cleanup any resources.""" + + def __enter__(self) -> Self: + return self + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> Literal[False]: + self.stop() + return False -class TextIOMessageWriter(FileIOMessageWriter, metaclass=ABCMeta): - file: TextIO +class FileIOMessageReader(MessageReader, Generic[_IoTypeVar]): + """Base class for readers that operate on file descriptors. + This class provides common functionality for readers that work with file objects. -class BinaryIOMessageWriter(FileIOMessageWriter, metaclass=ABCMeta): - file: Union[BinaryIO, gzip.GzipFile] + :param file: A path-like object or file object to read from + :param kwargs: Additional keyword arguments for specific reader implementations + :ivar file: The file object being read from + """ + + file: _IoTypeVar + + @abstractmethod + def __init__(self, file: Union[StringPathLike, _IoTypeVar], **kwargs: Any) -> None: + pass -class MessageReader(BaseIOHandler, Iterable[Message], metaclass=ABCMeta): - """The base class for all readers.""" + def stop(self) -> None: + self.file.close() -class TextIOMessageReader(MessageReader, metaclass=ABCMeta): - file: TextIO +class TextIOMessageReader(FileIOMessageReader[Union[TextIO, TextIOWrapper]], ABC): + """Text-based message reader implementation. 
+ :param file: Text file to read from + :param mode: File open mode for text operations + :param kwargs: Additional arguments like encoding + """ -class BinaryIOMessageReader(MessageReader, metaclass=ABCMeta): - file: Union[BinaryIO, gzip.GzipFile] + def __init__( + self, + file: Union[StringPathLike, TextIO, TextIOWrapper], + mode: "OpenTextModeReading" = "r", + **kwargs: Any, + ) -> None: + if isinstance(file, (str, os.PathLike)): + encoding: str = kwargs.get("encoding", locale.getpreferredencoding(False)) + # pylint: disable=consider-using-with + self.file = Path(file).open(mode=mode, encoding=encoding) + else: + self.file = file + + +class BinaryIOMessageReader(FileIOMessageReader[Union[BinaryIO, BufferedIOBase]], ABC): + """Binary file message reader implementation. + + :param file: Binary file to read from + :param mode: File open mode for binary operations + :param kwargs: Additional implementation specific arguments + """ + + def __init__( + self, + file: Union[StringPathLike, BinaryIO, BufferedIOBase], + mode: "OpenBinaryModeReading" = "rb", + **kwargs: Any, + ) -> None: + if isinstance(file, (str, os.PathLike)): + # pylint: disable=consider-using-with,unspecified-encoding + self.file = Path(file).open(mode=mode) + else: + self.file = file diff --git a/can/io/logger.py b/can/io/logger.py index f9f029759..9febfe680 100644 --- a/can/io/logger.py +++ b/can/io/logger.py @@ -15,14 +15,13 @@ Final, Literal, Optional, - cast, ) from typing_extensions import Self from .._entry_points import read_entry_points from ..message import Message -from ..typechecking import AcceptedIOType, FileLike, StringPathLike +from ..typechecking import StringPathLike from .asc import ASCWriter from .blf import BLFWriter from .canutils import CanutilsLogWriter @@ -31,6 +30,8 @@ BinaryIOMessageWriter, FileIOMessageWriter, MessageWriter, + SizedMessageWriter, + TextIOMessageWriter, ) from .mf4 import MF4Writer from .printer import Printer @@ -71,9 +72,7 @@ def _get_logger_for_suffix(suffix: str) -> type[MessageWriter]: ) from None -def _compress( - filename: StringPathLike, **kwargs: Any -) -> tuple[type[MessageWriter], FileLike]: +def _compress(filename: StringPathLike, **kwargs: Any) -> FileIOMessageWriter[Any]: """ Return the suffix and io object of the decompressed file. File will automatically recompress upon close. @@ -93,11 +92,18 @@ def _compress( append = kwargs.get("append", False) if issubclass(logger_type, BinaryIOMessageWriter): - mode = "ab" if append else "wb" - else: - mode = "at" if append else "wt" + return logger_type( + file=gzip.open(filename=filename, mode="ab" if append else "wb"), **kwargs + ) + + elif issubclass(logger_type, TextIOMessageWriter): + return logger_type( + file=gzip.open(filename=filename, mode="at" if append else "wt"), **kwargs + ) - return logger_type, gzip.open(filename, mode) + raise ValueError( + f"The file type {real_suffix} is currently incompatible with gzip." 
+ ) def Logger( # noqa: N802 @@ -143,12 +149,11 @@ def Logger( # noqa: N802 _update_writer_plugins() suffix = pathlib.PurePath(filename).suffix.lower() - file_or_filename: AcceptedIOType = filename if suffix == ".gz": - logger_type, file_or_filename = _compress(filename, **kwargs) - else: - logger_type = _get_logger_for_suffix(suffix) - return logger_type(file=file_or_filename, **kwargs) + return _compress(filename, **kwargs) + + logger_type = _get_logger_for_suffix(suffix) + return logger_type(file=filename, **kwargs) class BaseRotatingLogger(MessageWriter, ABC): @@ -183,13 +188,11 @@ class BaseRotatingLogger(MessageWriter, ABC): rollover_count: int = 0 def __init__(self, **kwargs: Any) -> None: - super().__init__(**{**kwargs, "file": None}) - self.writer_kwargs = kwargs @property @abstractmethod - def writer(self) -> FileIOMessageWriter: + def writer(self) -> MessageWriter: """This attribute holds an instance of a writer class which manages the actual file IO.""" raise NotImplementedError @@ -243,7 +246,7 @@ def on_message_received(self, msg: Message) -> None: self.writer.on_message_received(msg) - def _get_new_writer(self, filename: StringPathLike) -> FileIOMessageWriter: + def _get_new_writer(self, filename: StringPathLike) -> MessageWriter: """Instantiate a new writer. .. note:: @@ -261,10 +264,7 @@ def _get_new_writer(self, filename: StringPathLike) -> FileIOMessageWriter: if suffix not in self._supported_formats: continue logger = Logger(filename=filename, **self.writer_kwargs) - if isinstance(logger, FileIOMessageWriter): - return logger - elif isinstance(logger, Printer) and logger.file is not None: - return cast("FileIOMessageWriter", logger) + return logger raise ValueError( f'The log format of "{pathlib.Path(filename).name}" ' @@ -366,18 +366,25 @@ def __init__( self._writer = self._get_new_writer(self.base_filename) + def _get_new_writer(self, filename: StringPathLike) -> SizedMessageWriter: + writer = super()._get_new_writer(filename) + if isinstance(writer, SizedMessageWriter): + return writer + raise TypeError + @property - def writer(self) -> FileIOMessageWriter: + def writer(self) -> SizedMessageWriter: return self._writer def should_rollover(self, msg: Message) -> bool: if self.max_bytes <= 0: return False - if self.writer.file_size() >= self.max_bytes: - return True + file_size = self.writer.file_size() + if file_size is None: + return False - return False + return file_size >= self.max_bytes def do_rollover(self) -> None: if self.writer: diff --git a/can/io/player.py b/can/io/player.py index 2451eab41..c0015b185 100644 --- a/can/io/player.py +++ b/can/io/player.py @@ -11,17 +11,16 @@ from typing import ( Any, Final, - Union, ) from .._entry_points import read_entry_points from ..message import Message -from ..typechecking import AcceptedIOType, FileLike, StringPathLike +from ..typechecking import StringPathLike from .asc import ASCReader from .blf import BLFReader from .canutils import CanutilsLogReader from .csv import CSVReader -from .generic import BinaryIOMessageReader, MessageReader +from .generic import BinaryIOMessageReader, MessageReader, TextIOMessageReader from .mf4 import MF4Reader from .sqlite import SqliteReader from .trc import TRCReader @@ -58,24 +57,25 @@ def _get_logger_for_suffix(suffix: str) -> type[MessageReader]: raise ValueError(f'No read support for unknown log format "{suffix}"') from None -def _decompress( - filename: StringPathLike, -) -> tuple[type[MessageReader], Union[str, FileLike]]: +def _decompress(filename: StringPathLike, **kwargs: 
Any) -> MessageReader: """ Return the suffix and io object of the decompressed file. """ suffixes = pathlib.Path(filename).suffixes if len(suffixes) != 2: raise ValueError( - f"No write support for unknown log format \"{''.join(suffixes)}\"" - ) from None + f"No read support for unknown log format \"{''.join(suffixes)}\"" + ) real_suffix = suffixes[-2].lower() reader_type = _get_logger_for_suffix(real_suffix) - mode = "rb" if issubclass(reader_type, BinaryIOMessageReader) else "rt" + if issubclass(reader_type, TextIOMessageReader): + return reader_type(gzip.open(filename, mode="rt"), **kwargs) + elif issubclass(reader_type, BinaryIOMessageReader): + return reader_type(gzip.open(filename, mode="rb"), **kwargs) - return reader_type, gzip.open(filename, mode) + raise ValueError(f"No read support for unknown log format \"{''.join(suffixes)}\"") def LogReader(filename: StringPathLike, **kwargs: Any) -> MessageReader: # noqa: N802 @@ -118,12 +118,11 @@ def LogReader(filename: StringPathLike, **kwargs: Any) -> MessageReader: # noqa _update_reader_plugins() suffix = pathlib.PurePath(filename).suffix.lower() - file_or_filename: AcceptedIOType = filename if suffix == ".gz": - reader_type, file_or_filename = _decompress(filename) - else: - reader_type = _get_logger_for_suffix(suffix) - return reader_type(file=file_or_filename, **kwargs) + return _decompress(filename) + + reader_type = _get_logger_for_suffix(suffix) + return reader_type(file=filename, **kwargs) class MessageSync: diff --git a/can/io/printer.py b/can/io/printer.py index 30bc227ab..786cb7261 100644 --- a/can/io/printer.py +++ b/can/io/printer.py @@ -3,16 +3,18 @@ """ import logging -from typing import Any, Optional, TextIO, Union, cast +import sys +from io import TextIOWrapper +from typing import Any, TextIO, Union from ..message import Message from ..typechecking import StringPathLike -from .generic import MessageWriter +from .generic import TextIOMessageWriter log = logging.getLogger("can.io.printer") -class Printer(MessageWriter): +class Printer(TextIOMessageWriter): """ The Printer class is a subclass of :class:`~can.Listener` which simply prints any messages it receives to the terminal (stdout). 
A message is turned into a @@ -22,11 +24,9 @@ class Printer(MessageWriter): standard out """ - file: Optional[TextIO] - def __init__( self, - file: Optional[Union[StringPathLike, TextIO]] = None, + file: Union[StringPathLike, TextIO, TextIOWrapper] = sys.stdout, append: bool = False, **kwargs: Any, ) -> None: @@ -38,18 +38,17 @@ def __init__( :param append: If set to `True` messages, are appended to the file, else the file is truncated """ - self.write_to_file = file is not None - mode = "a" if append else "w" - super().__init__(file, mode=mode) + super().__init__(file, mode="a" if append else "w") def on_message_received(self, msg: Message) -> None: - if self.write_to_file: - cast("TextIO", self.file).write(str(msg) + "\n") - else: - print(msg) # noqa: T201 + self.file.write(str(msg) + "\n") def file_size(self) -> int: """Return an estimate of the current file size in bytes.""" - if self.file is not None: + if self.file is not sys.stdout: return self.file.tell() return 0 + + def stop(self) -> None: + if self.file is not sys.stdout: + super().stop() diff --git a/can/io/sqlite.py b/can/io/sqlite.py index 686e2d038..73aa2961c 100644 --- a/can/io/sqlite.py +++ b/can/io/sqlite.py @@ -8,9 +8,11 @@ import sqlite3 import threading import time -from collections.abc import Generator +from collections.abc import Generator, Iterator from typing import Any +from typing_extensions import TypeAlias + from can.listener import BufferedReader from can.message import Message @@ -19,6 +21,8 @@ log = logging.getLogger("can.io.sqlite") +_MessageTuple: TypeAlias = "tuple[float, int, bool, bool, bool, int, memoryview[int]]" + class SqliteReader(MessageReader): """ @@ -49,7 +53,6 @@ def __init__( do not accept file-like objects as the `file` parameter. It also runs in ``append=True`` mode all the time. """ - super().__init__(file=None) self._conn = sqlite3.connect(file) self._cursor = self._conn.cursor() self.table_name = table_name @@ -59,7 +62,7 @@ def __iter__(self) -> Generator[Message, None, None]: yield SqliteReader._assemble_message(frame_data) @staticmethod - def _assemble_message(frame_data): + def _assemble_message(frame_data: _MessageTuple) -> Message: timestamp, can_id, is_extended, is_remote, is_error, dlc, data = frame_data return Message( timestamp=timestamp, @@ -71,12 +74,12 @@ def _assemble_message(frame_data): data=data, ) - def __len__(self): + def __len__(self) -> int: # this might not run in constant time result = self._cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") return int(result.fetchone()[0]) - def read_all(self): + def read_all(self) -> Iterator[Message]: """Fetches all messages in the database. :rtype: Generator[can.Message] @@ -84,9 +87,8 @@ def read_all(self): result = self._cursor.execute(f"SELECT * FROM {self.table_name}").fetchall() return (SqliteReader._assemble_message(frame) for frame in result) - def stop(self): + def stop(self) -> None: """Closes the connection to the database.""" - super().stop() self._conn.close() @@ -154,11 +156,10 @@ def __init__( f"The append argument should not be used in " f"conjunction with the {self.__class__.__name__}." 
) - super().__init__(file=None) + BufferedReader.__init__(self) self.table_name = table_name self._db_filename = file self._stop_running_event = threading.Event() - self._conn = None self._writer_thread = threading.Thread(target=self._db_writer_thread) self._writer_thread.start() self.num_frames = 0 @@ -167,7 +168,8 @@ def __init__( f"INSERT INTO {self.table_name} VALUES (?, ?, ?, ?, ?, ?, ?)" ) - def _create_db(self): + @staticmethod + def _create_db(file: StringPathLike, table_name: str) -> sqlite3.Connection: """Creates a new databae or opens a connection to an existing one. .. note:: @@ -175,11 +177,11 @@ def _create_db(self): hence we setup the db here. It has the upside of running async. """ log.debug("Creating sqlite database") - self._conn = sqlite3.connect(self._db_filename) + conn = sqlite3.connect(file) # create table structure - self._conn.cursor().execute( - f"""CREATE TABLE IF NOT EXISTS {self.table_name} + conn.cursor().execute( + f"""CREATE TABLE IF NOT EXISTS {table_name} ( ts REAL, arbitration_id INTEGER, @@ -190,14 +192,16 @@ def _create_db(self): data BLOB )""" ) - self._conn.commit() + conn.commit() + + return conn - def _db_writer_thread(self): - self._create_db() + def _db_writer_thread(self) -> None: + conn = SqliteWriter._create_db(self._db_filename, self.table_name) try: while True: - messages = [] # reset buffer + messages: list[_MessageTuple] = [] # reset buffer msg = self.get_message(self.GET_MESSAGE_TIMEOUT) while msg is not None: @@ -226,10 +230,10 @@ def _db_writer_thread(self): count = len(messages) if count > 0: - with self._conn: + with conn: # log.debug("Writing %d frames to db", count) - self._conn.executemany(self._insert_template, messages) - self._conn.commit() # make the changes visible to the entire database + conn.executemany(self._insert_template, messages) + conn.commit() # make the changes visible to the entire database self.num_frames += count self.last_write = time.time() @@ -238,14 +242,13 @@ def _db_writer_thread(self): break finally: - self._conn.close() + conn.close() log.info("Stopped sqlite writer after writing %d messages", self.num_frames) - def stop(self): + def stop(self) -> None: """Stops the reader an writes all remaining messages to the database. Thus, this might take a while and block. """ BufferedReader.stop(self) self._stop_running_event.set() self._writer_thread.join() - MessageReader.stop(self) diff --git a/can/io/trc.py b/can/io/trc.py index a07a53a4d..c286d585d 100644 --- a/can/io/trc.py +++ b/can/io/trc.py @@ -12,6 +12,7 @@ from collections.abc import Generator from datetime import datetime, timedelta, timezone from enum import Enum +from io import TextIOWrapper from typing import Any, Callable, Optional, TextIO, Union from ..message import Message @@ -31,8 +32,8 @@ class TRCFileVersion(Enum): V2_0 = 200 V2_1 = 201 - def __ge__(self, other): - if self.__class__ is other.__class__: + def __ge__(self, other: Any) -> bool: + if isinstance(other, TRCFileVersion): return self.value >= other.value return NotImplemented @@ -42,8 +43,6 @@ class TRCReader(TextIOMessageReader): Iterator of CAN messages from a TRC logging file. 
""" - file: TextIO - def __init__( self, file: Union[StringPathLike, TextIO], @@ -73,7 +72,7 @@ def start_time(self) -> Optional[datetime]: return datetime.fromtimestamp(self._start_time, timezone.utc) return None - def _extract_header(self): + def _extract_header(self) -> str: line = "" for _line in self.file: line = _line.strip() @@ -276,9 +275,6 @@ class TRCWriter(TextIOMessageWriter): If the first message does not have a timestamp, it is set to zero. """ - file: TextIO - first_timestamp: Optional[float] - FORMAT_MESSAGE = ( "{msgnr:>7} {time:13.3f} DT {channel:>2} {id:>8} {dir:>2} - {dlc:<4} {data}" ) @@ -286,7 +282,7 @@ class TRCWriter(TextIOMessageWriter): def __init__( self, - file: Union[StringPathLike, TextIO], + file: Union[StringPathLike, TextIO, TextIOWrapper], channel: int = 1, **kwargs: Any, ) -> None: @@ -308,7 +304,7 @@ def __init__( self.filepath = os.path.abspath(self.file.name) self.header_written = False self.msgnr = 0 - self.first_timestamp = None + self.first_timestamp: Optional[float] = None self.file_version = TRCFileVersion.V2_1 self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0 self._format_message = self._format_message_init @@ -360,7 +356,7 @@ def _write_header_v2_1(self, start_time: datetime) -> None: ] self.file.writelines(line + "\n" for line in lines) - def _format_message_by_format(self, msg, channel): + def _format_message_by_format(self, msg: Message, channel: int) -> str: if msg.is_extended_id: arb_id = f"{msg.arbitration_id:07X}" else: @@ -368,6 +364,8 @@ def _format_message_by_format(self, msg, channel): data = [f"{byte:02X}" for byte in msg.data] + if self.first_timestamp is None: + raise ValueError serialized = self._msg_fmt_string.format( msgnr=self.msgnr, time=(msg.timestamp - self.first_timestamp) * 1000, @@ -379,7 +377,7 @@ def _format_message_by_format(self, msg, channel): ) return serialized - def _format_message_init(self, msg, channel): + def _format_message_init(self, msg: Message, channel: int) -> str: if self.file_version == TRCFileVersion.V1_0: self._format_message = self._format_message_by_format self._msg_fmt_string = self.FORMAT_MESSAGE_V1_0 diff --git a/can/listener.py b/can/listener.py index b450cf36d..6256d33b6 100644 --- a/can/listener.py +++ b/can/listener.py @@ -147,7 +147,9 @@ def __init__(self, **kwargs: Any) -> None: stacklevel=2, ) if sys.version_info < (3, 10): - self.buffer = asyncio.Queue(loop=kwargs["loop"]) + self.buffer = asyncio.Queue( # pylint: disable=unexpected-keyword-arg + loop=kwargs["loop"] + ) return self.buffer = asyncio.Queue() diff --git a/can/typechecking.py b/can/typechecking.py index 36343ddaa..fc0c87c0d 100644 --- a/can/typechecking.py +++ b/can/typechecking.py @@ -1,9 +1,9 @@ """Types for mypy type-checking""" -import gzip -import struct +import io import sys -import typing +from collections.abc import Iterable, Sequence +from typing import IO, TYPE_CHECKING, Any, NewType, Union if sys.version_info >= (3, 10): from typing import TypeAlias @@ -16,8 +16,9 @@ from typing_extensions import TypedDict -if typing.TYPE_CHECKING: +if TYPE_CHECKING: import os + import struct class CanFilter(TypedDict): @@ -31,32 +32,31 @@ class CanFilterExtended(TypedDict): extended: bool -CanFilters = typing.Sequence[typing.Union[CanFilter, CanFilterExtended]] +CanFilters = Sequence[Union[CanFilter, CanFilterExtended]] # TODO: Once buffer protocol support lands in typing, we should switch to that, # since can.message.Message attempts to call bytearray() on the given data, so # this should have the same typing info. 
# # See: https://github.com/python/typing/issues/593 -CanData = typing.Union[bytes, bytearray, int, typing.Iterable[int]] +CanData = Union[bytes, bytearray, int, Iterable[int]] # Used for the Abstract Base Class ChannelStr = str ChannelInt = int -Channel = typing.Union[ChannelInt, ChannelStr] +Channel = Union[ChannelInt, ChannelStr, Sequence[ChannelInt]] # Used by the IO module -FileLike = typing.Union[typing.TextIO, typing.BinaryIO, gzip.GzipFile] -StringPathLike = typing.Union[str, "os.PathLike[str]"] -AcceptedIOType = typing.Union[FileLike, StringPathLike] +FileLike = Union[IO[Any], io.TextIOWrapper, io.BufferedIOBase] +StringPathLike = Union[str, "os.PathLike[str]"] -BusConfig = typing.NewType("BusConfig", dict[str, typing.Any]) +BusConfig = NewType("BusConfig", dict[str, Any]) # Used by CLI scripts -TAdditionalCliArgs: TypeAlias = dict[str, typing.Union[str, int, float, bool]] +TAdditionalCliArgs: TypeAlias = dict[str, Union[str, int, float, bool]] TDataStructs: TypeAlias = dict[ - typing.Union[int, tuple[int, ...]], - typing.Union[struct.Struct, tuple, None], + Union[int, tuple[int, ...]], + "Union[struct.Struct, tuple[struct.Struct, *tuple[float, ...]]]", ] @@ -65,7 +65,7 @@ class AutoDetectedConfig(TypedDict): channel: Channel -ReadableBytesLike = typing.Union[bytes, bytearray, memoryview] +ReadableBytesLike = Union[bytes, bytearray, memoryview] class BitTimingDict(TypedDict): diff --git a/can/util.py b/can/util.py index 2f32dda8e..edc35aa5f 100644 --- a/can/util.py +++ b/can/util.py @@ -50,7 +50,7 @@ def load_file_config( - path: Optional[typechecking.AcceptedIOType] = None, section: str = "default" + path: Optional[typechecking.StringPathLike] = None, section: str = "default" ) -> dict[str, str]: """ Loads configuration from file with following content:: @@ -120,7 +120,7 @@ def load_environment_config(context: Optional[str] = None) -> dict[str, str]: def load_config( - path: Optional[typechecking.AcceptedIOType] = None, + path: Optional[typechecking.StringPathLike] = None, config: Optional[dict[str, Any]] = None, context: Optional[str] = None, ) -> typechecking.BusConfig: diff --git a/doc/conf.py b/doc/conf.py index f4a9ab95f..1f26e363a 100755 --- a/doc/conf.py +++ b/doc/conf.py @@ -122,7 +122,14 @@ # disable specific warnings nitpick_ignore = [ # Ignore warnings for type aliases. Remove once Sphinx supports PEP613 + ("py:class", "OpenTextModeUpdating"), + ("py:class", "OpenTextModeWriting"), + ("py:class", "OpenBinaryModeUpdating"), + ("py:class", "OpenBinaryModeWriting"), + ("py:class", "OpenTextModeReading"), + ("py:class", "OpenBinaryModeReading"), ("py:class", "BusConfig"), + # ("py:class", "can.io.generic._IoTypeVar"), ("py:class", "can.typechecking.BusConfig"), ("py:class", "can.typechecking.CanFilter"), ("py:class", "can.typechecking.CanFilterExtended"), diff --git a/doc/file_io.rst b/doc/file_io.rst index ff9431695..329ccac53 100644 --- a/doc/file_io.rst +++ b/doc/file_io.rst @@ -94,7 +94,7 @@ as further references can-utils can be used: Log (.log can-utils Logging format) ----------------------------------- -CanutilsLogWriter logs CAN data to an ASCII log file compatible with `can-utils ` +CanutilsLogWriter logs CAN data to an ASCII log file compatible with `can-utils `_ As specification following references can-utils can be used: `asc2log `_, `log2asc `_. 
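As a quick, hedged illustration of the can-utils log format documented above, a minimal round-trip through the writer/reader pair touched by this patch could look like the sketch below; the file name and the two frames are invented for the example, and stop()/close is handled by the context-manager protocol inherited from the generic base classes::

    import can
    from can.io.canutils import CanutilsLogReader, CanutilsLogWriter

    # write two frames to a can-utils style .log file
    with CanutilsLogWriter("example.log", channel="vcan0") as writer:
        writer.on_message_received(can.Message(arbitration_id=0x123, data=[0x11, 0x22]))
        writer.on_message_received(can.Message(arbitration_id=0x456, data=[0x33]))

    # read the frames back; CanutilsLogReader is iterable and a context manager
    with CanutilsLogReader("example.log") as reader:
        for msg in reader:
            print(msg)
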
diff --git a/doc/internal-api.rst b/doc/internal-api.rst index 73984bf1a..9e544f052 100644 --- a/doc/internal-api.rst +++ b/doc/internal-api.rst @@ -90,8 +90,8 @@ About the IO module Handling of the different file formats is implemented in ``can.io``. Each file/IO type is within a separate module and ideally implements both a *Reader* and a *Writer*. -The reader usually extends :class:`can.io.generic.BaseIOHandler`, while -the writer often additionally extends :class:`can.Listener`, +The reader extends :class:`can.io.generic.MessageReader`, while the writer extends +:class:`can.io.generic.MessageWriter`, a subclass of the :class:`can.Listener`, to be able to be passed directly to a :class:`can.Notifier`. @@ -104,9 +104,9 @@ Ideally add both reading and writing support for the new file format, although t 1. Create a new module: *can/io/canstore.py* (*or* simply copy some existing one like *can/io/csv.py*) -2. Implement a reader ``CanstoreReader`` (which often extends :class:`can.io.generic.BaseIOHandler`, but does not have to). +2. Implement a reader ``CanstoreReader`` which extends :class:`can.io.generic.MessageReader`. Besides from a constructor, only ``__iter__(self)`` needs to be implemented. -3. Implement a writer ``CanstoreWriter`` (which often extends :class:`can.io.generic.BaseIOHandler` and :class:`can.Listener`, but does not have to). +3. Implement a writer ``CanstoreWriter`` which extends :class:`can.io.generic.MessageWriter`. Besides from a constructor, only ``on_message_received(self, msg)`` needs to be implemented. 4. Add a case to ``can.io.player.LogReader``'s ``__new__()``. 5. Document the two new classes (and possibly additional helpers) with docstrings and comments. @@ -126,7 +126,9 @@ IO Utilities .. automodule:: can.io.generic + :show-inheritance: :members: + :private-members: :member-order: bysource diff --git a/doc/notifier.rst b/doc/notifier.rst index 05edbd90d..e1b160a6e 100644 --- a/doc/notifier.rst +++ b/doc/notifier.rst @@ -58,7 +58,8 @@ readers are also documented here. be added using the ``can.io.message_writer`` entry point. The format of the entry point is ``reader_name=module:classname`` where ``classname`` - is a :class:`can.io.generic.BaseIOHandler` concrete implementation. + is a concrete implementation of :class:`~can.io.generic.MessageReader` or + :class:`~can.io.generic.MessageWriter`. 
:: diff --git a/test/logformats_test.py b/test/logformats_test.py index f3fe485b2..806d1f133 100644 --- a/test/logformats_test.py +++ b/test/logformats_test.py @@ -212,7 +212,7 @@ def test_path_like_explicit_stop(self): self._write_all(writer) self._ensure_fsync(writer) writer.stop() - if hasattr(writer.file, "closed"): + if hasattr(writer, "file") and hasattr(writer.file, "closed"): self.assertTrue(writer.file.closed) print("reading all messages") @@ -220,7 +220,7 @@ def test_path_like_explicit_stop(self): read_messages = list(reader) # redundant, but this checks if stop() can be called multiple times reader.stop() - if hasattr(writer.file, "closed"): + if hasattr(writer, "file") and hasattr(writer.file, "closed"): self.assertTrue(writer.file.closed) # check if at least the number of messages matches @@ -243,7 +243,7 @@ def test_path_like_context_manager(self): self._write_all(writer) self._ensure_fsync(writer) w = writer - if hasattr(w.file, "closed"): + if hasattr(writer, "file") and hasattr(w.file, "closed"): self.assertTrue(w.file.closed) # read all written messages @@ -251,7 +251,7 @@ def test_path_like_context_manager(self): with self.reader_constructor(self.test_file_name) as reader: read_messages = list(reader) r = reader - if hasattr(r.file, "closed"): + if hasattr(writer, "file") and hasattr(r.file, "closed"): self.assertTrue(r.file.closed) # check if at least the number of messages matches; @@ -274,7 +274,7 @@ def test_file_like_explicit_stop(self): self._write_all(writer) self._ensure_fsync(writer) writer.stop() - if hasattr(my_file, "closed"): + if hasattr(writer, "file") and hasattr(my_file, "closed"): self.assertTrue(my_file.closed) print("reading all messages") @@ -283,7 +283,7 @@ def test_file_like_explicit_stop(self): read_messages = list(reader) # redundant, but this checks if stop() can be called multiple times reader.stop() - if hasattr(my_file, "closed"): + if hasattr(writer, "file") and hasattr(my_file, "closed"): self.assertTrue(my_file.closed) # check if at least the number of messages matches @@ -380,7 +380,7 @@ def _write_all(self, writer): writer(msg) def _ensure_fsync(self, io_handler): - if hasattr(io_handler.file, "fileno"): + if hasattr(io_handler, "file") and hasattr(io_handler.file, "fileno"): io_handler.file.flush() os.fsync(io_handler.file.fileno())
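
To round off the internal-api.rst guidance above on adding a new file format, here is a minimal sketch of a reader/writer pair built on the reworked generic base classes; the "canstore" name is taken from that guide, while the one-value-per-line format and its details are invented purely for illustration::

    from collections.abc import Generator

    import can
    from can.io.generic import TextIOMessageReader, TextIOMessageWriter


    class CanstoreWriter(TextIOMessageWriter):
        """Writes one "<id>:<hex data>" line per message (illustrative format)."""

        def on_message_received(self, msg: can.Message) -> None:
            self.file.write(f"{msg.arbitration_id:X}:{msg.data.hex()}\n")


    class CanstoreReader(TextIOMessageReader):
        """Reads the invented format back into can.Message objects."""

        def __iter__(self) -> Generator[can.Message, None, None]:
            for line in self.file:
                arb_id, _, data = line.strip().partition(":")
                yield can.Message(
                    arbitration_id=int(arb_id, 16), data=bytes.fromhex(data)
                )
            self.stop()

Both classes inherit file opening, stop() and the context-manager protocol from TextIOMessageWriter/TextIOMessageReader, so only on_message_received() and __iter__() need to be provided, matching the steps listed in the documentation change.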