diff --git a/README.md b/README.md index e17e968..73c2c6b 100644 --- a/README.md +++ b/README.md @@ -1017,9 +1017,24 @@ ez-reset has definitions for (and thus *theoretically* supports) the following p ## Requirements -- Windows +- Windows or macOS - Supported Epson printer connected via USB +### macOS support + +The app can run on macOS using USB Printer Class via `pyusb`, but you must install `libusb` first: + +```bash +brew install libusb +``` + +Then install and run as usual: + +```bash +pip install . +python -m ez_reset +``` + ## Installation (easy) Grab a prebuilt binary from the *Releases* tab or [download directly from here](https://github.com/CiRIP/ez-reset/releases/latest/download/ez-reset.exe). diff --git a/pyproject.toml b/pyproject.toml index 88fc469..f12e8f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,8 @@ description = "Easy reset tool for Windows over USB for printers with a certain readme = "README.md" requires-python = ">=3.12" dependencies = [ - "pywin32>=311", + "pywin32>=311; platform_system == 'Windows'", + "pyusb>=1.3.1; platform_system != 'Windows'", ] [tool.ruff] diff --git a/src/ez_reset/__main__.py b/src/ez_reset/__main__.py index 8056af3..14184e1 100644 --- a/src/ez_reset/__main__.py +++ b/src/ez_reset/__main__.py @@ -1,4 +1,5 @@ import logging +import platform import tkinter as tk import traceback from collections.abc import Iterable @@ -9,7 +10,24 @@ from ez_reset.printer import Printer from ez_reset.status import InkColor, InkLevel from ez_reset.utils import parse_identifier -from ez_reset.win_usbprint import USBPRINTTransport, enumerate_printers + +if platform.system() == "Windows": + from ez_reset.win_usbprint import USBPRINTTransport, enumerate_printers + + USB_BACKEND_AVAILABLE = True + USB_BACKEND_MESSAGE = "" +else: + try: + from ez_reset.usbprint import USBPRINTTransport, enumerate_printers + except ImportError: + USB_BACKEND_AVAILABLE = False + USB_BACKEND_MESSAGE = "Install non-Windows USB dependencies (pyusb + libusb) to use this app." + + def enumerate_printers() -> Iterable[str]: + return () + else: + USB_BACKEND_AVAILABLE = True + USB_BACKEND_MESSAGE = "" class PrinterList(ttk.Frame): @@ -22,6 +40,11 @@ def __init__(self, master: tk.Misc) -> None: self.list.pack(fill=tk.BOTH, expand=True) self.list.bind("", self.open_printer) + if not USB_BACKEND_AVAILABLE: + self.list.insert(0, USB_BACKEND_MESSAGE) + self.list.configure(state=tk.DISABLED) + return + self.update_printers() def update_printers(self) -> None: @@ -33,6 +56,10 @@ def update_printers(self) -> None: self.list.insert(idx, device) def open_printer(self, _event: tk.Event) -> None: + if not USB_BACKEND_AVAILABLE: + messagebox.showinfo("Unsupported platform", USB_BACKEND_MESSAGE) + return + selected_index = self.list.curselection()[0] usb_transport = USBPRINTTransport(self._printers[selected_index]).__enter__() @@ -197,7 +224,8 @@ def update_level(self, level: int) -> None: root = tk.Tk() root.title("ez-reset") style = ttk.Style(root) -style.theme_use("winnative") +if platform.system() == "Windows": + style.theme_use("winnative") def show_error(self, *args) -> None: # noqa: ANN001,ANN002,ARG001 diff --git a/src/ez_reset/usbprint/__init__.py b/src/ez_reset/usbprint/__init__.py new file mode 100644 index 0000000..bfc0c6c --- /dev/null +++ b/src/ez_reset/usbprint/__init__.py @@ -0,0 +1,2 @@ +from .transport import USBPRINTTransport as USBPRINTTransport +from .transport import enumerate_printers as enumerate_printers diff --git a/src/ez_reset/usbprint/transport.py b/src/ez_reset/usbprint/transport.py new file mode 100644 index 0000000..ec3740f --- /dev/null +++ b/src/ez_reset/usbprint/transport.py @@ -0,0 +1,191 @@ +import logging +import time +from types import TracebackType +from typing import Self + +import usb.core +import usb.util + +from ez_reset.transport import Transport + +logger = logging.getLogger(__name__) + +MAX_TRANSFER_SIZE = 0x400000 +READ_TIMEOUT_MS = 150 +WRITE_TIMEOUT_MS = 5000 + +PRINTER_CLASS = 0x07 +GET_DEVICE_ID = 0x00 +SOFT_RESET = 0x02 + + +def _is_printer_device(device: usb.core.Device) -> bool: + for config in device: + for interface in config: + if interface.bInterfaceClass == PRINTER_CLASS: + return True + + return False + + +def _find_printer_interface(device: usb.core.Device) -> tuple[int, int]: + for config in device: + for interface in config: + if interface.bInterfaceClass == PRINTER_CLASS: + return config.bConfigurationValue, interface.bInterfaceNumber + + msg = f"USB device {device.idVendor:04x}:{device.idProduct:04x} has no printer interface" + raise RuntimeError(msg) + + +def _format_path(device: usb.core.Device) -> str: + return f"{device.idVendor:04x}:{device.idProduct:04x}:{device.bus:03d}:{device.address:03d}" + + +def enumerate_printers() -> list[str]: + printers: list[str] = [] + devices = usb.core.find(find_all=True, custom_match=_is_printer_device) + + for device in devices: + path = _format_path(device) + product = usb.util.get_string(device, device.iProduct) or "Unknown printer" + printers.append(f"{path} {product}") + + return printers + + +class USBPRINTTransport(Transport): + def __init__(self, path: str) -> None: + self.path = path + self.closed = True + + self._buffer = b"" + self._device: usb.core.Device | None = None + self._interface: int | None = None + self._ep_in: usb.core.Endpoint | None = None + self._ep_out: usb.core.Endpoint | None = None + self._detached_kernel = False + + def __enter__(self) -> Self: + try: + vid, pid, bus, address = (int(v, 16) for v in self.path.split(" ", 1)[0].split(":")) + except (ValueError, IndexError) as exc: + msg = f"Invalid USB printer path: {self.path}" + raise ValueError(msg) from exc + + device = usb.core.find(idVendor=vid, idProduct=pid, bus=bus, address=address) + if not device: + msg = f"USB printer not found: {self.path}" + raise OSError(msg) + + cfg_value, interface = _find_printer_interface(device) + device.set_configuration(cfg_value) + + if device.is_kernel_driver_active(interface): + device.detach_kernel_driver(interface) + self._detached_kernel = True + + usb.util.claim_interface(device, interface) + configuration = device.get_active_configuration() + intf = configuration[(interface, 0)] + + ep_out = usb.util.find_descriptor( + intf, + custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress) + == usb.util.ENDPOINT_OUT + and usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK, + ) + ep_in = usb.util.find_descriptor( + intf, + custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress) + == usb.util.ENDPOINT_IN + and usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK, + ) + + if not ep_out or not ep_in: + msg = f"USB printer interface is missing bulk endpoints: {self.path}" + raise OSError(msg) + + # Clear stale state before opening a D4 session. + device.ctrl_transfer(0x21, SOFT_RESET, 0, interface) + + self._device = device + self._interface = interface + self._ep_in = ep_in + self._ep_out = ep_out + self.closed = False + + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool: + if self._device and self._interface is not None: + usb.util.release_interface(self._device, self._interface) + if self._detached_kernel: + self._device.attach_kernel_driver(self._interface) + + usb.util.dispose_resources(self._device) + + self._device = None + self._interface = None + self._ep_in = None + self._ep_out = None + self._detached_kernel = False + self.closed = True + + return False + + def write(self, data: bytes) -> None: + if self.closed or not self._ep_out: + msg = f"Handle to USBPRINT device {self.path} is closed" + raise OSError(msg) + + bytes_written = self._ep_out.write(data, timeout=WRITE_TIMEOUT_MS) + if bytes_written != len(data): + msg = f"Short USB write: expected {len(data)}, got {bytes_written}" + raise OSError(msg) + + def read(self, size: int) -> bytes: + if self.closed or not self._ep_in: + msg = f"Handle to USBPRINT device {self.path} is closed" + raise OSError(msg) + + while len(self._buffer) < size: + try: + data = bytes(self._ep_in.read(MAX_TRANSFER_SIZE, timeout=READ_TIMEOUT_MS)) + self._buffer += data + except usb.core.USBTimeoutError: + time.sleep(0.01) + + read = self._buffer[:size] + self._buffer = self._buffer[size:] + return read + + def drain(self) -> None: + if self.closed or not self._ep_in: + return + + while True: + try: + data = self._ep_in.read(MAX_TRANSFER_SIZE, timeout=READ_TIMEOUT_MS) + if len(data) == 0: + return + except usb.core.USBTimeoutError: + return + + def identify(self) -> str: + if self.closed or not self._device or self._interface is None: + msg = f"Handle to USBPRINT device {self.path} is closed" + raise OSError(msg) + + raw = bytes(self._device.ctrl_transfer(0xA1, GET_DEVICE_ID, 0, self._interface, 1024)) + if len(raw) < 3: + return raw.decode("ascii", errors="ignore") + + payload_size = int.from_bytes(raw[:2], byteorder="big") + payload = raw[2:payload_size] + return payload.decode("ascii", errors="ignore").strip("\x00")