diff --git a/can/exceptions.py b/can/exceptions.py index e1c970e27..9a7a6201c 100644 --- a/can/exceptions.py +++ b/can/exceptions.py @@ -9,6 +9,7 @@ +-- CanInitializationError +-- CanOperationError +-- CanTimeoutError + +-- CanBitRateError Keep in mind that some functions and methods may raise different exceptions. For example, validating typical arguments and parameters might result in a @@ -111,6 +112,15 @@ class CanTimeoutError(CanError, TimeoutError): """ +class CanBitRateError(CanError): + """Indicates the invalid / unsupported bitrate. + + Example scenarios: + - Invalid / unsupported bitrate on bus + - Can't convert between bit timing and bitrate + """ + + @contextmanager def error_check( error_message: Optional[str] = None, diff --git a/can/interfaces/__init__.py b/can/interfaces/__init__.py index f220d28e5..7f891474a 100644 --- a/can/interfaces/__init__.py +++ b/can/interfaces/__init__.py @@ -59,6 +59,7 @@ "neousys": ("can.interfaces.neousys", "NeousysBus"), "etas": ("can.interfaces.etas", "EtasBus"), "socketcand": ("can.interfaces.socketcand", "SocketCanDaemonBus"), + "zlg": ("can.interfaces.zlg", "ZlgCanBus"), } diff --git a/can/interfaces/zlg/__init__.py b/can/interfaces/zlg/__init__.py new file mode 100644 index 000000000..68202f2ca --- /dev/null +++ b/can/interfaces/zlg/__init__.py @@ -0,0 +1,16 @@ +""" +Unofficial ZLG USBCAN/USBCANFD implementation for Linux +""" + +__version__ = "2.0" +__date__ = "20220209" +__author__ = "Keelung.Yang@flex.com" +__history__ = """ + 1. Initial creation, Keelung.Yang@flex.com, 2022-01-06 + 2. Determine CAN-FD by data_bitrate, Keelung.Yang@flex.com, 2022-01-27 + 3. Remove bus padding since it should be handled in upper layer, Keelung.Yang@flex.com, 2022-01-28 + 4. Balance receiving CAN/CAN-FD messages, Keelung.Yang@flex.com, 2022-02-06 + 5. Implement software timeout, Keelung.Yang@flex.com, 2022-02-09 +""" + +from can.interfaces.zlg.bus import ZlgCanBus diff --git a/can/interfaces/zlg/bus.py b/can/interfaces/zlg/bus.py new file mode 100644 index 000000000..7d958bbda --- /dev/null +++ b/can/interfaces/zlg/bus.py @@ -0,0 +1,238 @@ +import time +import ctypes +import platform + +from can import BusABC, BusState, Message +from can import CanInitializationError, CanOperationError, CanTimeoutError + +from .vci import * +from .timing import ZlgBitTiming + + +DeviceType = ZCAN_DEVICE.USBCANFD_200U + + +class ZlgCanBus(BusABC): + def __init__(self, channel, device=0, tres=True, **kwargs): + """ + :param channel: channel index, [0, 1,,,] + :param device: device index, [0, 1,,,] + :param tres: enable/disable termination resistor on specified channel + """ + if platform.system() != "Linux": + raise CanInitializationError(f"Only Linux is supported currently") + self.bitrate = kwargs.get("bitrate", 500000) + self.data_bitrate = kwargs.get("data_bitrate", None) + self.channel_info = ( + f"{self.__class__.__name__}{device}:{channel}@{self.bitrate}" + ) + if self.data_bitrate: + self.channel_info += f"/{self.data_bitrate}" + self._dev_type = DeviceType.value + self._dev_index = int(device) + self._dev_channel = int(channel) + self._dev_timestamp = time.time() + self.is_opened = self.open() + if not self.is_opened: + raise CanInitializationError(f"Failed to open {self.channel_info}") + self.tres = bool(tres) + super().__init__(int(channel), **kwargs) + + @property + def tres(self): + """Termination resistor""" + return self._tres + + @tres.setter + def tres(self, value): + self._tres = bool(value) + if not vci_channel_enable_tres( + self._dev_type, self._dev_index, self._dev_channel, self._tres + ): + raise CanOperationError( + f'Failed to {"enable" if value else "disable"} ' + f"termination resistor for {self.channel_info} !" + ) + + @property + def state(self): + err_msg = ZCAN_ERR_MSG() + if not vci_channel_read_info( + self._dev_type, self._dev_index, self._dev_channel, err_msg + ): + raise CanOperationError(f"Failed to read CAN{self._dev_channel} status!") + if err_msg.info.err: + if err_msg.info.est: + return BusState.ERROR + else: + return BusState.ACTIVE + return None # https://github.com/hardbyte/python-can/issues/736 + + @state.setter + def state(self, value): + raise NotImplementedError() + + def _from_raw(self, raw): + return Message( + timestamp=raw.header.ts, + arbitration_id=raw.header.id, + is_fd=bool(raw.header.info.fmt), + is_extended_id=bool(raw.header.info.sef), + is_remote_frame=bool(raw.header.info.sdf), + is_error_frame=bool(raw.header.info.err), + bitrate_switch=bool(raw.header.info.brs), + dlc=raw.header.len, + data=bytes(raw.dat[: raw.header.len]), + channel=raw.header.chn, + ) + + def _to_raw(self, msg): + info = ZCAN_MSG_INFO( + txm=False, + fmt=msg.is_fd, + sdf=msg.is_remote_frame, + sef=msg.is_extended_id, + err=msg.is_error_frame, + brs=msg.bitrate_switch, + est=msg.error_state_indicator, + # pad + ) + header = ZCAN_MSG_HDR( + ts=(int(time.time() - self._dev_timestamp) * 1000) & 0xFFFF_FFFF, + id=msg.arbitration_id, + info=info, + chn=self._dev_channel, + len=msg.dlc, + ) + if msg.is_fd: + raw = ZCAN_FD_MSG(header=header) + else: + raw = ZCAN_20_MSG(header=header) + ctypes.memmove(raw.dat, bytes(msg.data), msg.dlc) + return raw + + def _available_msgs(self) -> tuple[bool, int]: + """Return (is_fd, buffered_msg_count)""" + can_msg_count = vci_can_get_recv_num( + self._dev_type, self._dev_index, self._dev_channel + ) + canfd_msg_count = vci_canfd_get_recv_num( + self._dev_type, self._dev_index, self._dev_channel + ) + if can_msg_count > 0 and canfd_msg_count > 0: + if canfd_msg_count > can_msg_count: + return True, canfd_msg_count + else: + return False, can_msg_count + elif can_msg_count == 0 and canfd_msg_count == 0: + return bool(self.data_bitrate), 0 + else: + return canfd_msg_count > 0, canfd_msg_count or can_msg_count + + def _recv_one(self, fd) -> Message: + delay = 1 # ZLG cann't comfirm what's happen if delay == 0 + rx_buf = (ZCAN_FD_MSG * 1)() if fd else (ZCAN_20_MSG * 1)() + if fd: + ret = vci_canfd_recv( + self._dev_type, + self._dev_index, + self._dev_channel, + rx_buf, + len(rx_buf), + delay, + ) + else: + ret = vci_can_recv( + self._dev_type, + self._dev_index, + self._dev_channel, + rx_buf, + len(rx_buf), + delay, + ) + if ret > 0: + return self._from_raw(rx_buf[0]) + else: + return None + + def _recv_internal(self, timeout): + t1 = time.time() + while True: + is_fd, msg_count = self._available_msgs() + if msg_count: + if msg := self._recv_one(is_fd): + return msg, self.filters is None + else: + raise CanOperationError(f"Failed to receive!") + elif timeout is None: + time.sleep(0.001) + elif timeout < 0.001: + return None, self.filters is None + elif (time.time() - t1) < timeout: + time.sleep(0.001) + else: + raise CanTimeoutError(f"Receive timeout!") + + def _send_one(self, msg) -> int: + tx_buf = (ZCAN_FD_MSG * 1)() if msg.is_fd else (ZCAN_20_MSG * 1)() + tx_buf[0] = self._to_raw(msg) + if msg.is_fd: + return vci_canfd_send( + self._dev_type, self._dev_index, self._dev_channel, tx_buf, len(tx_buf) + ) + else: + return vci_can_send( + self._dev_type, self._dev_index, self._dev_channel, tx_buf, len(tx_buf) + ) + + def _send_internal(self, msg, timeout=None) -> None: + while timeout is None: + if self._send_one(msg): + return + else: + t1 = time.time() + while (time.time() - t1) < timeout or timeout < 0.001: + if self._send_one(msg): + return + elif timeout < 0.001: + return + else: + raise CanTimeoutError(f"Send message {msg.arbitration_id:03X} timeout!") + + def send(self, msg, timeout=None) -> None: + # The maximum tx timeout is 4000ms, limited by firmware, as explained officially + dev_timeout = 4000 if timeout is None else 10 + vci_channel_set_tx_timeout( + self._dev_type, self._dev_index, self._dev_channel, dev_timeout + ) + if self.data_bitrate: # Force FD if data_bitrate + msg.is_fd = True + self._send_internal(msg, timeout) + + def open(self) -> bool: + timing = ZlgBitTiming(self._dev_type) + clock = timing.f_clock + bitrate = timing.timing(self.bitrate) + if self.data_bitrate: + data_bitrate = timing.timing(self.data_bitrate) + else: + data_bitrate = bitrate + if not vci_device_open(self._dev_type, self._dev_index): + return False + if not vci_channel_open( + self._dev_type, + self._dev_index, + self._dev_channel, + clock, + bitrate, + data_bitrate, + ): + vci_device_close(self._dev_type, self._dev_index) + return False + else: + return True + + def shutdown(self) -> None: + super().shutdown() + vci_channel_close(self._dev_type, self._dev_index, self._dev_channel) + vci_device_close(self._dev_type, self._dev_index) diff --git a/can/interfaces/zlg/timing.py b/can/interfaces/zlg/timing.py new file mode 100644 index 000000000..118351b26 --- /dev/null +++ b/can/interfaces/zlg/timing.py @@ -0,0 +1,44 @@ +from can import BitTiming, CanInitializationError, CanBitRateError +from .vci import ZCAN_BIT_TIMING, ZCAN_DEVICE + + +# Official timing calculation +# https://manual.zlg.cn/web/#/188/6981 +class ZlgBitTiming(BitTiming): + def __init__(self, device, **kwargs): + try: + self._device = ZCAN_DEVICE(device) + except: + raise CanInitializationError(f"Unsupported ZLG CAN device {device} !") + if self._device in ( + ZCAN_DEVICE.USBCAN, + ZCAN_DEVICE.USBCANFD_200U, + ): + kwargs.setdefault("f_clock", 60000000) + self.speeds = { + 125_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=31), + 250_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=15), + 500_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=7), + 1_000_000: ZCAN_BIT_TIMING(tseg1=46, tseg2=11, sjw=3, brp=0), + 2_000_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=1), + 4_000_000: ZCAN_BIT_TIMING(tseg1=10, tseg2=2, sjw=2, brp=0), + 5_000_000: ZCAN_BIT_TIMING(tseg1=7, tseg2=2, sjw=2, brp=0), + } + super().__init__(**kwargs) + + def timing(self, bitrate=None, force=False) -> ZCAN_BIT_TIMING: + if bitrate in self.speeds: + return self.speeds[bitrate] + elif force: + return ZCAN_BIT_TIMING( + tseg1=self.tseg1, + tseg2=self.tseg2, + sjw=self.sjw, + brp=self.brp, + ) + else: + raise CanBitRateError(f"Unsupported {bitrate=}") + + @property + def bitrates(self) -> list[int]: + return self.speeds.keys() diff --git a/can/interfaces/zlg/vci.py b/can/interfaces/zlg/vci.py new file mode 100644 index 000000000..3dce4768b --- /dev/null +++ b/can/interfaces/zlg/vci.py @@ -0,0 +1,261 @@ +""" +Wrap libusbcanfd.so +""" + +####################################################################### +# For consistency, use the coding style and definitions +# as similar as USBCANFD_DEMO.py in Linux driver package +# https://manual.zlg.cn/web/#/146 +####################################################################### + +import logging + +from enum import unique, auto, Enum, Flag +from ctypes import c_ubyte, c_uint8, c_uint16, c_uint32 +from ctypes import cdll, byref, Structure + + +log = logging.getLogger("can.zlg") +lib = cdll.LoadLibrary("libusbcanfd.so") + + +@unique +class ZCAN_DEVICE(Enum): + """CAN Device Type + Only USBCAN / USBCANFD_200U supported now + For more infomation, please check + https://manual.zlg.cn/web/#/188/6984 + """ + + USBCAN = 4 + USBCANFD_200U = 33 + + +class ZCAN_MODE(Flag): + LISTEN_ONLY = auto() # 0: Normal, 1: Listen only + BOSCH = auto() # 0: ISO, 1: BOSCH + NORMAL = (~LISTEN_ONLY & ~BOSCH) & 0x03 + + +@unique +class ZCAN_REF(Enum): + FILTER = 0x14 # CAN Message Filter + TRES = 0x18 # Termination resistor + TX_TIMEOUT = 0x44 # Send timeout in ms + TTX_SETTING = 0x16 # Timing send settins + TTX_ENABLE = 0x17 # Timing send enable + + +class ZCAN_TRES(Structure): # Termination resistor + ON = 1 + OFF = 0 + _fields_ = [("enable", c_uint8)] + + +class ZCAN_TX_TIMEOUT(Structure): # Tx Timeout + _fields_ = [("value", c_uint32)] + + +class ZCAN_MSG_INFO(Structure): + _fields_ = [ + ("txm", c_uint32, 4), # TXTYPE, 0: normal, 1: once, 2: self + ("fmt", c_uint32, 4), # 0:CAN2.0, 1:CANFD + ("sdf", c_uint32, 1), # 0: data frame, 1: remote frame + ("sef", c_uint32, 1), # 0: std frame, 1: ext frame + ("err", c_uint32, 1), # error flag + ("brs", c_uint32, 1), # bit-rate switch, 0: Not speed up, 1: speed up + ("est", c_uint32, 1), # error state + ("pad", c_uint32, 19), + ] + + +class ZCAN_MSG_HDR(Structure): + _fields_ = [ + ("ts", c_uint32), # timestamp + ("id", c_uint32), # can-id + ("info", ZCAN_MSG_INFO), + ("pad", c_uint16), + ("chn", c_uint8), # channel + ("len", c_uint8), # data length + ] + + +class ZCAN_20_MSG(Structure): + _fields_ = [("header", ZCAN_MSG_HDR), ("dat", c_ubyte * 8)] + + +class ZCAN_FD_MSG(Structure): + _fields_ = [("header", ZCAN_MSG_HDR), ("dat", c_ubyte * 64)] + + +class ZCAN_ERR_MSG(Structure): + _fields_ = [("header", ZCAN_MSG_HDR), ("dat", c_ubyte * 8)] + + +class ZCAN_BIT_TIMING(Structure): + _fields_ = [ + ("tseg1", c_uint8), + ("tseg2", c_uint8), + ("sjw", c_uint8), + ("smp", c_uint8), # Not used + ("brp", c_uint16), + ] + + +class ZCANFD_INIT(Structure): + _fields_ = [ + ("clk", c_uint32), + ("mode", c_uint32), + ("atim", ZCAN_BIT_TIMING), + ("dtim", ZCAN_BIT_TIMING), + ] + + +def vci_device_open(dt, di) -> bool: + ret = lib.VCI_OpenDevice(c_uint32(dt), c_uint32(di)) + if ret == 0: + log.error(f"Failed to open device {dt}:{di} !") + else: + log.info(f"Open device {dt}:{di} successfully!") + return ret != 0 + + +def vci_device_close(dt, di) -> bool: + ret = lib.VCI_CloseDevice(c_uint32(dt), c_uint32(di)) + if ret == 0: + log.error(f"Failed to open device {dt}:{di}") + else: + log.info(f"Open device {dt}:{di} successfully") + return ret != 0 + + +def vci_channel_open( + dt, di, ch, clock, atiming, dtiming=None, mode=ZCAN_MODE.NORMAL +) -> bool: + init = ZCANFD_INIT() + init.mode = mode.value + init.clk = clock + init.atim = atiming + init.dtim = dtiming or atiming + ret = lib.VCI_InitCAN(c_uint32(dt), c_uint32(di), c_uint32(ch), byref(init)) + if ret == 0: + log.error(f"CH{ch}: Initialize failed") + else: + log.info(f"CH{ch}: Initialized") + ret = lib.VCI_StartCAN(c_uint32(dt), c_uint32(di), c_uint32(ch)) + if ret == 0: + log.error(f"CH{ch}: Start failed") + else: + log.info(f"CH{ch}: Started") + return ret != 0 + + +def vci_channel_close(dt, di, ch) -> bool: + ret = lib.VCI_ResetCAN(c_uint32(dt), c_uint32(di), c_uint32(ch)) + if ret == 0: + log.error(f"CH{ch}: Close failed") + else: + log.info(f"CH{ch}: Closed") + return ret != 0 + + +def vci_channel_read_info(dt, di, ch, info) -> bool: + ret = lib.VCI_ReadErrInfo(c_uint32(dt), c_uint32(di), c_uint32(ch), byref(info)) + if ret == 0: + log.error(f"CH{ch}: Failed to read error infomation") + else: + log.info(f"CH{ch}: Read error infomation successfully") + return ret != 0 + + +def vci_channel_enable_tres(dt, di, ch, value) -> bool: + ref = ZCAN_REF.TRES.value + tres = ZCAN_TRES(enable=ZCAN_TRES.ON if value else ZCAN_TRES.OFF) + ret = lib.VCI_SetReference( + c_uint32(dt), c_uint32(di), c_uint32(ch), c_uint32(ref), byref(tres) + ) + if ret == 0: + log.error( + f'CH{ch}: Failed to {"enable" if value else "disable"} ' + f"termination resistor" + ) + else: + log.info( + f'CH{ch}: {"Enable" if value else "Disable"} ' + f"termination resistor successfully" + ) + return ret != 0 + + +def vci_channel_set_tx_timeout(dt, di, ch, value) -> bool: + ref = ZCAN_REF.TX_TIMEOUT.value + timeout = ZCAN_TX_TIMEOUT(value=value) + ret = lib.VCI_SetReference( + c_uint32(dt), c_uint32(di), c_uint32(ch), c_uint32(ref), byref(timeout) + ) + if ret == 0: + log.error(f"CH{ch}: Failed to set tx timout {value}ms!") + else: + log.debug(f"CH{ch}: Set tx timout to {value}ms") + return ret != 0 + + +def vci_can_send(dt, di, ch, msgs, count) -> int: + ret = lib.VCI_Transmit( + c_uint32(dt), c_uint32(di), c_uint32(ch), byref(msgs), c_uint32(count) + ) + if ret == 0: + log.error(f"CH{ch}: Failed to send CAN message(s)") + else: + log.debug(f"CH{ch}: Sent {ret} CAN message(s)") + return ret + + +def vci_canfd_send(dt, di, ch, msgs, count) -> int: + ret = lib.VCI_TransmitFD( + c_uint32(dt), c_uint32(di), c_uint32(ch), byref(msgs), c_uint32(count) + ) + if ret == 0: + log.error(f"CH{ch}: Failed to send CAN-FD message(s)") + else: + log.debug(f"CH{ch}: Sent {ret} CAN-FD message(s)") + return ret + + +def vci_can_recv(dt, di, ch, msgs, count, delay) -> int: + ret = lib.VCI_Receive( + c_uint32(dt), + c_uint32(di), + c_uint32(ch), + byref(msgs), + c_uint32(count), + c_uint32(delay), + ) + log.debug(f"CH{ch}: Received {ret} CAN message(s)") + return ret + + +def vci_canfd_recv(dt, di, ch, msgs, count, delay) -> int: + ret = lib.VCI_ReceiveFD( + c_uint32(dt), + c_uint32(di), + c_uint32(ch), + byref(msgs), + c_uint32(count), + c_uint32(delay), + ) + log.debug(f"CH{ch}: Received {ret} CAN-FD message(s)") + return ret + + +def vci_can_get_recv_num(dt, di, ch) -> int: + ret = lib.VCI_GetReceiveNum(c_uint32(dt), c_uint32(di), c_uint32(ch)) + log.debug(f"CH{ch}: {ret} message(s) in CAN FIFO") + return ret + + +def vci_canfd_get_recv_num(dt, di, ch) -> int: + ch = ch | 0x8000_0000 + ret = lib.VCI_GetReceiveNum(c_uint32(dt), c_uint32(di), c_uint32(ch)) + log.debug(f"CH{ch&0x7FFF_FFFF}: {ret} message(s) in CAN-FD FIFO") + return ret diff --git a/doc/configuration.rst b/doc/configuration.rst index 7b42017a9..5ff465027 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -156,6 +156,8 @@ Lookup table of interface names: +---------------------+-------------------------------------+ | ``"vector"`` | :doc:`interfaces/vector` | +---------------------+-------------------------------------+ +| ``"zlg"`` | :doc:`interfaces/zlg` | ++---------------------+-------------------------------------+ | ``"virtual"`` | :doc:`interfaces/virtual` | +---------------------+-------------------------------------+ diff --git a/doc/interfaces.rst b/doc/interfaces.rst index 6645ec338..6bfaa4b98 100644 --- a/doc/interfaces.rst +++ b/doc/interfaces.rst @@ -38,6 +38,7 @@ The following hardware interfaces are included in python-can: interfaces/systec interfaces/usb2can interfaces/vector + interfaces/zlg Additional interface types can be added via the :ref:`plugin interface`, or by installing a third party package that utilises the :ref:`plugin interface`. diff --git a/doc/interfaces/zlg.rst b/doc/interfaces/zlg.rst new file mode 100644 index 000000000..d1abfde88 --- /dev/null +++ b/doc/interfaces/zlg.rst @@ -0,0 +1,91 @@ +.. _zlg: + +ZLG USB CAN +=========== + + +About +----- +Unofficial ZLG USBCAN/USBCANFD implementation for Linux + +Author: Keelung.Yang@flex.com + + +Supported devices +----------------- +Tested on USBCANFD-200U (https://www.zlg.cn/can/can/product/id/223.html) + +For other ZLG CAN devices, you can change ``bus.DeviceType`` +to another value defined ``vci.ZCAN_DEVICE`` + + +How to add new ZLG device +------------------------- +1. Add a new device type in ``vci.ZCAN_DEVICE`` +2. Update ``ZlgBitTiming.speeds`` by device type in ``timming`` module +3. Update version infomation in ``zlg/__init__.py`` + + +CAN filters +----------- +ZLG CAN Linux driver only supports filtering by ranges of CAN ID currently. +But the MCU(NXP LPC54616, in USBCANFD-200U) supports filtering by ranges and masks. +But they dropped mask filters and no plan to enable this feature again. +So hardware filtering is not supported in this interface currently. + +Also, it is possible to convert limited masks into ranges asked by official driver. +But the count of filters is limited to 64 in official driver. +Although 128 standard ID filters and 64 extended ID filters supported in MCU. + + +Tx/Rx timeout +------------- +You've to set tx timeout by VCI_SetReference(), since no timeout argument in +VCI_Transmit|FD(). And the maximum tx timeout is 4000ms, +limited by firmware, as explained officially. + +And rx timeout is just delay, not aware of coming data in VCI_Receive|FD() API. +E.g. if rx_timeout is 2s and received data in 1s, you got data after 2s. + +In this interface implementation, to avoid above Tx/Rx limitations, +both tx and rx timeout is implemented by software. +So you can set timeout to ``None`` in send() or recv() to wait indefinitely. +Or other values to wait for an actual timeout. + + +Note +---- +1. Before using this interface + + a. Install ``libusb`` package in your Linux distribution + b. Download ``libusbcanfd.so`` from Linux driver package + + For more infomation, please check: + https://manual.zlg.cn/web/#/188/6978 + +2. There are three official libraries which are incompatible with each other: + + a. VCI_*Functions & ZCAN_*Structures for Linux (CAN & CANFD) + b. VCI_*Functions & VCI_*Structures for Windows (CAN only) + c. ZCAN_*Functions & ZCAN_*Structures for Windows (CAN and CANFD) + + For more infomation, please check: + USBCANFD for Linux, https://manual.zlg.cn/web/#/188/6982 + + USBCANFD for Windows, https://www.zlg.cn/data/upload/software/Can/CAN_lib.rar + + USBCANFD resource, https://www.zlg.cn/can/down/down/id/223.html + +3. The Device Type IDs are different between Linux and Windows + + Linux: https://manual.zlg.cn/web/#/188/6984 + + Windows: https://manual.zlg.cn/web/#/152/6361 + +4. You can't detect ``BUS_OFF`` by Linux APIs currently, since there is + no such state in ``ZCAN_MSG_INF`` structure and the firmware will + restart automatically when ``BUS_OFF`` happened. + +5. There is also an official python-can interface implementation for Windows only. + + But declared no sustaining anymore: https://manual.zlg.cn/web/#/169/6070 diff --git a/test/test_interface_zlg.py b/test/test_interface_zlg.py new file mode 100644 index 000000000..89a9823ff --- /dev/null +++ b/test/test_interface_zlg.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +import unittest +import time +import can + +from can.exceptions import * + + +class ZlgTestCase(unittest.TestCase): + def setUp(self): + self.channel = 0 + self.device = 0 + self.std_id = 0x123 + self.ext_id = 0x123456 + self.bitrate = 500_000 + self.data_bitrate = 2_000_000 + self.bus = can.Bus( + bustype="zlg", + channel=self.channel, + device=self.device, + bitrate=self.bitrate, + data_bitrate=self.data_bitrate, + ) + self.can_std_msg = can.Message( + arbitration_id=self.std_id, + is_extended_id=False, + is_fd=False, + data=list(range(8)), + ) + self.can_ext_msg = can.Message( + arbitration_id=self.ext_id, + is_extended_id=True, + is_fd=False, + data=list(range(8)), + ) + self.canfd_std_msg = can.Message( + arbitration_id=self.std_id, + is_extended_id=False, + is_fd=True, + data=list(range(64)), + ) + self.canfd_ext_msg = can.Message( + arbitration_id=self.ext_id, + is_extended_id=True, + is_fd=True, + data=list(range(64)), + ) + + def tearDown(self): + self.bus.shutdown() + + def testSendStdMsg(self): + try: + self.bus.send(self.can_std_msg) + except CanOperationError as ex: + self.fail("Failed to send CAN std frame") + + def testSendExtMsg(self): + try: + self.bus.send(self.can_ext_msg) + except CanOperationError as ex: + self.fail("Failed to send CAN ext frame") + + def testSendStdMsgFD(self): + try: + self.bus.send(self.canfd_std_msg) + except CanOperationError as ex: + self.fail("Failed to send CANFD std frame") + + def testSendExtMsgFD(self): + try: + self.bus.send(self.canfd_ext_msg) + except CanOperationError as ex: + self.fail("Failed to send CANFD ext frame") + + def testSendTimeout0S(self): + try: + timeout = 0 + print(f"Set send {timeout=}") + t1 = time.time() + self.bus.send(self.can_std_msg, timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=10) + except Exception as ex: + self.fail(str(ex)) + + def testSendTimeout1S(self): + try: + timeout = 1 + print(f"Set send {timeout=}") + t1 = time.time() + self.bus.send(self.can_std_msg, timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=timeout * 1.1) + except Exception as ex: + self.fail(str(ex)) + + def testSendTimeout5S(self): + try: + timeout = 5 + print(f"Set send {timeout=}") + t1 = time.time() + self.bus.send(self.can_std_msg, timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=timeout * 1.1) + except Exception as ex: + self.fail(str(ex)) + + def testSendTimeoutForever(self): + try: + timeout = None + print(f"Set send {timeout=}") + self.bus.send(self.can_std_msg, timeout) + except Exception as ex: + self.fail(str(ex)) + + def testRecv(self): + msg = self.bus.recv(0) + self.assertIsNotNone(msg) + + def testRecvTimeout0S(self): + try: + timeout = 0 + print(f"Set receive {timeout=}") + t1 = time.time() + msg = self.bus.recv(timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=10) + except Exception as ex: + self.fail(str(ex)) + + def testRecvTimeout1S(self): + try: + timeout = 1 + print(f"Set receive {timeout=}") + t1 = time.time() + msg = self.bus.recv(timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=timeout * 1.1) + except Exception as ex: + self.fail(str(ex)) + + def testRecvTimeout5S(self): + try: + timeout = 5 + print(f"Set receive {timeout=}") + t1 = time.time() + msg = self.bus.recv(timeout) + t2 = time.time() + self.assertAlmostEqual(t1, t2, delta=timeout * 1.1) + except Exception as ex: + self.fail(str(ex)) + + def testRecvTimeoutForever(self): + try: + timeout = None + print(f"Set receive {timeout=}") + msg = self.bus.recv(timeout) + except Exception as ex: + self.fail(str(ex)) + + +if __name__ == "__main__": + unittest.main()