diff --git a/.gitignore b/.gitignore index 62b798d599..b1e6747bbe 100644 --- a/.gitignore +++ b/.gitignore @@ -369,6 +369,9 @@ paket-files/ # Python Tools for Visual Studio (PTVS) __pycache__/ *.pyc +# Python virtual environments and tool caches (e.g. source/binding/Python) +.venv/ +.ruff_cache/ # Cake - Uncomment if you are using it # tools/** diff --git a/.vscode/settings.json b/.vscode/settings.json index 361ec5a38d..f8c39a363e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -21,7 +21,10 @@ "editor.defaultFormatter": "vscode.json-language-features" }, "[python]": { - "editor.defaultFormatter": "ms-python.black-formatter" + "editor.defaultFormatter": "charliermarsh.ruff", + "editor.codeActionsOnSave": { + "source.organizeImports": "explicit" + } }, "[yaml]": { "editor.insertSpaces": true, diff --git a/source/binding/Python/maa/__init__.py b/source/binding/Python/maa/__init__.py index e0adf72c52..f98d109a5c 100644 --- a/source/binding/Python/maa/__init__.py +++ b/source/binding/Python/maa/__init__.py @@ -4,9 +4,6 @@ from .library import Library env_path = os.environ.get("MAAFW_BINARY_PATH") -if env_path: - __PATH = Path(env_path) -else: - __PATH = Path(Path(__file__).parent, "bin") +path = Path(env_path) if env_path else Path(Path(__file__).parent, "bin") -Library.open(__PATH, agent_server=False) +Library.open(path, agent_server=False) diff --git a/source/binding/Python/maa/agent/__init__.py b/source/binding/Python/maa/agent/__init__.py index 5f15bc98c1..f688ccb043 100644 --- a/source/binding/Python/maa/agent/__init__.py +++ b/source/binding/Python/maa/agent/__init__.py @@ -4,9 +4,6 @@ from ..library import Library env_path = os.environ.get("MAAFW_BINARY_PATH") -if env_path: - __PATH = Path(env_path) -else: - __PATH = Path(Path(__file__).parent.parent, "bin") +path = Path(env_path) if env_path else Path(Path(__file__).parent.parent, "bin") -Library.open(__PATH, agent_server=True) +Library.open(path, agent_server=True) diff --git a/source/binding/Python/maa/agent/agent_server.py b/source/binding/Python/maa/agent/agent_server.py index 016e50a1f5..28c5c6fa43 100644 --- a/source/binding/Python/maa/agent/agent_server.py +++ b/source/binding/Python/maa/agent/agent_server.py @@ -1,8 +1,17 @@ import ctypes +from typing import TYPE_CHECKING, Callable from ..define import * -from ..library import Library from ..event_sink import EventSink +from ..library import Library + +if TYPE_CHECKING: + from ..context import ContextEventSink + from ..controller import ControllerEventSink + from ..custom_action import CustomAction + from ..custom_recognition import CustomRecognition + from ..resource import ResourceEventSink + from ..tasker import TaskerEventSink class AgentServer: @@ -24,29 +33,33 @@ def analyze(self, context, argv): """ @staticmethod - def custom_recognition(name: str): + def custom_recognition( + name: str, + ) -> Callable[[type["CustomRecognition"]], type["CustomRecognition"]]: """自定义识别器装饰器 / Custom recognition decorator Args: - name: 识别器名称,需与 Pipeline 中的 custom_recognition 字段匹配 / Recognition name, should match the custom_recognition field in Pipeline + name: 识别器名称,需与 Pipeline 中的 custom_recognition 字段匹配 + Recognition name, should match the custom_recognition field in Pipeline Returns: 装饰器函数 / Decorator function """ - def wrapper_recognition(recognition): - AgentServer.register_custom_recognition( - name=name, recognition=recognition() - ) + def wrapper_recognition( + recognition: type["CustomRecognition"], + ) -> type["CustomRecognition"]: + AgentServer.register_custom_recognition(name=name, recognition=recognition()) return recognition return wrapper_recognition - _custom_recognition_holder = {} + _custom_recognition_holder: dict[str, "CustomRecognition"] = {} @staticmethod def register_custom_recognition( - name: str, recognition: "CustomRecognition" # type: ignore + name: str, + recognition: "CustomRecognition", ) -> bool: """注册自定义识别器 / Register custom recognition @@ -72,26 +85,29 @@ def register_custom_recognition( ) @staticmethod - def custom_action(name: str): + def custom_action( + name: str, + ) -> Callable[[type["CustomAction"]], type["CustomAction"]]: """自定义动作装饰器 / Custom action decorator Args: - name: 动作名称,需与 Pipeline 中的 custom_action 字段匹配 / Action name, should match the custom_action field in Pipeline + name: 动作名称,需与 Pipeline 中的 custom_action 字段匹配 + Action name, should match the custom_action field in Pipeline Returns: 装饰器函数 / Decorator function """ - def wrapper_action(action): + def wrapper_action(action: type["CustomAction"]) -> type["CustomAction"]: AgentServer.register_custom_action(name=name, action=action()) return action return wrapper_action - _custom_action_holder = {} + _custom_action_holder: dict[str, "CustomAction"] = {} @staticmethod - def register_custom_action(name: str, action: "CustomAction") -> bool: # type: ignore + def register_custom_action(name: str, action: "CustomAction") -> bool: """注册自定义动作 / Register custom action Args: @@ -120,7 +136,8 @@ def start_up(identifier: str) -> bool: """启动 Agent 服务 / Start Agent service Args: - identifier: 连接标识符,用于与 AgentClient 匹配 / Connection identifier for matching with AgentClient + identifier: 连接标识符,用于与 AgentClient 匹配 + Connection identifier for matching with AgentClient Returns: bool: 是否成功 / Whether successful @@ -162,17 +179,19 @@ def detach() -> None: Library.agent_server().MaaAgentServerDetach() - _sink_holder: Dict[int, "EventSink"] = {} + _sink_holder: dict[int, "EventSink"] = {} @staticmethod - def resource_sink(): + def resource_sink() -> Callable[[type["ResourceEventSink"]], type["ResourceEventSink"]]: """资源事件监听器装饰器 / Resource event sink decorator Returns: 装饰器函数 / Decorator function """ - def wrapper_sink(sink): + def wrapper_sink( + sink: type["ResourceEventSink"], + ) -> type["ResourceEventSink"]: AgentServer.add_resource_sink(sink=sink()) return sink @@ -185,25 +204,23 @@ def add_resource_sink(sink: "ResourceEventSink") -> None: Args: sink: 资源事件监听器 / Resource event sink """ - sink_id = int( - Library.agent_server().MaaAgentServerAddResourceSink( - *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.agent_server().MaaAgentServerAddResourceSink(*EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None AgentServer._sink_holder[sink_id] = sink @staticmethod - def controller_sink(): + def controller_sink() -> Callable[[type["ControllerEventSink"]], type["ControllerEventSink"]]: """控制器事件监听器装饰器 / Controller event sink decorator Returns: 装饰器函数 / Decorator function """ - def wrapper_sink(sink): + def wrapper_sink( + sink: type["ControllerEventSink"], + ) -> type["ControllerEventSink"]: AgentServer.add_controller_sink(sink=sink()) return sink @@ -216,25 +233,21 @@ def add_controller_sink(sink: "ControllerEventSink") -> None: Args: sink: 控制器事件监听器 / Controller event sink """ - sink_id = int( - Library.agent_server().MaaAgentServerAddControllerSink( - *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.agent_server().MaaAgentServerAddControllerSink(*EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None AgentServer._sink_holder[sink_id] = sink @staticmethod - def tasker_sink(): + def tasker_sink() -> Callable[[type["TaskerEventSink"]], type["TaskerEventSink"]]: """任务器事件监听器装饰器 / Tasker event sink decorator Returns: 装饰器函数 / Decorator function """ - def wrapper_sink(sink): + def wrapper_sink(sink: type["TaskerEventSink"]) -> type["TaskerEventSink"]: AgentServer.add_tasker_sink(sink=sink()) return sink @@ -247,25 +260,21 @@ def add_tasker_sink(sink: "TaskerEventSink") -> None: Args: sink: 任务器事件监听器 / Tasker event sink """ - sink_id = int( - Library.agent_server().MaaAgentServerAddTaskerSink( - *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.agent_server().MaaAgentServerAddTaskerSink(*EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None AgentServer._sink_holder[sink_id] = sink @staticmethod - def context_sink(): + def context_sink() -> Callable[[type["ContextEventSink"]], type["ContextEventSink"]]: """上下文事件监听器装饰器 / Context event sink decorator Returns: 装饰器函数 / Decorator function """ - def wrapper_sink(sink): + def wrapper_sink(sink: type["ContextEventSink"]) -> type["ContextEventSink"]: AgentServer.add_context_sink(sink=sink()) return sink @@ -278,11 +287,7 @@ def add_context_sink(sink: "ContextEventSink") -> None: Args: sink: 上下文事件监听器 / Context event sink """ - sink_id = int( - Library.agent_server().MaaAgentServerAddContextSink( - *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.agent_server().MaaAgentServerAddContextSink(*EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None diff --git a/source/binding/Python/maa/agent_client.py b/source/binding/Python/maa/agent_client.py index 425bd04bac..fbbbd9c37d 100644 --- a/source/binding/Python/maa/agent_client.py +++ b/source/binding/Python/maa/agent_client.py @@ -1,12 +1,12 @@ import ctypes -from typing import List +from typing import Optional +from .buffer import StringBuffer, StringListBuffer +from .controller import Controller from .define import * from .library import Library from .resource import Resource -from .controller import Controller from .tasker import Tasker -from .buffer import StringBuffer, StringListBuffer class AgentClient: @@ -14,13 +14,19 @@ class AgentClient: 用于连接到 AgentServer,将自定义识别器和动作的执行委托给独立进程。 这允许将 MaaFW 本体与 Custom 逻辑分离至独立进程运行。 - Used to connect to AgentServer, delegating custom recognition and action execution to a separate process. + Used to connect to AgentServer, delegating custom recognition and action execution to a separate + process. This allows separating MaaFW core from Custom logic into independent processes. """ _handle: MaaAgentClientHandle - def __init__(self, identifier: Optional[str] = None, *, _handle: MaaAgentClientHandle = None): + def __init__( + self, + identifier: Optional[str] = None, + *, + _handle: Optional[MaaAgentClientHandle] = None, + ): """创建 Agent 客户端 / Create Agent client 默认使用 IPC 模式;如果 identifier 为纯数字字符串,则会将其视为 TCP 端口号并监听 127.0.0.1。 @@ -60,7 +66,8 @@ def create_tcp(cls, port: int = 0) -> "AgentClient": 客户端会监听 127.0.0.1 上的指定端口。如果传入 0 则自动选择可用端口。 AgentServer 端使用 identifier 属性获取的端口号作为 identifier 即可通过 TCP 连接。 - The client listens on 127.0.0.1 at the specified port. If 0 is passed, an available port is automatically selected. + The client listens on 127.0.0.1 at the specified port. If 0 is passed, an available port is + automatically selected. AgentServer can use the port number from the identifier property to connect via TCP. Args: @@ -74,9 +81,7 @@ def create_tcp(cls, port: int = 0) -> "AgentClient": ValueError: 如果端口号无效 """ if not (0 <= port <= 65535): - raise ValueError( - f"Invalid port number: {port}. Must be between 0 and 65535." - ) + raise ValueError(f"Invalid port number: {port}. Must be between 0 and 65535.") cls._set_api_properties() @@ -98,9 +103,7 @@ def identifier(self) -> Optional[str]: Optional[str]: 连接标识符,如果未设置则返回 None / Connection identifier, or None if not set """ id_buffer = StringBuffer() - if not Library.agent_client().MaaAgentClientIdentifier( - self._handle, id_buffer._handle - ): + if not Library.agent_client().MaaAgentClientIdentifier(self._handle, id_buffer._handle): return None return id_buffer.get() @@ -120,15 +123,9 @@ def bind(self, resource: Resource) -> bool: # avoid gc self._resource = resource - return bool( - Library.agent_client().MaaAgentClientBindResource( - self._handle, resource._handle - ) - ) + return bool(Library.agent_client().MaaAgentClientBindResource(self._handle, resource._handle)) - def register_sink( - self, resource: Resource, controller: Controller, tasker: Tasker - ) -> bool: + def register_sink(self, resource: Resource, controller: Controller, tasker: Tasker) -> bool: """注册事件监听器 / Register event sinks 将资源、控制器、任务器的事件转发给 AgentServer。 @@ -146,21 +143,9 @@ def register_sink( self._sinks = [resource, controller, tasker] return ( - bool( - Library.agent_client().MaaAgentClientRegisterResourceSink( - self._handle, resource._handle - ) - ) - and bool( - Library.agent_client().MaaAgentClientRegisterControllerSink( - self._handle, controller._handle - ) - ) - and bool( - Library.agent_client().MaaAgentClientRegisterTaskerSink( - self._handle, tasker._handle - ) - ) + bool(Library.agent_client().MaaAgentClientRegisterResourceSink(self._handle, resource._handle)) + and bool(Library.agent_client().MaaAgentClientRegisterControllerSink(self._handle, controller._handle)) + and bool(Library.agent_client().MaaAgentClientRegisterTaskerSink(self._handle, tasker._handle)) ) def connect(self) -> bool: @@ -206,14 +191,10 @@ def set_timeout(self, milliseconds: int) -> bool: Returns: bool: 是否成功 / Whether successful """ - return bool( - Library.agent_client().MaaAgentClientSetTimeout( - self._handle, ctypes.c_int64(milliseconds) - ) - ) + return bool(Library.agent_client().MaaAgentClientSetTimeout(self._handle, ctypes.c_int64(milliseconds))) @property - def custom_recognition_list(self) -> List[str]: + def custom_recognition_list(self) -> list[str]: """获取已注册的自定义识别器列表 / Get registered custom recognizer list Returns: @@ -223,14 +204,12 @@ def custom_recognition_list(self) -> List[str]: RuntimeError: 如果获取失败 """ buffer = StringListBuffer() - if not Library.agent_client().MaaAgentClientGetCustomRecognitionList( - self._handle, buffer._handle - ): + if not Library.agent_client().MaaAgentClientGetCustomRecognitionList(self._handle, buffer._handle): raise RuntimeError("Failed to get custom recognition list.") return buffer.get() @property - def custom_action_list(self) -> List[str]: + def custom_action_list(self) -> list[str]: """获取已注册的自定义操作列表 / Get registered custom action list Returns: @@ -240,9 +219,7 @@ def custom_action_list(self) -> List[str]: RuntimeError: 如果获取失败 """ buffer = StringListBuffer() - if not Library.agent_client().MaaAgentClientGetCustomActionList( - self._handle, buffer._handle - ): + if not Library.agent_client().MaaAgentClientGetCustomActionList(self._handle, buffer._handle): raise RuntimeError("Failed to get custom action list.") return buffer.get() diff --git a/source/binding/Python/maa/buffer.py b/source/binding/Python/maa/buffer.py index 414e098e59..27405688db 100644 --- a/source/binding/Python/maa/buffer.py +++ b/source/binding/Python/maa/buffer.py @@ -1,6 +1,6 @@ import copy import ctypes -from typing import List, Optional, Union +from typing import Optional, Union import numpy @@ -56,9 +56,7 @@ def set(self, value: Union[str, bytes]) -> bool: """ if isinstance(value, str): value = value.encode() - return bool( - Library.framework().MaaStringBufferSetEx(self._handle, value, len(value)) - ) + return bool(Library.framework().MaaStringBufferSetEx(self._handle, value, len(value))) @property def empty(self) -> bool: @@ -144,21 +142,21 @@ def __del__(self): if self._handle and self._own: Library.framework().MaaStringListBufferDestroy(self._handle) - def get(self) -> List[str]: + def get(self) -> list[str]: """获取字符串列表 / Get string list Returns: List[str]: 字符串列表 / String list """ count = Library.framework().MaaStringListBufferSize(self._handle) - result = [] + result: list[str] = [] for i in range(count): buff = Library.framework().MaaStringListBufferAt(self._handle, i) s = StringBuffer(buff).get() result.append(s) return result - def set(self, value: List[str]) -> bool: + def set(self, value: list[str]) -> bool: """设置字符串列表 / Set string list Args: @@ -184,9 +182,7 @@ def append(self, value: str) -> bool: """ buff = StringBuffer() buff.set(value) - return bool( - Library.framework().MaaStringListBufferAppend(self._handle, buff._handle) - ) + return bool(Library.framework().MaaStringListBufferAppend(self._handle, buff._handle)) def remove(self, index: int) -> bool: """移除指定索引的字符串 / Remove string at index @@ -215,30 +211,20 @@ def _set_api_properties(): return StringListBuffer._api_properties_initialized = True - Library.framework().MaaStringListBufferCreate.restype = ( - MaaStringListBufferHandle - ) + Library.framework().MaaStringListBufferCreate.restype = MaaStringListBufferHandle Library.framework().MaaStringListBufferCreate.argtypes = [] Library.framework().MaaStringListBufferDestroy.restype = None - Library.framework().MaaStringListBufferDestroy.argtypes = [ - MaaStringListBufferHandle - ] + Library.framework().MaaStringListBufferDestroy.argtypes = [MaaStringListBufferHandle] Library.framework().MaaStringListBufferIsEmpty.restype = MaaBool - Library.framework().MaaStringListBufferIsEmpty.argtypes = [ - MaaStringListBufferHandle - ] + Library.framework().MaaStringListBufferIsEmpty.argtypes = [MaaStringListBufferHandle] Library.framework().MaaStringListBufferClear.restype = MaaBool - Library.framework().MaaStringListBufferClear.argtypes = [ - MaaStringListBufferHandle - ] + Library.framework().MaaStringListBufferClear.argtypes = [MaaStringListBufferHandle] Library.framework().MaaStringListBufferSize.restype = MaaSize - Library.framework().MaaStringListBufferSize.argtypes = [ - MaaStringListBufferHandle - ] + Library.framework().MaaStringListBufferSize.argtypes = [MaaStringListBufferHandle] Library.framework().MaaStringListBufferAt.restype = MaaStringBufferHandle Library.framework().MaaStringListBufferAt.argtypes = [ @@ -290,7 +276,8 @@ def get(self) -> numpy.ndarray: """获取图像数据 / Get image data Returns: - numpy.ndarray: BGR 格式图像,形状为 (height, width, channels) / BGR format image with shape (height, width, channels) + numpy.ndarray: BGR 格式图像,形状为 (height, width, channels) + BGR format image with shape (height, width, channels) """ buff = Library.framework().MaaImageBufferGetRawData(self._handle) if not buff: @@ -300,16 +287,15 @@ def get(self) -> numpy.ndarray: h = Library.framework().MaaImageBufferHeight(self._handle) c = Library.framework().MaaImageBufferChannels(self._handle) return copy.deepcopy( - numpy.ctypeslib.as_array( - ctypes.cast(buff, ctypes.POINTER(ctypes.c_uint8)), shape=(h, w, c) - ) + numpy.ctypeslib.as_array(ctypes.cast(buff, ctypes.POINTER(ctypes.c_uint8)), shape=(h, w, c)) ) def set(self, value: numpy.ndarray) -> bool: """设置图像数据 / Set image data Args: - value: BGR 格式图像,形状为 (height, width, channels) / BGR format image with shape (height, width, channels) + value: BGR 格式图像,形状为 (height, width, channels) + BGR format image with shape (height, width, channels) Returns: bool: 是否成功 / Whether successful @@ -317,11 +303,11 @@ def set(self, value: numpy.ndarray) -> bool: Raises: TypeError: 如果 value 不是 numpy.ndarray """ - if not isinstance(value, numpy.ndarray): + if not isinstance(value, numpy.ndarray): # pyright: ignore[reportUnnecessaryIsInstance] raise TypeError("value must be a numpy.ndarray") # 确保数组是 C-contiguous 的,避免切片视图导致的内存不连续问题 - if not value.flags['C_CONTIGUOUS']: + if not value.flags["C_CONTIGUOUS"]: value = numpy.ascontiguousarray(value) return bool( @@ -344,9 +330,7 @@ def resize(self, width: int = 0, height: int = 0) -> bool: Returns: bool: 是否成功 / Whether successful """ - return bool( - Library.framework().MaaImageBufferResize(self._handle, width, height) - ) + return bool(Library.framework().MaaImageBufferResize(self._handle, width, height)) @property def empty(self) -> bool: @@ -444,21 +428,21 @@ def __del__(self): if self._handle and self._own: Library.framework().MaaImageListBufferDestroy(self._handle) - def get(self) -> List[numpy.ndarray]: + def get(self) -> list[numpy.ndarray]: """获取图像列表 / Get image list Returns: List[numpy.ndarray]: 图像列表 / Image list """ count = Library.framework().MaaImageListBufferSize(self._handle) - result = [] + result: list[numpy.ndarray] = [] for i in range(count): buff = Library.framework().MaaImageListBufferAt(self._handle, i) img = ImageBuffer(buff).get() result.append(img) return result - def set(self, value: List[numpy.ndarray]) -> bool: + def set(self, value: list[numpy.ndarray]) -> bool: """设置图像列表 / Set image list Args: @@ -484,9 +468,7 @@ def append(self, value: numpy.ndarray) -> bool: """ buff = ImageBuffer() buff.set(value) - return bool( - Library.framework().MaaImageListBufferAppend(self._handle, buff._handle) - ) + return bool(Library.framework().MaaImageListBufferAppend(self._handle, buff._handle)) def remove(self, index: int) -> bool: """移除指定索引的图像 / Remove image at index @@ -519,19 +501,13 @@ def _set_api_properties(): Library.framework().MaaImageListBufferCreate.argtypes = [] Library.framework().MaaImageListBufferDestroy.restype = None - Library.framework().MaaImageListBufferDestroy.argtypes = [ - MaaImageListBufferHandle - ] + Library.framework().MaaImageListBufferDestroy.argtypes = [MaaImageListBufferHandle] Library.framework().MaaImageListBufferIsEmpty.restype = MaaBool - Library.framework().MaaImageListBufferIsEmpty.argtypes = [ - MaaImageListBufferHandle - ] + Library.framework().MaaImageListBufferIsEmpty.argtypes = [MaaImageListBufferHandle] Library.framework().MaaImageListBufferClear.restype = MaaBool - Library.framework().MaaImageListBufferClear.argtypes = [ - MaaImageListBufferHandle - ] + Library.framework().MaaImageListBufferClear.argtypes = [MaaImageListBufferHandle] Library.framework().MaaImageListBufferSize.restype = MaaSize Library.framework().MaaImageListBufferSize.argtypes = [MaaImageListBufferHandle] @@ -599,7 +575,8 @@ def set(self, value: RectType) -> bool: """设置矩形数据 / Set rectangle data Args: - value: 矩形数据,可以是 Rect、tuple、list 或 numpy.ndarray / Rectangle data, can be Rect, tuple, list, or numpy.ndarray + value: 矩形数据,可以是 Rect、tuple、list 或 numpy.ndarray + Rectangle data, can be Rect, tuple, list, or numpy.ndarray Returns: bool: 是否成功 / Whether successful @@ -608,7 +585,7 @@ def set(self, value: RectType) -> bool: ValueError: 如果数据格式不正确 TypeError: 如果类型不支持 """ - if isinstance(value, numpy.ndarray): + if isinstance(value, numpy.ndarray): # pyright: ignore[reportUnnecessaryIsInstance] if value.ndim != 1: raise ValueError("value must be a 1D array") if value.shape[0] != 4: @@ -619,16 +596,12 @@ def set(self, value: RectType) -> bool: if len(value) != 4: raise ValueError("value must have 4 elements") value = numpy.array(value, dtype=numpy.int32) - elif isinstance(value, Rect): + elif isinstance(value, Rect): # pyright: ignore[reportUnnecessaryIsInstance] pass else: raise TypeError("value must be a Rect, numpy.ndarray, tuple or list") - return bool( - Library.framework().MaaRectSet( - self._handle, value[0], value[1], value[2], value[3] - ) - ) + return bool(Library.framework().MaaRectSet(self._handle, value[0], value[1], value[2], value[3])) _api_properties_initialized: bool = False diff --git a/source/binding/Python/maa/context.py b/source/binding/Python/maa/context.py index 5e97596b69..53f00406f0 100644 --- a/source/binding/Python/maa/context.py +++ b/source/binding/Python/maa/context.py @@ -2,26 +2,26 @@ import dataclasses import json from dataclasses import dataclass -from typing import Any, Dict, Optional, Tuple +from typing import Any, Optional import numpy -from .event_sink import EventSink, NotificationType from .buffer import ImageBuffer, RectBuffer, StringBuffer, StringListBuffer from .define import * -from .library import Library -from .tasker import Tasker +from .event_sink import EventSink, NotificationType from .job import TaskJob +from .library import Library from .pipeline import ( + JActionParam, + JActionType, + JNodeAttr, JPipelineData, JPipelineParser, - JNodeAttr, - JRecognitionType, - JActionType, JRecognitionParam, - JActionParam, + JRecognitionType, JWaitFreezes, ) +from .tasker import Tasker class Context: @@ -42,9 +42,7 @@ def __init__(self, handle: MaaContextHandle): def __del__(self): pass - def run_task( - self, entry: str, pipeline_override: Dict = {} - ) -> Optional[TaskDetail]: + def run_task(self, entry: str, pipeline_override: Optional[dict[str, Any]] = None) -> Optional[TaskDetail]: """同步执行任务 / Synchronously execute task Args: @@ -54,10 +52,10 @@ def run_task( Returns: Optional[TaskDetail]: 任务详情,执行失败则返回 None / Task detail, or None if execution failed """ + if pipeline_override is None: + pipeline_override = {} task_id = int( - Library.framework().MaaContextRunTask( - self._handle, *Context._gen_post_param(entry, pipeline_override) - ) + Library.framework().MaaContextRunTask(self._handle, *Context._gen_post_param(entry, pipeline_override)) ) if not task_id: return None @@ -68,7 +66,7 @@ def run_recognition( self, entry: str, image: numpy.ndarray, - pipeline_override: Dict = {}, + pipeline_override: Optional[dict[str, Any]] = None, ) -> Optional[RecognitionDetail]: """同步执行识别逻辑 / Synchronously execute recognition logic @@ -82,18 +80,23 @@ def run_recognition( Returns: Optional[RecognitionDetail]: 识别结果。无论是否命中,只要尝试进行了识别,就会返回; - 请通过 RecognitionDetail.hit 判断是否命中。只在未能启动识别流程时(如 entry 不存在、node disabled、image 为空等),才可能返回 None。 + 请通过 RecognitionDetail.hit 判断是否命中。只在未能启动识别流程时(如 entry 不存在、node disabled、image + 为空等),才可能返回 + None。 Recognition detail. It always returns as long as recognition was attempted; - use RecognitionDetail.hit to determine hit. Only return None if the recognition process fails to start + use RecognitionDetail.hit to determine hit. Only return None if the recognition process + fails to start (e.g., entry does not exist, node is disabled, image is empty). """ + if pipeline_override is None: + pipeline_override = {} image_buffer = ImageBuffer() image_buffer.set(image) reco_id = int( Library.framework().MaaContextRunRecognition( self._handle, *Context._gen_post_param(entry, pipeline_override), - image_buffer._handle + image_buffer._handle, ) ) if not reco_id: @@ -106,7 +109,7 @@ def run_action( entry: str, box: RectType = (0, 0, 0, 0), reco_detail: str = "", - pipeline_override: Dict = {}, + pipeline_override: Optional[dict[str, Any]] = None, ) -> Optional[ActionDetail]: """同步执行操作逻辑 / Synchronously execute action logic @@ -121,11 +124,15 @@ def run_action( Returns: Optional[ActionDetail]: 操作结果。无论动作是否成功,只要尝试执行了动作,就会返回; - 请通过 ActionDetail.success 判断是否执行成功。只在未能启动动作流程时(如 entry 不存在、node disabled 等),才可能返回 None。 + 请通过 ActionDetail.success 判断是否执行成功。只在未能启动动作流程时(如 entry 不存在、node disabled + 等),才可能返回 None。 Action detail. It always returns as long as the action was attempted; - use ActionDetail.success to determine success. Only return None if the action flow fails to start + use ActionDetail.success to determine success. Only return None if the action flow fails + to start (e.g., entry does not exist, node is disabled, etc.). """ + if pipeline_override is None: + pipeline_override = {} rect = RectBuffer() rect.set(box) @@ -134,7 +141,7 @@ def run_action( self._handle, *Context._gen_post_param(entry, pipeline_override), rect._handle, - reco_detail.encode() + reco_detail.encode(), ) ) @@ -163,7 +170,8 @@ def run_recognition_direct( Optional[RecognitionDetail]: 识别结果。无论是否命中,只要尝试进行了识别,就会返回; 请通过 RecognitionDetail.hit 判断是否命中。只在未能启动识别流程时才可能返回 None。 Recognition detail. It always returns as long as recognition was attempted; - use RecognitionDetail.hit to determine hit. Only return None if the recognition process fails to start. + use RecognitionDetail.hit to determine hit. Only return None if the recognition process + fails to start. """ img_buffer = ImageBuffer() img_buffer.set(image) @@ -203,13 +211,12 @@ def run_action_direct( Optional[ActionDetail]: 操作结果。无论动作是否成功,只要尝试执行了动作,就会返回; 请通过 ActionDetail.success 判断是否执行成功。只在未能启动动作流程时才可能返回 None。 Action detail. It always returns as long as the action was attempted; - use ActionDetail.success to determine success. Only return None if the action flow fails to start. + use ActionDetail.success to determine success. Only return None if the action flow fails + to start. """ rect_buffer = RectBuffer() rect_buffer.set(box) - action_param_json = json.dumps( - dataclasses.asdict(action_param), ensure_ascii=False - ) + action_param_json = json.dumps(dataclasses.asdict(action_param), ensure_ascii=False) act_id = int( Library.framework().MaaContextRunActionDirect( self._handle, @@ -224,7 +231,7 @@ def run_action_direct( return self.tasker.get_action_detail(act_id) - def override_pipeline(self, pipeline_override: Dict) -> bool: + def override_pipeline(self, pipeline_override: dict[str, Any]) -> bool: """覆盖 pipeline / Override pipeline_override Args: @@ -242,7 +249,7 @@ def override_pipeline(self, pipeline_override: Dict) -> bool: ) ) - def override_next(self, name: str, next_list: List[str]) -> bool: + def override_next(self, name: str, next_list: list[str]) -> bool: """覆盖任务的 next 列表 / Override the next list of task 如果节点不存在,此方法会失败 @@ -258,11 +265,7 @@ def override_next(self, name: str, next_list: List[str]) -> bool: list_buffer = StringListBuffer() list_buffer.set(next_list) - return bool( - Library.framework().MaaContextOverrideNext( - self._handle, name.encode(), list_buffer._handle - ) - ) + return bool(Library.framework().MaaContextOverrideNext(self._handle, name.encode(), list_buffer._handle)) def override_image(self, image_name: str, image: numpy.ndarray) -> bool: """覆盖图片 / Override the image corresponding to image_name @@ -278,12 +281,10 @@ def override_image(self, image_name: str, image: numpy.ndarray) -> bool: image_buffer.set(image) return bool( - Library.framework().MaaContextOverrideImage( - self._handle, image_name.encode(), image_buffer._handle - ) + Library.framework().MaaContextOverrideImage(self._handle, image_name.encode(), image_buffer._handle) ) - def get_node_data(self, name: str) -> Optional[Dict]: + def get_node_data(self, name: str) -> Optional[dict[str, Any]]: """获取任务当前的定义 / Get the current definition of task Args: @@ -293,9 +294,7 @@ def get_node_data(self, name: str) -> Optional[Dict]: Optional[Dict]: 任务定义字典,如果不存在则返回 None / Task definition dict, or None if not exists """ string_buffer = StringBuffer() - if not Library.framework().MaaContextGetNodeData( - self._handle, name.encode(), string_buffer._handle - ): + if not Library.framework().MaaContextGetNodeData(self._handle, name.encode(), string_buffer._handle): return None data = string_buffer.get() @@ -314,7 +313,8 @@ def get_node_object(self, name: str) -> Optional[JPipelineData]: name: 任务名 / Task name Returns: - Optional[JPipelineData]: 任务定义对象,如果不存在则返回 None / Task definition object, or None if not exists + Optional[JPipelineData]: 任务定义对象,如果不存在则返回 None + Task definition object, or None if not exists """ node_data = self.get_node_data(name) @@ -372,11 +372,7 @@ def set_anchor(self, anchor_name: str, node_name: str) -> bool: Returns: bool: 是否成功 / Whether successful """ - return bool( - Library.framework().MaaContextSetAnchor( - self._handle, anchor_name.encode(), node_name.encode() - ) - ) + return bool(Library.framework().MaaContextSetAnchor(self._handle, anchor_name.encode(), node_name.encode())) def get_anchor(self, anchor_name: str) -> Optional[str]: """获取锚点对应的节点名 / Get node name for anchor @@ -388,9 +384,7 @@ def get_anchor(self, anchor_name: str) -> Optional[str]: Optional[str]: 节点名称,如果不存在则返回 None / Node name, or None if not exists """ string_buffer = StringBuffer() - if not Library.framework().MaaContextGetAnchor( - self._handle, anchor_name.encode(), string_buffer._handle - ): + if not Library.framework().MaaContextGetAnchor(self._handle, anchor_name.encode(), string_buffer._handle): return None return string_buffer.get() @@ -405,9 +399,7 @@ def get_hit_count(self, node_name: str) -> int: int: 命中计数 / Hit count """ count = ctypes.c_uint64() - if not Library.framework().MaaContextGetHitCount( - self._handle, node_name.encode(), ctypes.byref(count) - ): + if not Library.framework().MaaContextGetHitCount(self._handle, node_name.encode(), ctypes.byref(count)): return 0 return count.value @@ -420,31 +412,32 @@ def clear_hit_count(self, node_name: str) -> bool: Returns: bool: 是否成功 / Whether successful """ - return bool( - Library.framework().MaaContextClearHitCount( - self._handle, node_name.encode() - ) - ) + return bool(Library.framework().MaaContextClearHitCount(self._handle, node_name.encode())) def wait_freezes( self, time: int = 0, - box: Optional[Tuple[int, int, int, int]] = None, + box: Optional[tuple[int, int, int, int]] = None, wait_freezes_param: Optional[JWaitFreezes] = None, ) -> bool: """等待画面静止 / Wait for screen to stabilize (freeze) Args: time: 等待时间(毫秒) / Wait time in milliseconds - box: 识别命中的区域 (x, y, w, h),用于 target 为 Self 时计算 ROI / Recognition hit box, used when target is Self to calculate ROI - wait_freezes_param: 等待参数,使用 JWaitFreezes。支持 time, target, target_offset, threshold, method, rate_limit, timeout - / Wait parameters, use JWaitFreezes. Supports time, target, target_offset, threshold, method, rate_limit, timeout + box: 识别命中的区域 (x, y, w, h),用于 target 为 Self 时计算 ROI + Recognition hit box, used when target is Self to calculate ROI + wait_freezes_param: 等待参数,使用 JWaitFreezes。支持 time, target, target_offset, threshold, + method, rate_limit, timeout + + Wait parameters, use JWaitFreezes. Supports time, target, + target_offset, threshold, method, rate_limit, timeout Returns: bool: 是否成功 / Whether successful Note: - - time 和 wait_freezes_param.time 互斥,不能同时为非零或同时为零 / time and wait_freezes_param.time are mutually exclusive + - time 和 wait_freezes_param.time 互斥,不能同时为非零或同时为零 + time and wait_freezes_param.time are mutually exclusive """ rect_buffer = None if box: @@ -475,7 +468,7 @@ def _init_tasker(self): self._tasker = Tasker(handle=tasker_handle) @staticmethod - def _gen_post_param(entry: str, pipeline_override: Dict) -> Tuple[bytes, bytes]: + def _gen_post_param(entry: str, pipeline_override: dict[str, Any]) -> tuple[bytes, bytes]: pipeline_json = json.dumps(pipeline_override, ensure_ascii=False) return ( @@ -618,8 +611,8 @@ class NodeWaitFreezesDetail: wf_id: int name: str phase: str - roi: Tuple[int, int, int, int] - param: Dict[str, Any] + roi: tuple[int, int, int, int] + param: dict[str, Any] reco_ids: list[int] elapsed: Optional[int] focus: Any @@ -670,9 +663,7 @@ class NodeActionDetail: name: str focus: Any - def on_node_action( - self, context: Context, noti_type: NotificationType, detail: NodeActionDetail - ): + def on_node_action(self, context: Context, noti_type: NotificationType, detail: NodeActionDetail): pass @dataclass @@ -720,11 +711,10 @@ def on_node_action_node( ): pass - def on_raw_notification(self, context: Context, msg: str, details: dict): + def on_raw_notification(self, context: Context, msg: str, details: dict[str, Any]) -> None: pass - def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict): - + def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict[str, Any]) -> None: context = Context(handle=handle) self.on_raw_notification(context, msg, details) diff --git a/source/binding/Python/maa/controller.py b/source/binding/Python/maa/controller.py index a563984d00..4d228bd551 100644 --- a/source/binding/Python/maa/controller.py +++ b/source/binding/Python/maa/controller.py @@ -1,14 +1,18 @@ +import ctypes import json import os -import numpy from abc import abstractmethod +from collections.abc import Sequence from ctypes import c_int32 +from dataclasses import dataclass from pathlib import Path -from typing import Any, Dict, Optional, Sequence, Tuple, Union +from typing import Any, Optional, Union + +import numpy from .buffer import ImageBuffer, StringBuffer -from .event_sink import EventSink, NotificationType from .define import * +from .event_sink import EventSink, NotificationType from .job import Job, JobWithResult from .library import Library @@ -68,15 +72,15 @@ def post_click(self, x: int, y: int, contact: int = 0, pressure: int = 1) -> Job Args: x: x 坐标 / x coordinate y: y 坐标 / y coordinate - contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) / Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, 1:right, 2:middle) + contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) + Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, + 1:right, 2:middle) pressure: 触点力度 / Contact pressure Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait """ - ctrl_id = Library.framework().MaaControllerPostClickV2( - self._handle, x, y, contact, pressure - ) + ctrl_id = Library.framework().MaaControllerPostClickV2(self._handle, x, y, contact, pressure) return self._gen_ctrl_job(ctrl_id) def post_swipe( @@ -97,7 +101,9 @@ def post_swipe( x2: 终点 x 坐标 / End x coordinate y2: 终点 y 坐标 / End y coordinate duration: 滑动时长(毫秒) / Swipe duration in milliseconds - contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) / Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, 1:right, 2:middle) + contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) + Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, + 1:right, 2:middle) pressure: 触点力度 / Contact pressure Returns: @@ -159,23 +165,20 @@ def post_input_text(self, text: str) -> Job: Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostInputText( - self._handle, text.encode() - ) + ctrl_id = Library.framework().MaaControllerPostInputText(self._handle, text.encode()) return self._gen_ctrl_job(ctrl_id) def post_start_app(self, intent: str) -> Job: """启动应用 / Start app Args: - intent: 目标应用 (Adb 控制器: package name 或 activity) / Target app (Adb controller: package name or activity) + intent: 目标应用 (Adb 控制器: package name 或 activity) + Target app (Adb controller: package name or activity) Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostStartApp( - self._handle, intent.encode() - ) + ctrl_id = Library.framework().MaaControllerPostStartApp(self._handle, intent.encode()) return self._gen_ctrl_job(ctrl_id) def post_stop_app(self, intent: str) -> Job: @@ -187,54 +190,50 @@ def post_stop_app(self, intent: str) -> Job: Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostStopApp( - self._handle, intent.encode() - ) + ctrl_id = Library.framework().MaaControllerPostStopApp(self._handle, intent.encode()) return self._gen_ctrl_job(ctrl_id) - def post_touch_down( - self, x: int, y: int, contact: int = 0, pressure: int = 1 - ) -> Job: + def post_touch_down(self, x: int, y: int, contact: int = 0, pressure: int = 1) -> Job: """按下 / Touch down Args: x: x 坐标 / x coordinate y: y 坐标 / y coordinate - contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) / Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, 1:right, 2:middle) + contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) + Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, + 1:right, 2:middle) pressure: 触点力度 / Contact pressure Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostTouchDown( - self._handle, contact, x, y, pressure - ) + ctrl_id = Library.framework().MaaControllerPostTouchDown(self._handle, contact, x, y, pressure) return self._gen_ctrl_job(ctrl_id) - def post_touch_move( - self, x: int, y: int, contact: int = 0, pressure: int = 1 - ) -> Job: + def post_touch_move(self, x: int, y: int, contact: int = 0, pressure: int = 1) -> Job: """移动 / Move Args: x: x 坐标 / x coordinate y: y 坐标 / y coordinate - contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) / Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, 1:right, 2:middle) + contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) + Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, + 1:right, 2:middle) pressure: 触点力度 / Contact pressure Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostTouchMove( - self._handle, contact, x, y, pressure - ) + ctrl_id = Library.framework().MaaControllerPostTouchMove(self._handle, contact, x, y, pressure) return self._gen_ctrl_job(ctrl_id) def post_touch_up(self, contact: int = 0) -> Job: """抬起 / Touch up Args: - contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) / Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, 1:right, 2:middle) + contact: 触点编号 (Adb 控制器: 手指编号; Win32 控制器: 鼠标按键 0:左键, 1:右键, 2:中键) + Contact number (Adb controller: finger number; Win32 controller: mouse button 0:left, + 1:right, 2:middle) Returns: Job: 作业对象 / Job object @@ -243,7 +242,8 @@ def post_touch_up(self, contact: int = 0) -> Job: return self._gen_ctrl_job(ctrl_id) def post_relative_move(self, dx: int, dy: int) -> Job: - """异步执行一次相对位移 (当前仅 Win32 controller 支持) / Asynchronously execute a relative movement (Currently only supported by Win32 controller) + """异步执行一次相对位移 (当前仅 Win32 controller 支持) + Asynchronously execute a relative movement (Currently only supported by Win32 controller) Args: dx: x 方向移动偏移 / x axis offset @@ -252,9 +252,7 @@ def post_relative_move(self, dx: int, dy: int) -> Job: Returns: Job: 作业对象 / Job object """ - ctrl_id = Library.framework().MaaControllerPostRelativeMove( - self._handle, dx, dy - ) + ctrl_id = Library.framework().MaaControllerPostRelativeMove(self._handle, dx, dy) return self._gen_ctrl_job(ctrl_id) def set_mouse_lock_follow(self, enabled: bool) -> bool: @@ -271,7 +269,10 @@ def set_mouse_lock_follow(self, enabled: bool) -> bool: """ c_enabled = ctypes.c_bool(enabled) return Library.framework().MaaControllerSetOption( - self._handle, MaaCtrlOptionEnum.MouseLockFollow, ctypes.byref(c_enabled), ctypes.sizeof(c_enabled) + self._handle, + MaaCtrlOptionEnum.MouseLockFollow, + ctypes.byref(c_enabled), + ctypes.sizeof(c_enabled), ) def set_background_managed_keys(self, keys: Sequence[int]) -> bool: @@ -287,20 +288,22 @@ def post_scroll(self, dx: int, dy: int) -> Job: """滚动 / Scroll Args: - dx: 水平滚动距离,正值向右滚动,负值向左滚动 / Horizontal scroll distance, positive for right, negative for left + dx: 水平滚动距离,正值向右滚动,负值向左滚动 + Horizontal scroll distance, positive for right, negative for left dy: 垂直滚动距离,正值向上滚动,负值向下滚动 / Vertical scroll distance, positive for up, negative for down Returns: Job: 作业对象 / Job object Note: - Win32 控制器和实现了 scroll 的自定义控制器支持滚动操作 / Win32 controllers and custom controllers that implement scroll support this operation + Win32 控制器和实现了 scroll 的自定义控制器支持滚动操作 + Win32 controllers and custom controllers that implement scroll support this operation 建议使用 120 的整数倍(WHEEL_DELTA)以获得最佳兼容性 / Using multiples of 120 (WHEEL_DELTA) is recommended """ ctrl_id = Library.framework().MaaControllerPostScroll(self._handle, dx, dy) return self._gen_ctrl_job(ctrl_id) - def post_screencap(self) -> JobWithResult: + def post_screencap(self) -> JobWithResult[numpy.ndarray]: """截图 / Screenshot Returns: @@ -328,30 +331,28 @@ def cached_image(self) -> numpy.ndarray: 返回的图像是经过缩放的,尺寸根据截图目标尺寸设置(长边/短边)而定,可能与设备原始分辨率不同。 使用 resolution 属性可获取设备的原始(未缩放)分辨率。 - The returned image is scaled according to the screenshot target size settings (long side / short side). + The returned image is scaled according to the screenshot target size settings (long side + short side). The image dimensions may differ from the raw device resolution. Use the resolution property to get the raw (unscaled) device resolution. """ image_buffer = ImageBuffer() - if not Library.framework().MaaControllerCachedImage( - self._handle, image_buffer._handle - ): + if not Library.framework().MaaControllerCachedImage(self._handle, image_buffer._handle): raise RuntimeError("Failed to get cached image.") return image_buffer.get() - def post_shell(self, cmd: str, timeout: int = 20000) -> JobWithResult: + def post_shell(self, cmd: str, timeout: int = 20000) -> JobWithResult[str]: """执行 shell 命令 (仅 ADB 控制器) / Execute shell command (ADB only) Args: cmd: shell 命令 / shell command - timeout: 超时时间(毫秒),默认 20000,设置为 -1 表示无限等待 / Timeout in milliseconds, default 20000, set to -1 for infinite wait + timeout: 超时时间(毫秒),默认 20000,设置为 -1 表示无限等待 + Timeout in milliseconds, default 20000, set to -1 for infinite wait Returns: JobWithResult: 作业对象,可通过 result 获取命令输出 / Job object, can get output via result """ - ctrl_id = Library.framework().MaaControllerPostShell( - self._handle, cmd.encode("utf-8"), timeout - ) + ctrl_id = Library.framework().MaaControllerPostShell(self._handle, cmd.encode("utf-8"), timeout) return JobWithResult( ctrl_id, self._status, @@ -365,7 +366,8 @@ def post_inactive(self) -> Job: 对于 Win32 控制器,这会恢复窗口位置(取消置顶)并解除输入阻断。 对于其他控制器,这是一个空操作,总是成功。 - For Win32 controllers, this restores window position (removes topmost) and unblocks user input. + For Win32 controllers, this restores window position (removes topmost) and unblocks user + input. For other controllers, this is a no-op that always succeeds. Returns: @@ -385,9 +387,7 @@ def shell_output(self) -> str: RuntimeError: 如果获取失败 """ string_buffer = StringBuffer() - if not Library.framework().MaaControllerGetShellOutput( - self._handle, string_buffer._handle - ): + if not Library.framework().MaaControllerGetShellOutput(self._handle, string_buffer._handle): raise RuntimeError("Failed to get shell output.") return string_buffer.get() @@ -416,24 +416,23 @@ def uuid(self) -> str: return buffer.get() @property - def info(self) -> Dict[str, Any]: + def info(self) -> dict[str, Any]: """获取控制器信息 / Get controller information Returns: - Dict[str, Any]: 控制器信息,包含类型、构造参数等 / Controller information including type, constructor parameters, etc. + Dict[str, Any]: 控制器信息,包含类型、构造参数等 + Controller information including type, constructor parameters, etc. Raises: RuntimeError: 如果获取失败 """ buffer = StringBuffer() - if not Library.framework().MaaControllerGetInfo( - self._handle, buffer._handle - ): + if not Library.framework().MaaControllerGetInfo(self._handle, buffer._handle): raise RuntimeError("Failed to get controller info.") return json.loads(buffer.get()) @property - def resolution(self) -> Tuple[int, int]: + def resolution(self) -> tuple[int, int]: """获取设备原始(未缩放)分辨率 / Get the raw (unscaled) device resolution Returns: @@ -445,15 +444,15 @@ def resolution(self) -> Tuple[int, int]: 需要在首次截图后才能获取到有效值,否则返回 (0, 0)。 This returns the actual device screen resolution before any scaling. - The screenshot obtained via cached_image is scaled according to the screenshot target size settings, + The screenshot obtained via cached_image is scaled according to the screenshot target + size settings, so its dimensions may differ from this raw resolution. - Valid values are only available after the first screenshot is taken, otherwise returns (0, 0). + Valid values are only available after the first screenshot is taken, otherwise returns + (0, 0). """ width = ctypes.c_int32() height = ctypes.c_int32() - if not Library.framework().MaaControllerGetResolution( - self._handle, ctypes.byref(width), ctypes.byref(height) - ): + if not Library.framework().MaaControllerGetResolution(self._handle, ctypes.byref(width), ctypes.byref(height)): return (0, 0) return (width.value, height.value) @@ -539,7 +538,7 @@ def set_screenshot_resize_method(self, method: int) -> bool: ) ) - _sink_holder: Dict[int, "ControllerEventSink"] = {} + _sink_holder: dict[int, "ControllerEventSink"] = {} def add_sink(self, sink: "ControllerEventSink") -> Optional[int]: """添加控制器事件监听器 / Add controller event listener @@ -550,11 +549,7 @@ def add_sink(self, sink: "ControllerEventSink") -> Optional[int]: Returns: Optional[int]: 监听器 id,失败返回 None / Listener id, or None if failed """ - sink_id = int( - Library.framework().MaaControllerAddSink( - self._handle, *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.framework().MaaControllerAddSink(self._handle, *EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None @@ -812,7 +807,7 @@ def __init__( address: str, screencap_methods: int = MaaAdbScreencapMethodEnum.Default, input_methods: int = MaaAdbInputMethodEnum.Default, - config: Dict[str, Any] = {}, + config: Optional[dict[str, Any]] = None, agent_path: Union[str, Path] = AGENT_BINARY_PATH, ): """创建 Adb 控制器 / Create Adb controller @@ -828,6 +823,8 @@ def __init__( Raises: RuntimeError: 如果创建失败 """ + if config is None: + config = {} super().__init__() self._set_adb_api_properties() @@ -844,7 +841,6 @@ def __init__( raise RuntimeError("Failed to create ADB controller.") def _set_adb_api_properties(self): - Library.framework().MaaAdbControllerCreate.restype = MaaControllerHandle Library.framework().MaaAdbControllerCreate.argtypes = [ ctypes.c_char_p, @@ -945,7 +941,7 @@ class AndroidNativeController(Controller): def __init__( self, - config: Dict[str, Any], + config: dict[str, Any], ): """创建 Android Native 控制器 / Create Android native controller @@ -1028,9 +1024,9 @@ class WlRootsController(Controller): """ def __init__( - self, - wlr_socket_path: str, - use_win32_vk_code: bool = False, + self, + wlr_socket_path: str, + use_win32_vk_code: bool = False, ): """创建 WlRoots 控制器 / Create WlRoots controller @@ -1064,7 +1060,9 @@ def _set_wlroots_api_properties(self): class DbgController(Controller): - """调试控制器,轮播图片截图,基本输入操作直接返回成功 / Debug controller that cycles through images from a directory""" + """调试控制器,轮播图片截图,基本输入操作直接返回成功 + + Debug controller that cycles through images from a directory""" def __init__( self, @@ -1074,7 +1072,9 @@ def __init__( Args: read_path: 图片目录(或单个图片文件)路径。连接时加载所有图片,截图时轮播。 - / Path to a directory of images (or a single image file). Images are loaded on connect and cycled on screencap. + + Path to a directory of images (or a single image file). Images are loaded on + connect and cycled on screencap. Raises: RuntimeError: 如果创建失败 @@ -1107,7 +1107,9 @@ def __init__( Args: recording_path: 录制 JSONL 文件路径,由 RecordController 写入。截图路径基于该文件所在目录解析。 - / Path to the recording JSONL file written by RecordController. Screenshot paths are resolved relative to this file's parent directory. + + Path to the recording JSONL file written by RecordController. Screenshot + paths are resolved relative to this file's parent directory. Raises: RuntimeError: 如果创建失败 @@ -1130,7 +1132,8 @@ def _set_replay_api_properties(self): class RecordController(Controller): - """录制控制器,包装现有控制器并记录所有操作 / Record controller that wraps an existing controller and records all operations""" + """录制控制器,包装现有控制器并记录所有操作 + Record controller that wraps an existing controller and records all operations""" def __init__( self, @@ -1142,7 +1145,9 @@ def __init__( Args: inner: 被包装的内部控制器 / The inner controller to wrap recording_path: 录制 JSONL 文件输出路径。截图会保存到同目录下的 "{stem}-Screenshot" 文件夹。 - / Path to the recording JSONL file to write. Screenshots are saved to a "{stem}-Screenshot" folder in the same directory. + + Path to the recording JSONL file to write. Screenshots are saved to a + "{stem}-Screenshot" folder in the same directory. Raises: RuntimeError: 如果创建失败 @@ -1171,7 +1176,8 @@ class GamepadController(Controller): """虚拟手柄控制器 (仅 Windows) / Virtual gamepad controller (Windows only) 通过 ViGEm 模拟 Xbox 360 或 DualShock 4 手柄,用于控制需要手柄输入的游戏。 - Emulates Xbox 360 or DualShock 4 gamepad via ViGEm for controlling games that require gamepad input. + Emulates Xbox 360 or DualShock 4 gamepad via ViGEm for controlling games that require gamepad + input. 需要安装 ViGEm Bus Driver: https://github.com/ViGEm/ViGEmBus/releases Requires ViGEm Bus Driver: https://github.com/ViGEm/ViGEmBus/releases @@ -1194,9 +1200,12 @@ def __init__( """创建虚拟手柄控制器 / Create virtual gamepad controller Args: - hWnd: 窗口句柄,用于截图 (可为 None,不需要截图时) / Window handle for screencap (can be None if screencap not needed) - gamepad_type: 手柄类型 (MaaGamepadTypeEnum.Xbox360 或 MaaGamepadTypeEnum.DualShock4) / Gamepad type - screencap_method: 截图方式 (当 hWnd 不为 None 时使用) / Screencap method (used when hWnd is not None) + hWnd: 窗口句柄,用于截图 (可为 None,不需要截图时) + Window handle for screencap (can be None if screencap not needed) + gamepad_type: 手柄类型 (MaaGamepadTypeEnum.Xbox360 或 MaaGamepadTypeEnum.DualShock4) + Gamepad type + screencap_method: 截图方式 (当 hWnd 不为 None 时使用) + Screencap method (used when hWnd is not None) Raises: RuntimeError: 如果创建失败 @@ -1223,7 +1232,6 @@ def _set_gamepad_api_properties(self): class CustomController(Controller): - _callbacks: MaaCustomControllerCallbacks def __init__(self): @@ -1263,7 +1271,7 @@ def __init__(self): raise RuntimeError("Failed to create Custom controller.") @property - def c_handle(self) -> ctypes.POINTER(MaaCustomControllerCallbacks): + def c_handle(self) -> Any: return ctypes.pointer(self._callbacks) @property @@ -1274,7 +1282,13 @@ def c_arg(self) -> ctypes.c_void_p: def connect(self) -> bool: raise NotImplementedError - def connected(self) -> bool: + # Intentionally shadows Controller.connected (a @property) with a method: + # the C callback (MaaCustomControllerCallbacks.ConnectedFunc) invokes + # `self.connected()`, so it must be callable. `connected()` is a public + # override point (see test/python/binding_test.py), so it cannot be renamed + # without breaking the API. The method-overrides-property clash is inherent + # to this design, not a defect to fix. + def connected(self) -> bool: # pyright: ignore[reportIncompatibleMethodOverride] """检查是否已连接(可选实现,默认返回 True)""" return True @@ -1361,7 +1375,7 @@ def inactive(self) -> bool: """设置控制器为不活跃状态(可选实现,默认返回 True)""" return True - def get_custom_info(self) -> Dict[str, Any]: + def get_custom_info(self) -> dict[str, Any]: """获取自定义控制器的额外信息(可选实现,默认返回空字典) Get custom controller's extra info (optional, returns empty dict by default) @@ -1437,7 +1451,7 @@ def _c_get_features_agent(trans_arg: ctypes.c_void_p) -> int: @staticmethod @MaaCustomControllerCallbacks.StartAppFunc def _c_start_app_agent( - c_intent: ctypes.c_char_p, + c_intent: bytes, trans_arg: ctypes.c_void_p, ) -> int: if not trans_arg: @@ -1453,7 +1467,7 @@ def _c_start_app_agent( @staticmethod @MaaCustomControllerCallbacks.StopAppFunc def _c_stop_app_agent( - c_intent: ctypes.c_char_p, + c_intent: bytes, trans_arg: ctypes.c_void_p, ) -> int: if not trans_arg: @@ -1522,9 +1536,7 @@ def _c_swipe_agent( ctypes.py_object, ).value - return int( - self.swipe(int(c_x1), int(c_y1), int(c_x2), int(c_y2), int(c_duration)) - ) + return int(self.swipe(int(c_x1), int(c_y1), int(c_x2), int(c_y2), int(c_duration))) @staticmethod @MaaCustomControllerCallbacks.TouchDownFunc @@ -1631,7 +1643,7 @@ def _c_key_up_agent( @staticmethod @MaaCustomControllerCallbacks.InputTextFunc def _c_input_text_agent( - c_text: ctypes.c_char_p, + c_text: bytes, trans_arg: ctypes.c_void_p, ) -> int: if not trans_arg: @@ -1681,7 +1693,7 @@ def _c_relative_move_agent( @staticmethod @MaaCustomControllerCallbacks.ShellFunc def _c_shell_agent( - c_cmd: ctypes.c_char_p, + c_cmd: bytes, c_timeout: ctypes.c_int64, trans_arg: ctypes.c_void_p, c_buffer: MaaStringBufferHandle, @@ -1746,14 +1758,13 @@ def _set_custom_api_properties(self): class ControllerEventSink(EventSink): - @dataclass class ControllerActionDetail: ctrl_id: int uuid: str action: str - param: dict - info: dict + param: dict[str, Any] + info: dict[str, Any] def on_controller_action( self, @@ -1763,11 +1774,10 @@ def on_controller_action( ): pass - def on_raw_notification(self, controller: Controller, msg: str, details: dict): + def on_raw_notification(self, controller: Controller, msg: str, details: dict[str, Any]) -> None: pass - def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict): - + def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict[str, Any]) -> None: controller = Controller(handle=handle) self.on_raw_notification(controller, msg, details) diff --git a/source/binding/Python/maa/custom_action.py b/source/binding/Python/maa/custom_action.py index 476c19cd4e..69c846ac93 100644 --- a/source/binding/Python/maa/custom_action.py +++ b/source/binding/Python/maa/custom_action.py @@ -1,6 +1,7 @@ import ctypes -from dataclasses import dataclass from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Union from .buffer import RectBuffer from .context import Context @@ -22,7 +23,7 @@ def run(self, context, argv): return True """ - _handle: MaaCustomActionCallback + _handle: Any def __init__(self): self._handle = self._c_run_agent @@ -70,12 +71,13 @@ def run( argv: 动作参数 / Action arguments Returns: - Union[RunResult, bool]: 执行结果,可返回 RunResult 或 bool / Execution result, can return RunResult or bool + Union[RunResult, bool]: 执行结果,可返回 RunResult 或 bool + Execution result, can return RunResult or bool """ raise NotImplementedError @property - def c_handle(self) -> MaaCustomActionCallback: + def c_handle(self) -> Any: return self._handle @property @@ -87,9 +89,9 @@ def c_arg(self) -> ctypes.c_void_p: def _c_run_agent( c_context: MaaContextHandle, c_task_id: MaaTaskId, - c_node_name: ctypes.c_char_p, - c_custom_action_name: ctypes.c_char_p, - c_custom_action_param: ctypes.c_char_p, + c_node_name: bytes, + c_custom_action_name: bytes, + c_custom_action_param: bytes, c_reco_id: MaaRecoId, c_box: MaaRectHandle, c_transparent_arg: ctypes.c_void_p, @@ -110,7 +112,7 @@ def _c_run_agent( box = RectBuffer(c_box).get() - result: Union[CustomAction.RunResult, bool] = self.run( + result: Union[CustomAction.RunResult, bool, None] = self.run( context, CustomAction.RunArg( task_detail=task_detail, @@ -125,11 +127,11 @@ def _c_run_agent( if isinstance(result, CustomAction.RunResult): return int(result.success) - elif isinstance(result, bool): + elif isinstance(result, bool): # pyright: ignore[reportUnnecessaryIsInstance] return int(result) elif result is None: return int(True) else: - raise TypeError(f"Invalid return type: {result!r}") + raise TypeError(f"Invalid return type: {result!r}") # pyright: ignore[reportUnreachable] diff --git a/source/binding/Python/maa/custom_recognition.py b/source/binding/Python/maa/custom_recognition.py index ef14e62bc3..1b7fa629ec 100644 --- a/source/binding/Python/maa/custom_recognition.py +++ b/source/binding/Python/maa/custom_recognition.py @@ -1,7 +1,8 @@ import ctypes import json -from dataclasses import dataclass from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any, Optional, Union import numpy @@ -15,8 +16,10 @@ class CustomRecognition(ABC): 用于实现自定义的 Pipeline 识别算法。继承此类并实现 analyze 方法, 然后通过 Resource.register_custom_recognition 或 AgentServer.register_custom_recognition 注册。 - Used to implement custom Pipeline recognition algorithms. Inherit this class and implement the analyze method, - then register via Resource.register_custom_recognition or AgentServer.register_custom_recognition. + Used to implement custom Pipeline recognition algorithms. Inherit this class and implement the + analyze method, + then register via Resource.register_custom_recognition or + AgentServer.register_custom_recognition. Example: class MyRecognition(CustomRecognition): @@ -25,7 +28,7 @@ def analyze(self, context, argv): return (100, 100, 50, 50) """ - _handle: MaaCustomRecognitionCallback + _handle: Any def __init__(self): self._handle = self._c_analyze_agent @@ -38,7 +41,8 @@ class AnalyzeArg: task_detail: 当前任务详情 / Current task detail node_name: 当前节点名 / Current node name custom_recognition_name: 自定义识别器名 / Custom recognition name - custom_recognition_param: 自定义识别器参数 (JSON 字符串) / Custom recognition parameter (JSON string) + custom_recognition_param: 自定义识别器参数 (JSON 字符串) + Custom recognition parameter (JSON string) image: 待识别的图像 (BGR 格式) / Image to recognize (BGR format) roi: 识别区域 / Recognition region of interest """ @@ -60,7 +64,7 @@ class AnalyzeResult: """ box: Optional[RectType] - detail: dict + detail: dict[str, Any] @abstractmethod def analyze( @@ -83,7 +87,7 @@ def analyze( raise NotImplementedError @property - def c_handle(self) -> MaaCustomRecognitionCallback: + def c_handle(self) -> Any: return self._handle @property @@ -95,9 +99,9 @@ def c_arg(self) -> ctypes.c_void_p: def _c_analyze_agent( c_context: MaaContextHandle, c_task_id: MaaTaskId, - c_node_name: ctypes.c_char_p, - c_custom_reco_name: ctypes.c_char_p, - c_custom_reco_param: ctypes.c_char_p, + c_node_name: bytes, + c_custom_reco_name: bytes, + c_custom_reco_param: bytes, c_image: MaaImageBufferHandle, c_roi: MaaRectHandle, c_transparent_arg: ctypes.c_void_p, @@ -116,18 +120,16 @@ def _c_analyze_agent( image = ImageBuffer(c_image).get() - result: Union[CustomRecognition.AnalyzeResult, Optional[RectType]] = ( - self.analyze( - context, - CustomRecognition.AnalyzeArg( - task_detail=task_detail, - node_name=c_node_name.decode(), - custom_recognition_name=c_custom_reco_name.decode(), - custom_recognition_param=c_custom_reco_param.decode(), - image=image, - roi=RectBuffer(c_roi).get(), - ), - ) + result: Union[CustomRecognition.AnalyzeResult, Optional[RectType]] = self.analyze( + context, + CustomRecognition.AnalyzeArg( + task_detail=task_detail, + node_name=c_node_name.decode(), + custom_recognition_name=c_custom_reco_name.decode(), + custom_recognition_param=c_custom_reco_param.decode(), + image=image, + roi=RectBuffer(c_roi).get(), + ), ) rect_buffer = RectBuffer(c_out_box) @@ -142,17 +144,9 @@ def _c_analyze_agent( # RectType elif ( isinstance(result, Rect) - or ( - isinstance(result, list) - and len(result) == 4 - and all(isinstance(x, int) for x in result) - ) + or (isinstance(result, list) and len(result) == 4) or (isinstance(result, numpy.ndarray) and result.size == 4) - or ( - isinstance(result, tuple) - and len(result) == 4 - and all(isinstance(x, int) for x in result) - ) + or (isinstance(result, tuple) and len(result) == 4) ): rect_buffer.set(result) return int(True) diff --git a/source/binding/Python/maa/define.py b/source/binding/Python/maa/define.py index a431edc4c0..f626e4db1f 100644 --- a/source/binding/Python/maa/define.py +++ b/source/binding/Python/maa/define.py @@ -1,12 +1,130 @@ import ctypes import platform +from collections.abc import Iterator from dataclasses import dataclass from enum import IntEnum -from typing import Callable, Iterator, List, Tuple, Union, Dict, Optional +from typing import Any, Callable, Optional, Union import numpy from strenum import StrEnum # For Python 3.9/3.10 +__all__ = [ + # Primitive ctypes aliases + "MaaBool", + "MaaSize", + "MaaNullSize", + "MaaId", + "MaaCtrlId", + "MaaResId", + "MaaTaskId", + "MaaRecoId", + "MaaActId", + "MaaNodeId", + "MaaWfId", + "MaaSinkId", + "MaaInvalidId", + "MaaStatus", + "MaaLoggingLevel", + "MaaOptionValueSize", + "MaaOptionValue", + "MaaOption", + "MaaGlobalOption", + "MaaCtrlOption", + "MaaResOption", + # Handle aliases + "MaaStringBufferHandle", + "MaaImageBufferHandle", + "MaaRectHandle", + "MaaStringListBufferHandle", + "MaaImageListBufferHandle", + "MaaResourceHandle", + "MaaControllerHandle", + "MaaTaskerHandle", + "MaaContextHandle", + "MaaAgentClientHandle", + "MaaToolkitAdbDeviceListHandle", + "MaaToolkitAdbDeviceHandle", + "MaaToolkitDesktopWindowListHandle", + "MaaToolkitDesktopWindowHandle", + "MaaMacOSPermission", + # Bitmask / method aliases + "MaaAdbScreencapMethod", + "MaaAdbInputMethod", + "MaaWin32ScreencapMethod", + "MaaWin32InputMethod", + "MaaMacOSScreencapMethod", + "MaaMacOSInputMethod", + "MaaGamepadType", + "MaaControllerFeature", + # FFI callback factories + "FUNCTYPE", + "MaaEventCallback", + "MaaCustomRecognitionCallback", + "MaaCustomActionCallback", + "MaaCustomControllerCallbacks", + # Enums + "MaaStatusEnum", + "MaaGlobalOptionEnum", + "MaaCtrlOptionEnum", + "MaaInferenceDeviceEnum", + "MaaInferenceExecutionProviderEnum", + "MaaResOptionEnum", + "MaaAdbScreencapMethodEnum", + "MaaAdbInputMethodEnum", + "MaaWin32ScreencapMethodEnum", + "MaaWin32InputMethodEnum", + "MaaMacOSScreencapMethodEnum", + "MaaMacOSInputMethodEnum", + "MaaGamepadTypeEnum", + "MaaGamepadButtonEnum", + "MaaGamepadContactEnum", + "MaaControllerFeatureEnum", + "MaaMacOSPermissionEnum", + "AlgorithmEnum", + "ActionEnum", + "LoggingLevelEnum", + # Core types + "Status", + "Point", + "Rect", + "PointType", + "RectType", + # Recognition / action results + "BoxAndScoreResult", + "TemplateMatchResult", + "BoxAndCountResult", + "FeatureMatchResult", + "ColorMatchResult", + "OCRResult", + "NeuralNetworkResult", + "NeuralNetworkClassifyResult", + "NeuralNetworkDetectResult", + "CustomRecognitionResult", + "AndRecognitionResult", + "OrRecognitionResult", + "RecognitionResult", + "RecognitionDetail", + "ClickActionResult", + "LongPressActionResult", + "SwipeActionResult", + "MultiSwipeActionResult", + "ClickKeyActionResult", + "LongPressKeyActionResult", + "InputTextActionResult", + "AppActionResult", + "ScrollActionResult", + "TouchActionResult", + "ShellActionResult", + "ActionResult", + "ActionDetail", + "WaitFreezesDetail", + "NodeDetail", + "TaskDetail", + # Result lookup maps + "AlgorithmResultDict", + "ActionResultDict", +] + MaaBool = ctypes.c_uint8 MaaSize = ctypes.c_size_t MaaNullSize = MaaSize(-1) @@ -113,16 +231,19 @@ class MaaGlobalOptionEnum(IntEnum): class MaaCtrlOptionEnum(IntEnum): Invalid = 0 - # Only one of long and short side can be set, and the other is automatically scaled according to the aspect ratio. + # Only one of long and short side can be set, and the other is automatically scaled according to + # the aspect ratio. # value: int, eg: 1920; val_size: sizeof(int) ScreenshotTargetLongSide = 1 - # Only one of long and short side can be set, and the other is automatically scaled according to the aspect ratio. + # Only one of long and short side can be set, and the other is automatically scaled according to + # the aspect ratio. # value: int, eg: 1080; val_size: sizeof(int) ScreenshotTargetShortSide = 2 # Screenshot use raw size without scaling. - # Please note that this option may cause incorrect coordinates on user devices with different resolutions if scaling is not performed. + # Please note that this option may cause incorrect coordinates on user devices with different + # resolutions if scaling is not performed. # value: bool, eg: true; val_size: sizeof(bool) ScreenshotUseRawSize = 3 @@ -146,7 +267,8 @@ class MaaCtrlOptionEnum(IntEnum): ScreenshotResizeMethod = 6 # Configure background managed key domain for Win32 controllers. - # Must be set before connection. After setting, matching ClickKey / LongPressKey / KeyDown / KeyUp + # Must be set before connection. After setting, matching + # ClickKey / LongPressKey / KeyDown / KeyUp # operations automatically route through the background guardian path. # Only supported by Win32 controllers; other controllers will fail. # value: int32_t array of virtual key codes; val_size: sizeof(int32_t) * count @@ -173,8 +295,10 @@ class MaaInferenceExecutionProviderEnum(IntEnum): # MaaResOption_InferenceDevice will be used to set coreml_flag, # Reference to - # https://github.com/microsoft/onnxruntime/blob/main/include/onnxruntime/core/providers/coreml/coreml_provider_factory.h - # But you need to pay attention to the onnxruntime version we use, the latest flag may not be supported. + # https://github.com/microsoft/onnxruntime/blob/main/include/onnxruntime/core/providers/coreml/coreml_provider_facto + # ry.h + # But you need to pay attention to the onnxruntime version we use, the latest flag may not be + # supported. CoreML = 3 # MaaResOption_InferenceDevice will be used to set NVIDIA GPU ID @@ -291,14 +415,14 @@ class MaaWin32ScreencapMethodEnum(IntEnum): Different applications use different rendering methods, there is no universal solution. - | Method | Speed | Compatibility | Require Admin | Background Support | Notes | - |-------------------------|-----------|---------------|---------------|--------------------|----------------------------------| - | GDI | Fast | Medium | No | No | | - | FramePool | Very Fast | Medium | No | Yes | Requires Windows 10 1903+ | - | DXGI_DesktopDup | Very Fast | Low | No | No | Desktop duplication (full screen)| - | DXGI_DesktopDup_Window | Very Fast | Low | No | No | Desktop duplication then crop | - | PrintWindow | Medium | Medium | No | Yes | | - | ScreenDC | Fast | High | No | No | | + | Method | Speed | Compat | Admin | Bg | Notes | + |------------------------|-----------|--------|-------|----|---------------------------------| + | GDI | Fast | Medium | No | No | | + | FramePool | Very Fast | Medium | No | Yes| Requires Windows 10 1903+ | + | DXGI_DesktopDup | Very Fast | Low | No | No | Desktop duplication (fullscreen)| + | DXGI_DesktopDup_Window | Very Fast | Low | No | No | Desktop duplication then crop | + | PrintWindow | Medium | Medium | No | Yes| | + | ScreenDC | Fast | High | No | No | | Note: FramePool and PrintWindow support pseudo-minimize. Other methods still fail when the target window is minimized. @@ -330,17 +454,17 @@ class MaaWin32InputMethodEnum(IntEnum): Different applications process input differently, there is no universal solution. - | Method | Compatibility | Require Admin | Seize Mouse | Background Support | Notes | - |------------------------------|---------------|---------------|--------------|--------------------|------------------------------------------------------------- | - | Seize | High | No | Yes | No | | - | SendMessage | Medium | Maybe | No | Yes | | - | PostMessage | Medium | Maybe | No | Yes | | - | LegacyEvent | Low | No | Yes | No | | - | PostThreadMessage | Low | Maybe | No | Yes | | - | SendMessageWithCursorPos | Medium | Maybe | Briefly | Yes | Moves cursor to target position, then restores | - | PostMessageWithCursorPos | Medium | Maybe | Briefly | Yes | Moves cursor to target position, then restores | - | SendMessageWithWindowPos | Medium | Maybe | No | Yes | Moves window to align target with cursor, then restores | - | PostMessageWithWindowPos | Medium | Maybe | No | Yes | Moves window to align target with cursor, then restores | + | Method | Compat | Admin | Seize Mouse | Bg | Notes | + |--------------------------|--------|-------|-------------|----|---------------------------------------| + | Seize | High | No | Yes | No | | + | SendMessage | Medium | Maybe | No | Yes| | + | PostMessage | Medium | Maybe | No | Yes| | + | LegacyEvent | Low | No | Yes | No | | + | PostThreadMessage | Low | Maybe | No | Yes| | + | SendMessageWithCursorPos | Medium | Maybe | Briefly | Yes| Moves cursor to target, then restores | + | PostMessageWithCursorPos | Medium | Maybe | Briefly | Yes| Moves cursor to target, then restores | + | SendMessageWithWindowPos | Medium | Maybe | No | Yes| Moves window to align w/ cursor, rest.| + | PostMessageWithWindowPos | Medium | Maybe | No | Yes| Moves window to align w/ cursor, rest.| Note: - Admin rights mainly depend on the target application's privilege level. @@ -407,6 +531,7 @@ class MaaMacOSInputMethodEnum(IntEnum): GlobalEvent = 1 PostToPid = 1 << 1 + # No bitwise OR, just set it MaaGamepadType = ctypes.c_uint64 @@ -501,12 +626,9 @@ class MaaControllerFeatureEnum(IntEnum): UseKeyboardDownAndUpInsteadOfClick = 1 << 1 - FUNCTYPE = ctypes.WINFUNCTYPE if (platform.system() == "Windows") else ctypes.CFUNCTYPE -MaaEventCallback = FUNCTYPE( - None, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p -) +MaaEventCallback = FUNCTYPE(None, ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_void_p) MaaCustomRecognitionCallback = FUNCTYPE( MaaBool, # return value @@ -542,10 +664,12 @@ class MaaControllerFeatureEnum(IntEnum): MaaMacOSPermission = ctypes.c_int32 + class MaaMacOSPermissionEnum(IntEnum): ScreenCapture = 1 Accessibility = 2 + MaaAgentClientHandle = ctypes.c_void_p @@ -732,23 +856,17 @@ def __add__( self, other: Union[ "Point", - Tuple[int, int], - List[int], + tuple[int, int], + list[int], ], - ): - if ( - isinstance(other, Point) - or isinstance(other, tuple) - or (isinstance(other, list) and len(other) == 2) - ): - x1, y1 = self - x2, y2 = other - return Point( - x1 + x2, - y1 + y2, - ) - - raise TypeError(f"Cannot add {type(other).__name__} to Point") + ) -> "Point": + values = list(other) + if len(values) != 2: + raise TypeError(f"Cannot add {type(other).__name__} to Point") + return Point( + self.x + values[0], + self.y + values[1], + ) def __iter__(self) -> Iterator[int]: yield self.x @@ -769,25 +887,19 @@ def __add__( self, other: Union[ "Rect", - Tuple[int, int, int, int], - List[int], + tuple[int, int, int, int], + list[int], ], - ): - if ( - isinstance(other, Rect) - or isinstance(other, tuple) - or (isinstance(other, list) and len(other) == 4) - ): - x1, y1, w1, h1 = self - x2, y2, w2, h2 = other - return Rect( - x1 + x2, - y1 + y2, - w1 + w2, - h1 + h2, - ) - - raise TypeError(f"Cannot add {type(other).__name__} to Rect") + ) -> "Rect": + values = list(other) + if len(values) != 4: + raise TypeError(f"Cannot add {type(other).__name__} to Rect") + return Rect( + self.x + values[0], + self.y + values[1], + self.w + values[2], + self.h + values[3], + ) def __iter__(self) -> Iterator[int]: yield self.x @@ -801,16 +913,16 @@ def __getitem__(self, key: int) -> int: PointType = Union[ Point, - List[int], + list[int], numpy.ndarray, - Tuple[int, int], + tuple[int, int], ] RectType = Union[ Rect, - List[int], + list[int], numpy.ndarray, - Tuple[int, int, int, int], + tuple[int, int, int, int], ] @@ -889,21 +1001,21 @@ class NeuralNetworkResult(BoxAndScoreResult): @dataclass class CustomRecognitionResult: box: Rect - detail: Union[str, Dict] + detail: Union[str, dict[str, Any]] @dataclass class AndRecognitionResult: """And 算法识别结果,包含所有子识别的完整详情""" - sub_results: List["RecognitionDetail"] + sub_results: list["RecognitionDetail"] @dataclass class OrRecognitionResult: """Or 算法识别结果,包含已执行子识别的完整详情""" - sub_results: List["RecognitionDetail"] + sub_results: list["RecognitionDetail"] RecognitionResult = Union[ @@ -940,13 +1052,13 @@ class RecognitionDetail: hit: bool box: Optional[Rect] - all_results: List[RecognitionResult] - filtered_results: List[RecognitionResult] + all_results: list[RecognitionResult] + filtered_results: list[RecognitionResult] best_result: Optional[RecognitionResult] - raw_detail: Dict + raw_detail: dict[str, Any] raw_image: numpy.ndarray # only valid in debug mode - draw_images: List[numpy.ndarray] # only valid in debug mode + draw_images: list[numpy.ndarray] # only valid in debug mode @dataclass @@ -967,9 +1079,9 @@ class LongPressActionResult: @dataclass class SwipeActionResult: begin: Point - end: List[Point] - end_hold: List[int] - duration: List[int] + end: list[Point] + end_hold: list[int] + duration: list[int] only_hover: bool starting: int contact: int @@ -978,17 +1090,17 @@ class SwipeActionResult: @dataclass class MultiSwipeActionResult: - swipes: List[SwipeActionResult] + swipes: list[SwipeActionResult] @dataclass class ClickKeyActionResult: - keycode: List[int] + keycode: list[int] @dataclass class LongPressKeyActionResult: - keycode: List[int] + keycode: list[int] duration: int @@ -1071,7 +1183,7 @@ class ActionDetail: box: Rect success: bool result: Optional[ActionResult] - raw_detail: Dict + raw_detail: dict[str, Any] @dataclass @@ -1081,7 +1193,7 @@ class WaitFreezesDetail: phase: str success: bool elapsed_ms: int - reco_id_list: List[int] + reco_id_list: list[int] roi: Rect @@ -1122,7 +1234,7 @@ def __init__( self, task_id: int, entry: str, - node_id_list: List[int], + node_id_list: list[int], status: "Status", node_detail_func: Optional[Callable[[int], Optional["NodeDetail"]]] = None, ): @@ -1131,13 +1243,13 @@ def __init__( self.node_id_list = node_id_list self.status = status self._node_detail_func = node_detail_func - self._nodes: Optional[List[NodeDetail]] = None + self._nodes: Optional[list[NodeDetail]] = None @property - def nodes(self) -> List[NodeDetail]: + def nodes(self) -> list[NodeDetail]: if self._nodes is None: if self._node_detail_func is not None: - nodes = [] + nodes: list[NodeDetail] = [] for nid in self.node_id_list: node = self._node_detail_func(nid) if node is None: diff --git a/source/binding/Python/maa/event_sink.py b/source/binding/Python/maa/event_sink.py index 46849554d4..9b03dee7ee 100644 --- a/source/binding/Python/maa/event_sink.py +++ b/source/binding/Python/maa/event_sink.py @@ -1,12 +1,10 @@ import ctypes import json -from abc import ABC -from typing import Tuple from enum import IntEnum +from typing import Any from .define import MaaEventCallback - # class NotificationEvent(IntEnum): # ResourceLoading = 1 # ControllerAction = 2 @@ -35,16 +33,17 @@ class NotificationType(IntEnum): Failed = 3 -class EventSink(ABC): +class EventSink: """事件监听器基类 / Event sink base class - 用于接收 MaaFramework 各种事件回调的抽象基类。 + 用于接收 MaaFramework 各种事件回调的基类。 派生类包括 ResourceEventSink、ControllerEventSink、TaskerEventSink、ContextEventSink。 - Abstract base class for receiving various event callbacks from MaaFramework. - Derived classes include ResourceEventSink, ControllerEventSink, TaskerEventSink, ContextEventSink. + Base class for receiving various event callbacks from MaaFramework. + Derived classes include ResourceEventSink, ControllerEventSink, TaskerEventSink, + ContextEventSink. """ - def on_unknown_notification(self, instance, msg: str, details: dict): + def on_unknown_notification(self, instance: Any, msg: str, details: dict[str, Any]) -> None: """处理未知类型的通知 / Handle unknown notification 当收到无法识别的通知时调用。 @@ -57,11 +56,11 @@ def on_unknown_notification(self, instance, msg: str, details: dict): """ pass - def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict): + def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict[str, Any]) -> None: pass @property - def c_callback(self) -> MaaEventCallback: + def c_callback(self) -> Any: return self._c_sink_agent @property @@ -71,7 +70,7 @@ def c_callback_arg(self) -> ctypes.c_void_p: @staticmethod def _gen_c_param( sink: "EventSink", - ) -> Tuple[MaaEventCallback, ctypes.c_void_p]: + ) -> tuple[Any, ctypes.c_void_p]: return sink.c_callback, sink.c_callback_arg @staticmethod @@ -89,15 +88,13 @@ def _notification_type(message: str) -> NotificationType: @MaaEventCallback def _c_sink_agent( handle: ctypes.c_void_p, - msg: ctypes.c_char_p, - details_json: ctypes.c_char_p, + msg: bytes, + details_json: bytes, callback_arg: ctypes.c_void_p, - ): + ) -> None: if not callback_arg: return self: EventSink = ctypes.cast(callback_arg, ctypes.py_object).value - self._on_raw_notification( - handle, msg.decode(), json.loads(details_json.decode()) - ) + self._on_raw_notification(handle, msg.decode(), json.loads(details_json.decode())) diff --git a/source/binding/Python/maa/job.py b/source/binding/Python/maa/job.py index a05333c1c8..5b7d69da20 100644 --- a/source/binding/Python/maa/job.py +++ b/source/binding/Python/maa/job.py @@ -1,8 +1,10 @@ import json -from typing import Dict +from typing import Any, Callable, Generic, Optional, TypeVar from .define import * +TResult = TypeVar("TResult") + class Job: """异步作业句柄 / Asynchronous job handle @@ -14,7 +16,12 @@ class Job: _job_id: MaaId - def __init__(self, job_id: MaaId, status_func, wait_func): + def __init__( + self, + job_id: MaaId, + status_func: Callable[[int], MaaStatus], + wait_func: Callable[[int], MaaStatus], + ) -> None: self._job_id = job_id self._status_func = status_func self._wait_func = wait_func @@ -37,7 +44,7 @@ def wait(self) -> "Job": Returns: Job: 返回自身,支持链式调用 / Returns self for method chaining """ - self._wait_func(self._job_id) + self._wait_func(int(self._job_id)) return self @property @@ -47,7 +54,7 @@ def status(self) -> Status: Returns: Status: 作业状态 / Job status """ - return Status(self._status_func(self._job_id)) + return Status(self._status_func(int(self._job_id))) @property def done(self) -> bool: @@ -95,18 +102,24 @@ def running(self) -> bool: return self.status.running -class JobWithResult(Job): +class JobWithResult(Job, Generic[TResult]): """带结果的异步作业句柄 / Asynchronous job handle with result 继承自 Job,额外提供获取作业结果的功能。 Inherits from Job, additionally provides the ability to get job result. """ - def __init__(self, job_id: MaaId, status_func, wait_func, get_func): + def __init__( + self, + job_id: MaaId, + status_func: Callable[[int], MaaStatus], + wait_func: Callable[[int], MaaStatus], + get_func: Callable[[int], TResult], + ) -> None: super().__init__(job_id, status_func, wait_func) self._get_func = get_func - def wait(self) -> "JobWithResult": + def wait(self) -> "JobWithResult[TResult]": """等待作业完成 / Wait for job completion Returns: @@ -115,13 +128,14 @@ def wait(self) -> "JobWithResult": super().wait() return self - def get(self, wait: bool = False): + def get(self, wait: bool = False) -> TResult: """获取作业结果 / Get job result Args: wait: 是否在获取结果前等待作业完成,默认为 False。建议先显式调用 wait()(或传入 wait=True), 确保异步操作已完成后再获取结果 / Whether to wait for job completion before getting result, - default is False. It's recommended to call wait() first (or pass wait=True) to ensure the + default is False. It's recommended to call wait() first (or pass wait=True) to + ensure the async operation is finished before getting the result. Returns: @@ -131,17 +145,24 @@ def get(self, wait: bool = False): if wait: self.wait() - return self._get_func(self._job_id) + return self._get_func(int(self._job_id)) -class TaskJob(JobWithResult): +class TaskJob(JobWithResult[Optional["TaskDetail"]]): """任务作业句柄 / Task job handle 继承自 JobWithResult,额外提供任务相关的操作。 Inherits from JobWithResult, additionally provides task-related operations. """ - def __init__(self, job_id: MaaId, status_func, wait_func, get_func, override_pipeline_func): + def __init__( + self, + job_id: MaaId, + status_func: Callable[[int], MaaStatus], + wait_func: Callable[[int], MaaStatus], + get_func: Callable[[int], Optional["TaskDetail"]], + override_pipeline_func: Callable[[int, bytes], bool], + ) -> None: super().__init__(job_id, status_func, wait_func, get_func) self._override_pipeline_func = override_pipeline_func @@ -154,7 +175,7 @@ def wait(self) -> "TaskJob": super().wait() return self - def override_pipeline(self, pipeline_override: Dict) -> bool: + def override_pipeline(self, pipeline_override: dict[str, Any]) -> bool: """覆盖此任务的 pipeline / Override pipeline for this task 在任务执行期间动态修改 pipeline 配置 @@ -167,6 +188,6 @@ def override_pipeline(self, pipeline_override: Dict) -> bool: bool: 是否成功 / Whether successful """ return self._override_pipeline_func( - self._job_id, + int(self._job_id), json.dumps(pipeline_override, ensure_ascii=False).encode(), ) diff --git a/source/binding/Python/maa/library.py b/source/binding/Python/maa/library.py index 4e959efd9d..ce8754cb98 100644 --- a/source/binding/Python/maa/library.py +++ b/source/binding/Python/maa/library.py @@ -19,10 +19,15 @@ class Library: _toolkit: Optional[ctypes.CDLL] = None _agent_client: Optional[ctypes.CDLL] = None _agent_server: Optional[ctypes.CDLL] = None - _lib_type = None + _lib_type: Optional[type[ctypes.CDLL]] = None + + framework_libpath: Optional[pathlib.Path] = None + toolkit_libpath: Optional[pathlib.Path] = None + agent_client_libpath: Optional[pathlib.Path] = None + agent_server_libpath: Optional[pathlib.Path] = None @classmethod - def open(cls, path: pathlib.Path, agent_server: bool = False): + def open(cls, path: pathlib.Path, agent_server: bool = False) -> None: """打开并加载库 / Open and load libraries Args: @@ -44,49 +49,46 @@ def open(cls, path: pathlib.Path, agent_server: bool = False): cls._is_agent_server = agent_server - if not cls.is_agent_server(): - framework_library = { - WINDOWS: "MaaFramework.dll", - MACOS: "libMaaFramework.dylib", - LINUX: "libMaaFramework.so", - } - agent_client_library = { - WINDOWS: "MaaAgentClient.dll", - MACOS: "libMaaAgentClient.dylib", - LINUX: "libMaaAgentClient.so", - } - else: - agent_server_library = { - WINDOWS: "MaaAgentServer.dll", - MACOS: "libMaaAgentServer.dylib", - LINUX: "libMaaAgentServer.so", - } - platform_type = platform.system().lower() - - if platform_type == WINDOWS: - cls._lib_type = ctypes.WinDLL - else: - cls._lib_type = ctypes.CDLL - - if cls._lib_type is None: - raise + cls._lib_type = ctypes.WinDLL if platform_type == WINDOWS else ctypes.CDLL if not cls.is_agent_server(): - toolkit_library = { - WINDOWS: "MaaToolkit.dll", - MACOS: "libMaaToolkit.dylib", - LINUX: "libMaaToolkit.so", - } - - cls.framework_libpath = path / framework_library[platform_type] - cls.agent_client_libpath = path / agent_client_library[platform_type] - cls.toolkit_libpath = path / toolkit_library[platform_type] + cls.framework_libpath = ( + path + / { + WINDOWS: "MaaFramework.dll", + MACOS: "libMaaFramework.dylib", + LINUX: "libMaaFramework.so", + }[platform_type] + ) + cls.agent_client_libpath = ( + path + / { + WINDOWS: "MaaAgentClient.dll", + MACOS: "libMaaAgentClient.dylib", + LINUX: "libMaaAgentClient.so", + }[platform_type] + ) + cls.toolkit_libpath = ( + path + / { + WINDOWS: "MaaToolkit.dll", + MACOS: "libMaaToolkit.dylib", + LINUX: "libMaaToolkit.so", + }[platform_type] + ) else: - cls.agent_server_libpath = path / agent_server_library[platform_type] + cls.agent_server_libpath = ( + path + / { + WINDOWS: "MaaAgentServer.dll", + MACOS: "libMaaAgentServer.dylib", + LINUX: "libMaaAgentServer.so", + }[platform_type] + ) @classmethod - def framework(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: + def framework(cls) -> ctypes.CDLL: """获取 MaaFramework 库 / Get MaaFramework library Returns: @@ -99,15 +101,12 @@ def framework(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: if not cls._framework: cls._framework = cls._lib_type(str(cls.framework_libpath)) - if cls._framework is None: - raise RuntimeError("Library._framework is None!") - return cls._framework else: return cls.agent_server() @classmethod - def toolkit(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: + def toolkit(cls) -> ctypes.CDLL: """获取 MaaToolkit 库 / Get MaaToolkit library Returns: @@ -128,7 +127,7 @@ def toolkit(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: return cls._toolkit @classmethod - def agent_client(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: + def agent_client(cls) -> ctypes.CDLL: """获取 MaaAgentClient 库 / Get MaaAgentClient library Returns: @@ -149,11 +148,12 @@ def agent_client(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: return cls._agent_client @classmethod - def agent_server(cls) -> Union["ctypes.CDLL", "ctypes.WinDLL"]: + def agent_server(cls) -> ctypes.CDLL: """获取 MaaAgentServer 库 / Get MaaAgentServer library Returns: - (ctypes.CDLL | ctypes.WinDLL): MaaAgentServer 动态库对象 / MaaAgentServer dynamic library object + (ctypes.CDLL | ctypes.WinDLL): MaaAgentServer 动态库对象 + MaaAgentServer dynamic library object Raises: ValueError: 如果不在 AgentServer 模式下调用 @@ -192,7 +192,7 @@ def version(cls) -> str: _api_properties_initialized: bool = False @classmethod - def _set_api_properties(cls): + def _set_api_properties(cls) -> None: if cls._api_properties_initialized: return diff --git a/source/binding/Python/maa/pipeline.py b/source/binding/Python/maa/pipeline.py index 3d49181aa0..caf17d8588 100644 --- a/source/binding/Python/maa/pipeline.py +++ b/source/binding/Python/maa/pipeline.py @@ -1,10 +1,11 @@ import json from dataclasses import dataclass, field -from typing import Any, List, Optional, Tuple, Union, Dict +from typing import Any, Optional, Union, cast + from strenum import StrEnum # Type aliases to match C++ std::variant types -JRect = Tuple[int, int, int, int] # std::array +JRect = tuple[int, int, int, int] # std::array JTarget = Union[bool, str, JRect] # std::variant @@ -55,10 +56,10 @@ class JDirectHit: @dataclass class JTemplateMatch: - template: List[str] # 必选 + template: list[str] # 必选 roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) - threshold: List[float] = field(default_factory=lambda: [0.7]) + threshold: list[float] = field(default_factory=lambda: [0.7]) order_by: str = "Horizontal" index: int = 0 method: int = 5 @@ -67,7 +68,7 @@ class JTemplateMatch: @dataclass class JFeatureMatch: - template: List[str] # 必选 + template: list[str] # 必选 roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) detector: str = "SIFT" @@ -80,8 +81,8 @@ class JFeatureMatch: @dataclass class JColorMatch: - lower: List[List[int]] # 必选 - upper: List[List[int]] # 必选 + lower: list[list[int]] # 必选 + upper: list[list[int]] # 必选 roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) order_by: str = "Horizontal" @@ -93,11 +94,11 @@ class JColorMatch: @dataclass class JOCR: - expected: List[str] = field(default_factory=list) + expected: list[str] = field(default_factory=lambda: []) roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) threshold: float = 0.3 - replace: List[List[str]] = field(default_factory=list) + replace: list[list[str]] = field(default_factory=lambda: []) order_by: str = "Horizontal" index: int = 0 only_rec: bool = False @@ -108,10 +109,10 @@ class JOCR: @dataclass class JNeuralNetworkClassify: model: str # 必选 - expected: List[int] = field(default_factory=list) + expected: list[int] = field(default_factory=lambda: []) roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) - labels: List[str] = field(default_factory=list) + labels: list[str] = field(default_factory=lambda: []) order_by: str = "Horizontal" index: int = 0 @@ -119,11 +120,11 @@ class JNeuralNetworkClassify: @dataclass class JNeuralNetworkDetect: model: str # 必选 - expected: List[int] = field(default_factory=list) + expected: list[int] = field(default_factory=lambda: []) roi: JTarget = (0, 0, 0, 0) roi_offset: JRect = (0, 0, 0, 0) - labels: List[str] = field(default_factory=list) - threshold: List[float] = field(default_factory=lambda: [0.3]) + labels: list[str] = field(default_factory=lambda: []) + threshold: list[float] = field(default_factory=lambda: [0.3]) order_by: str = "Horizontal" index: int = 0 @@ -141,7 +142,7 @@ class JAnd: # all_of: List of sub-recognitions. Each element can be: # - str: node name reference (uses that node's recognition params) # - dict/object: inline recognition definition - all_of: List[Any] = field(default_factory=list) + all_of: list[Any] = field(default_factory=lambda: []) box_index: int = 0 @@ -150,7 +151,7 @@ class JOr: # any_of: List of sub-recognitions. Each element can be: # - str: node name reference (uses that node's recognition params) # - dict/object: inline recognition definition - any_of: List[Any] = field(default_factory=list) + any_of: list[Any] = field(default_factory=lambda: []) # Recognition parameter union type @@ -196,10 +197,10 @@ class JSwipe: starting: int = 0 # MultiSwipe 中使用 begin: JTarget = True begin_offset: JRect = (0, 0, 0, 0) - end: List[JTarget] = field(default_factory=lambda: [True]) - end_offset: List[JRect] = field(default_factory=lambda: [(0, 0, 0, 0)]) - end_hold: List[int] = field(default_factory=lambda: [0]) - duration: List[int] = field(default_factory=lambda: [200]) + end: list[JTarget] = field(default_factory=lambda: [True]) + end_offset: list[JRect] = field(default_factory=lambda: [(0, 0, 0, 0)]) + end_hold: list[int] = field(default_factory=lambda: [0]) + duration: list[int] = field(default_factory=lambda: [200]) only_hover: bool = False contact: int = 0 pressure: int = 1 @@ -207,7 +208,7 @@ class JSwipe: @dataclass class JMultiSwipe: - swipes: List[JSwipe] + swipes: list[JSwipe] @dataclass @@ -225,12 +226,12 @@ class JTouchUp: @dataclass class JClickKey: - key: List[int] + key: list[int] @dataclass class JLongPressKey: - key: List[int] # 必选 + key: list[int] # 必选 duration: int = 1000 @@ -270,7 +271,7 @@ class JScroll: @dataclass class JCommand: exec: str # 必选 - args: List[str] = field(default_factory=list) + args: list[str] = field(default_factory=lambda: []) detach: bool = False @@ -354,11 +355,11 @@ class JWaitFreezes: class JPipelineData: recognition: JRecognition # 必选 action: JAction # 必选 - next: List[JNodeAttr] = field(default_factory=list) + next: list[JNodeAttr] = field(default_factory=lambda: []) rate_limit: int = 1000 timeout: int = 20000 - on_error: List[JNodeAttr] = field(default_factory=list) - anchor: Dict[str, str] = field(default_factory=dict) + on_error: list[JNodeAttr] = field(default_factory=lambda: []) + anchor: dict[str, str] = field(default_factory=lambda: {}) inverse: bool = False enabled: bool = True pre_delay: int = 200 @@ -370,29 +371,29 @@ class JPipelineData: repeat_wait_freezes: Optional[JWaitFreezes] = None max_hit: int = 4294967295 # UINT_MAX focus: Any = None - attach: Dict = field(default_factory=dict) + attach: dict[str, Any] = field(default_factory=lambda: {}) class JPipelineParser: @staticmethod - def _parse_wait_freezes(data: dict) -> JWaitFreezes: + def _parse_wait_freezes(data: dict[str, Any]) -> JWaitFreezes: """Convert wait freezes with proper defaults""" return JWaitFreezes( - time=data.get("time"), - target=data.get("target"), # type: ignore - target_offset=data.get("target_offset"), # type: ignore - threshold=data.get("threshold"), - method=data.get("method"), - rate_limit=data.get("rate_limit"), - timeout=data.get("timeout"), + time=cast(int, data.get("time")), + target=cast(JTarget, data.get("target")), + target_offset=cast(JRect, data.get("target_offset")), + threshold=cast(float, data.get("threshold")), + method=cast(int, data.get("method")), + rate_limit=cast(int, data.get("rate_limit")), + timeout=cast(int, data.get("timeout")), ) @classmethod def _parse_param( cls, param_type: Union[JRecognitionType, JActionType], - param_data: dict, - param_type_map: dict, + param_data: dict[str, Any], + param_type_map: dict[Any, type[Any]], ) -> Union[JRecognitionParam, JActionParam]: """Generic function to parse parameters based on type map.""" param_class = param_type_map.get(param_type) @@ -402,15 +403,11 @@ def _parse_param( try: return param_class(**param_data) except TypeError as e: - print( - f"Warning: Failed to create {param_class.__name__} with data {param_data}: {e}" - ) + print(f"Warning: Failed to create {param_class.__name__} with data {param_data}: {e}") return param_class() @classmethod - def _parse_recognition_param( - cls, param_type: JRecognitionType, param_data: dict - ) -> JRecognitionParam: + def _parse_recognition_param(cls, param_type: JRecognitionType, param_data: dict[str, Any]) -> JRecognitionParam: """Convert dict to appropriate JRecognitionParam variant based on type.""" param_type_map = { JRecognitionType.DirectHit: JDirectHit, @@ -424,12 +421,10 @@ def _parse_recognition_param( JRecognitionType.Or: JOr, JRecognitionType.Custom: JCustomRecognition, } - return cls._parse_param(param_type, param_data, param_type_map) + return cast(JRecognitionParam, cls._parse_param(param_type, param_data, param_type_map)) @classmethod - def _parse_action_param( - cls, param_type: JActionType, param_data: dict - ) -> JActionParam: + def _parse_action_param(cls, param_type: JActionType, param_data: dict[str, Any]) -> JActionParam: """Convert dict to appropriate JActionParam variant based on type.""" param_type_map = { JActionType.DoNothing: JDoNothing, @@ -455,74 +450,68 @@ def _parse_action_param( JActionType.Custom: JCustomAction, } - return cls._parse_param(param_type, param_data, param_type_map) + return cast(JActionParam, cls._parse_param(param_type, param_data, param_type_map)) @classmethod - def parse_pipeline_data(cls, pipeline_data: Union[str, Dict]) -> JPipelineData: + def parse_pipeline_data(cls, pipeline_data: Union[str, dict[str, Any]]) -> JPipelineData: """Parse JSON string to JPipelineData dataclass with proper variant types.""" if isinstance(pipeline_data, dict): data = pipeline_data - elif isinstance(pipeline_data, str): + else: try: - data: dict = json.loads(pipeline_data) + data = cast(dict[str, Any], json.loads(pipeline_data)) except json.JSONDecodeError as e: - raise ValueError(f"Invalid JSON format: {e}") - else: - raise TypeError("Input must be a JSON string or a dict.") + raise ValueError(f"Invalid JSON format: {e}") from e # Convert recognition - recognition_data: dict = data.get("recognition") - recognition_type: JRecognitionType = JRecognitionType( - recognition_data.get("type") - ) - recognition_param_data: dict = recognition_data.get("param") - recognition_param = cls._parse_recognition_param( - recognition_type, recognition_param_data - ) + recognition_data = cast(dict[str, Any], data.get("recognition")) + recognition_type: JRecognitionType = JRecognitionType(recognition_data.get("type")) + recognition_param_data = cast(dict[str, Any], recognition_data.get("param")) + recognition_param = cls._parse_recognition_param(recognition_type, recognition_param_data) recognition = JRecognition(type=recognition_type, param=recognition_param) # Convert action - action_data: dict = data.get("action") + action_data = cast(dict[str, Any], data.get("action")) action_type: JActionType = JActionType(action_data.get("type")) - action_param_data = action_data.get("param") + action_param_data = cast(dict[str, Any], action_data.get("param")) action_param = cls._parse_action_param(action_type, action_param_data) action = JAction(type=action_type, param=action_param) - pre_wait_freezes = cls._parse_wait_freezes(data.get("pre_wait_freezes")) # type: ignore - post_wait_freezes = cls._parse_wait_freezes(data.get("post_wait_freezes")) # type: ignore - repeat_wait_freezes = cls._parse_wait_freezes(data.get("repeat_wait_freezes")) # type: ignore + pre_wait_freezes = cls._parse_wait_freezes(cast(dict[str, Any], data.get("pre_wait_freezes"))) + post_wait_freezes = cls._parse_wait_freezes(cast(dict[str, Any], data.get("post_wait_freezes"))) + repeat_wait_freezes = cls._parse_wait_freezes(cast(dict[str, Any], data.get("repeat_wait_freezes"))) # Create JPipelineData with converted data return JPipelineData( recognition=recognition, action=action, - next=cls._parse_node_attr_list(data.get("next")), - rate_limit=data.get("rate_limit"), - timeout=data.get("timeout"), - on_error=cls._parse_node_attr_list(data.get("on_error")), - anchor=data.get("anchor", {}), - inverse=data.get("inverse"), - enabled=data.get("enabled"), - pre_delay=data.get("pre_delay"), - post_delay=data.get("post_delay"), - pre_wait_freezes=pre_wait_freezes, # type: ignore - post_wait_freezes=post_wait_freezes, # type: ignore - repeat=data.get("repeat"), - repeat_delay=data.get("repeat_delay"), - repeat_wait_freezes=repeat_wait_freezes, # type: ignore - max_hit=data.get("max_hit"), + next=cls._parse_node_attr_list(cast(list[dict[str, Any]], data.get("next"))), + rate_limit=cast(int, data.get("rate_limit")), + timeout=cast(int, data.get("timeout")), + on_error=cls._parse_node_attr_list(cast(list[dict[str, Any]], data.get("on_error"))), + anchor=cast(dict[str, str], data.get("anchor", {})), + inverse=cast(bool, data.get("inverse")), + enabled=cast(bool, data.get("enabled")), + pre_delay=cast(int, data.get("pre_delay")), + post_delay=cast(int, data.get("post_delay")), + pre_wait_freezes=pre_wait_freezes, + post_wait_freezes=post_wait_freezes, + repeat=cast(int, data.get("repeat")), + repeat_delay=cast(int, data.get("repeat_delay")), + repeat_wait_freezes=repeat_wait_freezes, + max_hit=cast(int, data.get("max_hit")), focus=data.get("focus"), - attach=data.get("attach"), + attach=cast(dict[str, Any], data.get("attach")), ) @staticmethod - def _parse_node_attr_list(data: List[dict]) -> List[JNodeAttr]: + def _parse_node_attr_list(data: list[dict[str, Any]]) -> list[JNodeAttr]: """Convert list of dicts to list of JNodeAttr.""" return [ JNodeAttr( - name=item.get("name"), - jump_back=item.get("jump_back", False), - anchor=item.get("anchor", False), + name=cast(str, item.get("name")), + jump_back=cast(bool, item.get("jump_back", False)), + anchor=cast(bool, item.get("anchor", False)), ) for item in data ] diff --git a/source/binding/Python/maa/resource.py b/source/binding/Python/maa/resource.py index 265b127da5..bc8418d71b 100644 --- a/source/binding/Python/maa/resource.py +++ b/source/binding/Python/maa/resource.py @@ -1,25 +1,29 @@ import ctypes -import pathlib import json -from typing import Optional, Union, List, Dict -from dataclasses import dataclass, field +import pathlib +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, Callable, Optional, Union import numpy -from .event_sink import EventSink, NotificationType +from .buffer import ImageBuffer, StringBuffer, StringListBuffer from .define import * +from .event_sink import EventSink, NotificationType from .job import Job from .library import Library -from .buffer import StringBuffer, StringListBuffer, ImageBuffer from .pipeline import ( + JActionParam, + JActionType, JPipelineData, JPipelineParser, - JRecognitionType, JRecognitionParam, - JActionType, - JActionParam, + JRecognitionType, ) +if TYPE_CHECKING: + from .custom_action import CustomAction + from .custom_recognition import CustomRecognition + class Resource: _handle: MaaResourceHandle @@ -52,8 +56,8 @@ def __init__( if not self._handle: raise RuntimeError("Failed to create resource.") - self._custom_action_holder = {} - self._custom_recognition_holder = {} + self._custom_action_holder: dict[str, CustomAction] = {} + self._custom_recognition_holder: dict[str, CustomRecognition] = {} def __del__(self): if self._handle and self._own: @@ -71,9 +75,7 @@ def post_bundle(self, path: Union[pathlib.Path, str]) -> Job: Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait """ - res_id = Library.framework().MaaResourcePostBundle( - self._handle, str(path).encode() - ) + res_id = Library.framework().MaaResourcePostBundle(self._handle, str(path).encode()) return Job(res_id, self._status, self._wait) def post_ocr_model(self, path: Union[pathlib.Path, str]) -> Job: @@ -88,9 +90,7 @@ def post_ocr_model(self, path: Union[pathlib.Path, str]) -> Job: Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait """ - res_id = Library.framework().MaaResourcePostOcrModel( - self._handle, str(path).encode() - ) + res_id = Library.framework().MaaResourcePostOcrModel(self._handle, str(path).encode()) return Job(res_id, self._status, self._wait) def post_pipeline(self, path: Union[pathlib.Path, str]) -> Job: @@ -108,9 +108,7 @@ def post_pipeline(self, path: Union[pathlib.Path, str]) -> Job: Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait """ - res_id = Library.framework().MaaResourcePostPipeline( - self._handle, str(path).encode() - ) + res_id = Library.framework().MaaResourcePostPipeline(self._handle, str(path).encode()) return Job(res_id, self._status, self._wait) def post_image(self, path: Union[pathlib.Path, str]) -> Job: @@ -128,12 +126,10 @@ def post_image(self, path: Union[pathlib.Path, str]) -> Job: Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait """ - res_id = Library.framework().MaaResourcePostImage( - self._handle, str(path).encode() - ) + res_id = Library.framework().MaaResourcePostImage(self._handle, str(path).encode()) return Job(res_id, self._status, self._wait) - def override_pipeline(self, pipeline_override: Dict) -> bool: + def override_pipeline(self, pipeline_override: dict[str, Any]) -> bool: """覆盖 pipeline / Override pipeline_override Args: @@ -151,7 +147,7 @@ def override_pipeline(self, pipeline_override: Dict) -> bool: ) ) - def override_next(self, name: str, next_list: List[str]) -> bool: + def override_next(self, name: str, next_list: list[str]) -> bool: """覆盖任务的 next 列表 / Override the next list of task 注意:此方法会直接设置 next 列表,即使节点不存在也会创建 @@ -167,11 +163,7 @@ def override_next(self, name: str, next_list: List[str]) -> bool: list_buffer = StringListBuffer() list_buffer.set(next_list) - return bool( - Library.framework().MaaResourceOverrideNext( - self._handle, name.encode(), list_buffer._handle - ) - ) + return bool(Library.framework().MaaResourceOverrideNext(self._handle, name.encode(), list_buffer._handle)) def override_image(self, image_name: str, image: numpy.ndarray) -> bool: """覆盖图片 / Override the image corresponding to image_name @@ -187,12 +179,10 @@ def override_image(self, image_name: str, image: numpy.ndarray) -> bool: image_buffer.set(image) return bool( - Library.framework().MaaResourceOverrideImage( - self._handle, image_name.encode(), image_buffer._handle - ) + Library.framework().MaaResourceOverrideImage(self._handle, image_name.encode(), image_buffer._handle) ) - def get_node_data(self, name: str) -> Optional[Dict]: + def get_node_data(self, name: str) -> Optional[dict[str, Any]]: """获取任务当前的定义 / Get the current definition of task Args: @@ -202,9 +192,7 @@ def get_node_data(self, name: str) -> Optional[Dict]: Optional[Dict]: 任务定义字典,如果不存在则返回 None / Task definition dict, or None if not exists """ string_buffer = StringBuffer() - if not Library.framework().MaaResourceGetNodeData( - self._handle, name.encode(), string_buffer._handle - ): + if not Library.framework().MaaResourceGetNodeData(self._handle, name.encode(), string_buffer._handle): return None data = string_buffer.get() if not data: @@ -222,7 +210,8 @@ def get_node_object(self, name: str) -> Optional[JPipelineData]: name: 任务名 / Task name Returns: - Optional[JPipelineData]: 任务定义对象,如果不存在则返回 None / Task definition object, or None if not exists + Optional[JPipelineData]: 任务定义对象,如果不存在则返回 None + Task definition object, or None if not exists """ node_data = self.get_node_data(name) @@ -231,16 +220,15 @@ def get_node_object(self, name: str) -> Optional[JPipelineData]: return JPipelineParser.parse_pipeline_data(node_data) - def get_default_recognition_param( - self, reco_type: JRecognitionType - ) -> Optional[JRecognitionParam]: + def get_default_recognition_param(self, reco_type: JRecognitionType) -> Optional[JRecognitionParam]: """获取指定识别类型的默认参数 / Get default parameters for specified recognition type Args: reco_type: 识别类型 / Recognition type Returns: - Optional[JRecognitionParam]: 默认参数对象,如果不存在则返回 None / Default parameter object, or None if not exists + Optional[JRecognitionParam]: 默认参数对象,如果不存在则返回 None + Default parameter object, or None if not exists """ string_buffer = StringBuffer() if not Library.framework().MaaResourceGetDefaultRecognitionParam( @@ -258,16 +246,15 @@ def get_default_recognition_param( except (json.JSONDecodeError, ValueError): return None - def get_default_action_param( - self, action_type: JActionType - ) -> Optional[JActionParam]: + def get_default_action_param(self, action_type: JActionType) -> Optional[JActionParam]: """获取指定动作类型的默认参数 / Get default parameters for specified action type Args: action_type: 动作类型 / Action type Returns: - Optional[JActionParam]: 默认参数对象,如果不存在则返回 None / Default parameter object, or None if not exists + Optional[JActionParam]: 默认参数对象,如果不存在则返回 None + Default parameter object, or None if not exists """ string_buffer = StringBuffer() if not Library.framework().MaaResourceGetDefaultActionParam( @@ -311,9 +298,7 @@ def use_cpu(self) -> bool: Returns: bool: 是否成功 / Whether successful """ - return self.set_inference( - MaaInferenceExecutionProviderEnum.CPU, MaaInferenceDeviceEnum.CPU - ) + return self.set_inference(MaaInferenceExecutionProviderEnum.CPU, MaaInferenceDeviceEnum.CPU) def use_directml(self, device_id: int = MaaInferenceDeviceEnum.Auto) -> bool: """使用 DirectML 进行推理 / Use DirectML for inference @@ -343,9 +328,7 @@ def use_auto_ep(self) -> bool: Returns: bool: 是否成功 / Whether successful """ - return self.set_inference( - MaaInferenceExecutionProviderEnum.Auto, MaaInferenceDeviceEnum.Auto - ) + return self.set_inference(MaaInferenceExecutionProviderEnum.Auto, MaaInferenceDeviceEnum.Auto) # not implemented # def use_cuda(self, nvidia_gpu_id: int) -> bool: @@ -371,24 +354,29 @@ def set_auto_device(self) -> bool: """ return self.use_auto_ep() - def custom_recognition(self, name: str): + def custom_recognition(self, name: str) -> Callable[[type["CustomRecognition"]], type["CustomRecognition"]]: """自定义识别器装饰器 / Custom recognition decorator Args: - name: 识别器名称,需与 Pipeline 中的 custom_recognition 字段匹配 / Recognition name, should match the custom_recognition field in Pipeline + name: 识别器名称,需与 Pipeline 中的 custom_recognition 字段匹配 + Recognition name, should match the custom_recognition field in Pipeline Returns: 装饰器函数 / Decorator function """ - def wrapper_recognition(recognition): + def wrapper_recognition( + recognition: type["CustomRecognition"], + ) -> type["CustomRecognition"]: self.register_custom_recognition(name=name, recognition=recognition()) return recognition return wrapper_recognition def register_custom_recognition( - self, name: str, recognition: "CustomRecognition" # type: ignore + self, + name: str, + recognition: "CustomRecognition", ) -> bool: """注册自定义识别器 / Register a custom recognizer @@ -443,23 +431,24 @@ def clear_custom_recognition(self) -> bool: ) ) - def custom_action(self, name: str): + def custom_action(self, name: str) -> Callable[[type["CustomAction"]], type["CustomAction"]]: """自定义动作装饰器 / Custom action decorator Args: - name: 动作名称,需与 Pipeline 中的 custom_action 字段匹配 / Action name, should match the custom_action field in Pipeline + name: 动作名称,需与 Pipeline 中的 custom_action 字段匹配 + Action name, should match the custom_action field in Pipeline Returns: 装饰器函数 / Decorator function """ - def wrapper_action(action): + def wrapper_action(action: type["CustomAction"]) -> type["CustomAction"]: self.register_custom_action(name=name, action=action()) return action return wrapper_action - def register_custom_action(self, name: str, action: "CustomAction") -> bool: # type: ignore + def register_custom_action(self, name: str, action: "CustomAction") -> bool: """注册自定义操作 / Register a custom action Args: @@ -539,9 +528,7 @@ def custom_recognition_list(self) -> list[str]: RuntimeError: 如果获取失败 """ buffer = StringListBuffer() - if not Library.framework().MaaResourceGetCustomRecognitionList( - self._handle, buffer._handle - ): + if not Library.framework().MaaResourceGetCustomRecognitionList(self._handle, buffer._handle): raise RuntimeError("Failed to get custom recognition list.") return buffer.get() @@ -556,9 +543,7 @@ def custom_action_list(self) -> list[str]: RuntimeError: 如果获取失败 """ buffer = StringListBuffer() - if not Library.framework().MaaResourceGetCustomActionList( - self._handle, buffer._handle - ): + if not Library.framework().MaaResourceGetCustomActionList(self._handle, buffer._handle): raise RuntimeError("Failed to get custom action list.") return buffer.get() @@ -577,7 +562,7 @@ def hash(self) -> str: raise RuntimeError("Failed to get hash.") return buffer.get() - _sink_holder: Dict[int, "ResourceEventSink"] = {} + _sink_holder: dict[int, "ResourceEventSink"] = {} def add_sink(self, sink: "ResourceEventSink") -> Optional[int]: """添加资源事件监听器 / Add resource event listener @@ -588,11 +573,7 @@ def add_sink(self, sink: "ResourceEventSink") -> Optional[int]: Returns: Optional[int]: 监听器 id,失败返回 None / Listener id, or None if failed """ - sink_id = int( - Library.framework().MaaResourceAddSink( - self._handle, *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.framework().MaaResourceAddSink(self._handle, *EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None @@ -824,7 +805,6 @@ def _set_api_properties(): class ResourceEventSink(EventSink): - @dataclass class ResourceLoadingDetail: res_id: int @@ -840,11 +820,10 @@ def on_resource_loading( ): pass - def on_raw_notification(self, resource: Resource, msg: str, details: dict): + def on_raw_notification(self, resource: Resource, msg: str, details: dict[str, Any]) -> None: pass - def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict): - + def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict[str, Any]) -> None: resource = Resource(handle=handle) self.on_raw_notification(resource, msg, details) diff --git a/source/binding/Python/maa/tasker.py b/source/binding/Python/maa/tasker.py index 42ec7119dd..b4753a7761 100644 --- a/source/binding/Python/maa/tasker.py +++ b/source/binding/Python/maa/tasker.py @@ -1,19 +1,23 @@ import ctypes import dataclasses import json +from dataclasses import dataclass from pathlib import Path -from typing import Dict, Optional, Union +from typing import TYPE_CHECKING, Any, Optional, Union import numpy +from .buffer import ImageBuffer, ImageListBuffer, RectBuffer, StringBuffer +from .controller import Controller from .define import * -from .library import Library -from .buffer import ImageListBuffer, RectBuffer, StringBuffer, ImageBuffer -from .job import Job, JobWithResult, TaskJob from .event_sink import EventSink, NotificationType +from .job import Job, TaskJob +from .library import Library +from .pipeline import JActionParam, JActionType, JRecognitionParam, JRecognitionType from .resource import Resource -from .controller import Controller -from .pipeline import JRecognitionParam, JActionParam, JRecognitionType, JActionType + +if TYPE_CHECKING: + from .context import ContextEventSink class Tasker: @@ -65,12 +69,8 @@ def bind(self, resource: Resource, controller: Controller) -> bool: self._resource_holder = resource self._controller_holder = controller - return bool( - Library.framework().MaaTaskerBindResource(self._handle, resource._handle) - ) and bool( - Library.framework().MaaTaskerBindController( - self._handle, controller._handle - ) + return bool(Library.framework().MaaTaskerBindResource(self._handle, resource._handle)) and bool( + Library.framework().MaaTaskerBindController(self._handle, controller._handle) ) @property @@ -114,7 +114,7 @@ def inited(self) -> bool: """ return bool(Library.framework().MaaTaskerInited(self._handle)) - def post_task(self, entry: str, pipeline_override: Dict = {}) -> TaskJob: + def post_task(self, entry: str, pipeline_override: Optional[dict[str, Any]] = None) -> TaskJob: """异步执行任务 / Asynchronously execute task 这是一个异步操作,会立即返回一个 TaskJob 对象 @@ -125,9 +125,14 @@ def post_task(self, entry: str, pipeline_override: Dict = {}) -> TaskJob: pipeline_override: 用于覆盖的 json / JSON for overriding Returns: - TaskJob: 任务作业对象,可通过 status/wait 查询状态,通过 get() 获取结果,通过 override_pipeline() 动态修改 pipeline - / Task job object, can query status via status/wait, get result via get(), modify pipeline via override_pipeline() + TaskJob: 任务作业对象,可通过 status/wait 查询状态,通过 get() 获取结果,通过 override_pipeline() 动态修改 + pipeline + + Task job object, can query status via status/wait, get result via get(), modify + pipeline via override_pipeline() """ + if pipeline_override is None: + pipeline_override = {} taskid = Library.framework().MaaTaskerPostTask( self._handle, *Tasker._gen_post_param(entry, pipeline_override), @@ -165,7 +170,7 @@ def post_action( self, action_type: JActionType, action_param: JActionParam, - box: Rect = Rect(0, 0, 0, 0), + box: RectType = (0, 0, 0, 0), reco_detail: str = "", ) -> TaskJob: """异步执行操作 / Asynchronously execute action @@ -181,9 +186,7 @@ def post_action( """ rect_buffer = RectBuffer() rect_buffer.set(box) - action_param_json = json.dumps( - dataclasses.asdict(action_param), ensure_ascii=False - ) + action_param_json = json.dumps(dataclasses.asdict(action_param), ensure_ascii=False) taskid = Library.framework().MaaTaskerPostAction( self._handle, action_type.encode(), @@ -208,7 +211,8 @@ def post_stop(self) -> Job: 这是一个异步操作,会立即返回一个 Job 对象 停止操作会中断当前运行的任务,并停止资源加载和控制器操作 This is an asynchronous operation that immediately returns a Job object - The stop operation will interrupt the currently running task and stop resource loading and controller operations + The stop operation will interrupt the currently running task and stop resource loading and + controller operations Returns: Job: 作业对象,可通过 status/wait 查询状态 / Job object, can query status via status/wait @@ -255,7 +259,7 @@ def clear_cache(self) -> bool: """ return bool(Library.framework().MaaTaskerClearCache(self._handle)) - def override_pipeline(self, task_id: int, pipeline_override: Dict) -> bool: + def override_pipeline(self, task_id: int, pipeline_override: dict[str, Any]) -> bool: """覆盖指定任务的 pipeline / Override pipeline for specified task 在任务执行期间动态修改 pipeline 配置 @@ -273,7 +277,7 @@ def override_pipeline(self, task_id: int, pipeline_override: Dict) -> bool: json.dumps(pipeline_override, ensure_ascii=False).encode(), ) - _sink_holder: Dict[int, "EventSink"] = {} + _sink_holder: dict[int, "EventSink"] = {} def add_sink(self, sink: "TaskerEventSink") -> Optional[int]: """添加实例事件监听器 / Add instance event listener @@ -284,11 +288,7 @@ def add_sink(self, sink: "TaskerEventSink") -> Optional[int]: Returns: Optional[int]: 监听器 id,失败返回 None / Listener id, or None if failed """ - sink_id = int( - Library.framework().MaaTaskerAddSink( - self._handle, *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.framework().MaaTaskerAddSink(self._handle, *EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None @@ -317,11 +317,7 @@ def add_context_sink(self, sink: "ContextEventSink") -> Optional[int]: Returns: Optional[int]: 监听器 id,失败返回 None / Listener id, or None if failed """ - sink_id = int( - Library.framework().MaaTaskerAddContextSink( - self._handle, *EventSink._gen_c_param(sink) - ) - ) + sink_id = int(Library.framework().MaaTaskerAddContextSink(self._handle, *EventSink._gen_c_param(sink))) if sink_id == MaaInvalidId: return None @@ -344,7 +340,7 @@ def clear_context_sinks(self) -> None: ### private ### @staticmethod - def _gen_post_param(entry: str, pipeline_override: Dict) -> Tuple[bytes, bytes]: + def _gen_post_param(entry: str, pipeline_override: dict[str, Any]) -> tuple[bytes, bytes]: pipeline_json = json.dumps(pipeline_override, ensure_ascii=False) return ( @@ -383,7 +379,8 @@ def get_recognition_detail(self, reco_id: int) -> Optional[RecognitionDetail]: reco_id: 识别号 / Recognition id Returns: - Optional[RecognitionDetail]: 识别详情,如果不存在则返回 None / Recognition detail, or None if not exists + Optional[RecognitionDetail]: 识别详情,如果不存在则返回 None + Recognition detail, or None if not exists """ name = StringBuffer() algorithm = StringBuffer() # type: ignore @@ -567,16 +564,8 @@ def get_node_detail(self, node_id: int) -> Optional[NodeDetail]: if not ret: return None - recognition = ( - self.get_recognition_detail(int(c_reco_id.value)) - if c_reco_id.value != 0 - else None - ) - action = ( - self.get_action_detail(int(c_action_id.value)) - if c_action_id.value != 0 - else None - ) + recognition = self.get_recognition_detail(int(c_reco_id.value)) if c_reco_id.value != 0 else None + action = self.get_action_detail(int(c_action_id.value)) if c_action_id.value != 0 else None return NodeDetail( node_id=node_id, @@ -706,7 +695,8 @@ def set_debug_mode(debug_mode: bool) -> bool: """设置是否启用调试模式 / Set whether to enable debug mode 调试模式下, RecoDetail 将可以获取到 raw/draws; 所有任务都会被视为 focus 而产生回调 - In debug mode, RecoDetail can retrieve raw/draws; all tasks are treated as focus and produce callbacks + In debug mode, RecoDetail can retrieve raw/draws; all tasks are treated as focus and produce + callbacks Args: debug_mode: 是否启用调试模式 / Whether to enable debug mode @@ -725,7 +715,8 @@ def set_debug_mode(debug_mode: bool) -> bool: @staticmethod def set_save_on_error(save_on_error: bool) -> bool: - """设置是否在错误时保存截图到日志路径/on_error中 / Set whether to save screenshot on error to log path/on_error + """设置是否在错误时保存截图到日志路径/on_error中 + Set whether to save screenshot on error to log path/on_error Args: save_on_error: 是否保存 / Whether to save @@ -785,7 +776,8 @@ def load_plugin(path: Union[Path, str]) -> bool: """加载插件 / Load plugin 可以使用完整路径或仅使用名称, 仅使用名称时会在系统目录和当前目录中搜索. 也可以递归搜索目录中的插件 - Can use full path or name only. When using name only, will search in system directory and current directory. Can also recursively search for plugins in a directory + Can use full path or name only. When using name only, will search in system directory and + current directory. Can also recursively search for plugins in a directory Args: path: 插件库路径或名称 / Plugin library path or name @@ -802,7 +794,9 @@ def load_plugin(path: Union[Path, str]) -> bool: _api_properties_initialized: bool = False - def _parse_recognition_raw_detail(self, algorithm: str, raw_detail): + def _parse_recognition_raw_detail( + self, algorithm: str, raw_detail: Any + ) -> tuple[list[RecognitionResult], list[RecognitionResult], Optional[RecognitionResult]]: if not raw_detail: return [], [], None @@ -811,13 +805,15 @@ def _parse_recognition_raw_detail(self, algorithm: str, raw_detail): except ValueError: return [], [], None - ResultType = AlgorithmResultDict.get(algorithm_enum) - if not ResultType: + # The result class is selected dynamically from a dict of classes; + # construction is reflective over JSON, so treat it as Any. + result_cls: Any = AlgorithmResultDict.get(algorithm_enum) + if not result_cls: return [], [], None # And/Or 的 detail 是子识别结果数组,递归获取完整的 RecognitionDetail if algorithm_enum in (AlgorithmEnum.And, AlgorithmEnum.Or): - sub_results = [] + sub_results: list[RecognitionDetail] = [] for sub in raw_detail: reco_id = sub.get("reco_id") if not reco_id: @@ -825,11 +821,11 @@ def _parse_recognition_raw_detail(self, algorithm: str, raw_detail): sub_detail = self.get_recognition_detail(reco_id) if sub_detail: sub_results.append(sub_detail) - result = ResultType(sub_results=sub_results) + result = result_cls(sub_results=sub_results) return [result], [result], result - all_results: List[RecognitionResult] = [] - filtered_results: List[RecognitionResult] = [] + all_results: list[RecognitionResult] = [] + filtered_results: list[RecognitionResult] = [] best_result: Optional[RecognitionResult] = None raw_all_results = raw_detail.get("all", []) @@ -837,18 +833,16 @@ def _parse_recognition_raw_detail(self, algorithm: str, raw_detail): raw_best_result = raw_detail.get("best", None) for raw_result in raw_all_results: - all_results.append(ResultType(**raw_result)) + all_results.append(result_cls(**raw_result)) for raw_result in raw_filtered_results: - filtered_results.append(ResultType(**raw_result)) + filtered_results.append(result_cls(**raw_result)) if raw_best_result: - best_result = ResultType(**raw_best_result) + best_result = result_cls(**raw_best_result) return all_results, filtered_results, best_result @staticmethod - def _parse_action_raw_detail( - action: str, raw_detail: Dict - ) -> Optional[ActionResult]: + def _parse_action_raw_detail(action: str, raw_detail: Any) -> Optional[ActionResult]: if not raw_detail: return None @@ -857,14 +851,14 @@ def _parse_action_raw_detail( except ValueError: return None - ResultType = ActionResultDict.get(action_enum) - if not ResultType: + result_cls: Any = ActionResultDict.get(action_enum) + if not result_cls: return None try: # cv::Point 在 JSON 中是数组 [x, y],不需要转换 # 直接使用 raw_detail 创建结果对象 - return ResultType(**raw_detail) + return result_cls(**raw_detail) except (TypeError, KeyError): # 如果解析失败,返回 None return None @@ -1069,7 +1063,6 @@ def _set_api_properties(): class TaskerEventSink(EventSink): - @dataclass class TaskerTaskDetail: task_id: int @@ -1077,16 +1070,13 @@ class TaskerTaskDetail: uuid: str hash: str - def on_tasker_task( - self, tasker: Tasker, noti_type: NotificationType, detail: TaskerTaskDetail - ): + def on_tasker_task(self, tasker: Tasker, noti_type: NotificationType, detail: TaskerTaskDetail) -> None: pass - def on_raw_notification(self, tasker: Tasker, msg: str, details: dict): + def on_raw_notification(self, tasker: Tasker, msg: str, details: dict[str, Any]) -> None: pass - def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict): - + def _on_raw_notification(self, handle: ctypes.c_void_p, msg: str, details: dict[str, Any]) -> None: tasker = Tasker(handle=handle) self.on_raw_notification(tasker, msg, details) diff --git a/source/binding/Python/maa/toolkit.py b/source/binding/Python/maa/toolkit.py index f7b64fae33..c4e4061d98 100644 --- a/source/binding/Python/maa/toolkit.py +++ b/source/binding/Python/maa/toolkit.py @@ -2,7 +2,7 @@ import json from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, Union, Optional, Any +from typing import Any, Optional, Union from .define import * from .library import Library @@ -29,7 +29,7 @@ class AdbDevice: address: str screencap_methods: int input_methods: int - config: Dict[str, Any] + config: dict[str, Any] @dataclass @@ -60,7 +60,7 @@ class Toolkit: ### public ### @staticmethod - def init_option(user_path: Union[str, Path], default_config: Dict = {}) -> bool: + def init_option(user_path: Union[str, Path], default_config: Optional[dict[str, Any]] = None) -> bool: """从 user_path 中加载全局配置 / Load global config from user_path Args: @@ -70,13 +70,13 @@ def init_option(user_path: Union[str, Path], default_config: Dict = {}) -> bool: Returns: bool: 是否成功 / Whether successful """ + if default_config is None: + default_config = {} if Library.is_agent_server(): from .tasker import Tasker - print( - "Warning: Toolkit.init_option is deprecated in AgentServer; only set_log_dir is applied." - ) - config = default_config if isinstance(default_config, dict) else {} + print("Warning: Toolkit.init_option is deprecated in AgentServer; only set_log_dir is applied.") + config = default_config log_dir = Path(user_path) / "debug" if config.get("logging", True) else "" return Tasker.set_log_dir(log_dir) @@ -90,9 +90,7 @@ def init_option(user_path: Union[str, Path], default_config: Dict = {}) -> bool: ) @staticmethod - def find_adb_devices( - specified_adb: Optional[Union[str, Path]] = None - ) -> List[AdbDevice]: + def find_adb_devices(specified_adb: Optional[Union[str, Path]] = None) -> list[AdbDevice]: """搜索所有已知安卓模拟器 / Search all known Android emulators Args: @@ -106,47 +104,31 @@ def find_adb_devices( list_handle = Library.toolkit().MaaToolkitAdbDeviceListCreate() if specified_adb: - Library.toolkit().MaaToolkitAdbDeviceFindSpecified( - str(specified_adb).encode(), list_handle - ) + Library.toolkit().MaaToolkitAdbDeviceFindSpecified(str(specified_adb).encode(), list_handle) else: Library.toolkit().MaaToolkitAdbDeviceFind(list_handle) count = Library.toolkit().MaaToolkitAdbDeviceListSize(list_handle) - devices = [] + devices: list[AdbDevice] = [] for i in range(count): device_handle = Library.toolkit().MaaToolkitAdbDeviceListAt(list_handle, i) name = Library.toolkit().MaaToolkitAdbDeviceGetName(device_handle).decode() - adb_path = Path( - Library.toolkit().MaaToolkitAdbDeviceGetAdbPath(device_handle).decode() - ) - address = ( - Library.toolkit().MaaToolkitAdbDeviceGetAddress(device_handle).decode() - ) - screencap_methods = int( - Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods(device_handle) - ) - input_methods = int( - Library.toolkit().MaaToolkitAdbDeviceGetInputMethods(device_handle) - ) - config = json.loads( - Library.toolkit().MaaToolkitAdbDeviceGetConfig(device_handle).decode() - ) + adb_path = Path(Library.toolkit().MaaToolkitAdbDeviceGetAdbPath(device_handle).decode()) + address = Library.toolkit().MaaToolkitAdbDeviceGetAddress(device_handle).decode() + screencap_methods = int(Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods(device_handle)) + input_methods = int(Library.toolkit().MaaToolkitAdbDeviceGetInputMethods(device_handle)) + config = json.loads(Library.toolkit().MaaToolkitAdbDeviceGetConfig(device_handle).decode()) - devices.append( - AdbDevice( - name, adb_path, address, screencap_methods, input_methods, config - ) - ) + devices.append(AdbDevice(name, adb_path, address, screencap_methods, input_methods, config)) Library.toolkit().MaaToolkitAdbDeviceListDestroy(list_handle) return devices @staticmethod - def find_desktop_windows() -> List[DesktopWindow]: + def find_desktop_windows() -> list[DesktopWindow]: """查询所有窗口信息 / Query all window info Returns: @@ -160,22 +142,12 @@ def find_desktop_windows() -> List[DesktopWindow]: count = Library.toolkit().MaaToolkitDesktopWindowListSize(list_handle) - windows = [] + windows: list[DesktopWindow] = [] for i in range(count): - window_handle = Library.toolkit().MaaToolkitDesktopWindowListAt( - list_handle, i - ) + window_handle = Library.toolkit().MaaToolkitDesktopWindowListAt(list_handle, i) hwnd = Library.toolkit().MaaToolkitDesktopWindowGetHandle(window_handle) - class_name = ( - Library.toolkit() - .MaaToolkitDesktopWindowGetClassName(window_handle) - .decode() - ) - window_name = ( - Library.toolkit() - .MaaToolkitDesktopWindowGetWindowName(window_handle) - .decode() - ) + class_name = Library.toolkit().MaaToolkitDesktopWindowGetClassName(window_handle).decode() + window_name = Library.toolkit().MaaToolkitDesktopWindowGetWindowName(window_handle).decode() windows.append(DesktopWindow(hwnd, class_name, window_name)) @@ -204,7 +176,8 @@ def macos_request_permission(perm: MaaMacOSPermissionEnum) -> bool: """请求 macOS 权限 / Request macOS permission 向用户请求指定的 macOS 系统权限。系统可能会弹出授权对话框。 - Request the specified macOS system permission from user. System may show an authorization dialog. + Request the specified macOS system permission from user. System may show an authorization + dialog. Args: perm: 权限类型 / Permission type @@ -249,20 +222,14 @@ def _set_api_properties(): ctypes.c_char_p, ] - Library.toolkit().MaaToolkitAdbDeviceListCreate.restype = ( - MaaToolkitAdbDeviceListHandle - ) + Library.toolkit().MaaToolkitAdbDeviceListCreate.restype = MaaToolkitAdbDeviceListHandle Library.toolkit().MaaToolkitAdbDeviceListCreate.argtypes = [] Library.toolkit().MaaToolkitAdbDeviceListDestroy.restype = None - Library.toolkit().MaaToolkitAdbDeviceListDestroy.argtypes = [ - MaaToolkitAdbDeviceListHandle - ] + Library.toolkit().MaaToolkitAdbDeviceListDestroy.argtypes = [MaaToolkitAdbDeviceListHandle] Library.toolkit().MaaToolkitAdbDeviceFind.restype = MaaBool - Library.toolkit().MaaToolkitAdbDeviceFind.argtypes = [ - MaaToolkitAdbDeviceListHandle - ] + Library.toolkit().MaaToolkitAdbDeviceFind.argtypes = [MaaToolkitAdbDeviceListHandle] Library.toolkit().MaaToolkitAdbDeviceFindSpecified.restype = MaaBool Library.toolkit().MaaToolkitAdbDeviceFindSpecified.argtypes = [ @@ -271,9 +238,7 @@ def _set_api_properties(): ] Library.toolkit().MaaToolkitAdbDeviceListSize.restype = MaaSize - Library.toolkit().MaaToolkitAdbDeviceListSize.argtypes = [ - MaaToolkitAdbDeviceListHandle - ] + Library.toolkit().MaaToolkitAdbDeviceListSize.argtypes = [MaaToolkitAdbDeviceListHandle] Library.toolkit().MaaToolkitAdbDeviceListAt.restype = MaaToolkitAdbDeviceHandle Library.toolkit().MaaToolkitAdbDeviceListAt.argtypes = [ @@ -282,91 +247,55 @@ def _set_api_properties(): ] Library.toolkit().MaaToolkitAdbDeviceGetName.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitAdbDeviceGetName.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetName.argtypes = [MaaToolkitAdbDeviceHandle] Library.toolkit().MaaToolkitAdbDeviceGetAdbPath.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitAdbDeviceGetAdbPath.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetAdbPath.argtypes = [MaaToolkitAdbDeviceHandle] Library.toolkit().MaaToolkitAdbDeviceGetAddress.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitAdbDeviceGetAddress.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetAddress.argtypes = [MaaToolkitAdbDeviceHandle] - Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods.restype = ( - MaaAdbScreencapMethod - ) - Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods.restype = MaaAdbScreencapMethod + Library.toolkit().MaaToolkitAdbDeviceGetScreencapMethods.argtypes = [MaaToolkitAdbDeviceHandle] Library.toolkit().MaaToolkitAdbDeviceGetInputMethods.restype = MaaAdbInputMethod - Library.toolkit().MaaToolkitAdbDeviceGetInputMethods.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetInputMethods.argtypes = [MaaToolkitAdbDeviceHandle] Library.toolkit().MaaToolkitAdbDeviceGetConfig.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitAdbDeviceGetConfig.argtypes = [ - MaaToolkitAdbDeviceHandle - ] + Library.toolkit().MaaToolkitAdbDeviceGetConfig.argtypes = [MaaToolkitAdbDeviceHandle] - Library.toolkit().MaaToolkitDesktopWindowListCreate.restype = ( - MaaToolkitDesktopWindowListHandle - ) + Library.toolkit().MaaToolkitDesktopWindowListCreate.restype = MaaToolkitDesktopWindowListHandle Library.toolkit().MaaToolkitDesktopWindowListCreate.argtypes = [] Library.toolkit().MaaToolkitDesktopWindowListDestroy.restype = None - Library.toolkit().MaaToolkitDesktopWindowListDestroy.argtypes = [ - MaaToolkitDesktopWindowListHandle - ] + Library.toolkit().MaaToolkitDesktopWindowListDestroy.argtypes = [MaaToolkitDesktopWindowListHandle] Library.toolkit().MaaToolkitDesktopWindowFindAll.restype = MaaBool - Library.toolkit().MaaToolkitDesktopWindowFindAll.argtypes = [ - MaaToolkitDesktopWindowListHandle - ] + Library.toolkit().MaaToolkitDesktopWindowFindAll.argtypes = [MaaToolkitDesktopWindowListHandle] Library.toolkit().MaaToolkitDesktopWindowListSize.restype = MaaSize - Library.toolkit().MaaToolkitDesktopWindowListSize.argtypes = [ - MaaToolkitDesktopWindowListHandle - ] + Library.toolkit().MaaToolkitDesktopWindowListSize.argtypes = [MaaToolkitDesktopWindowListHandle] - Library.toolkit().MaaToolkitDesktopWindowListAt.restype = ( - MaaToolkitDesktopWindowHandle - ) + Library.toolkit().MaaToolkitDesktopWindowListAt.restype = MaaToolkitDesktopWindowHandle Library.toolkit().MaaToolkitDesktopWindowListAt.argtypes = [ MaaToolkitDesktopWindowListHandle, MaaSize, ] Library.toolkit().MaaToolkitDesktopWindowGetHandle.restype = ctypes.c_void_p - Library.toolkit().MaaToolkitDesktopWindowGetHandle.argtypes = [ - MaaToolkitDesktopWindowHandle - ] + Library.toolkit().MaaToolkitDesktopWindowGetHandle.argtypes = [MaaToolkitDesktopWindowHandle] Library.toolkit().MaaToolkitDesktopWindowGetClassName.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitDesktopWindowGetClassName.argtypes = [ - MaaToolkitDesktopWindowHandle - ] + Library.toolkit().MaaToolkitDesktopWindowGetClassName.argtypes = [MaaToolkitDesktopWindowHandle] Library.toolkit().MaaToolkitDesktopWindowGetWindowName.restype = ctypes.c_char_p - Library.toolkit().MaaToolkitDesktopWindowGetWindowName.argtypes = [ - MaaToolkitDesktopWindowHandle - ] + Library.toolkit().MaaToolkitDesktopWindowGetWindowName.argtypes = [MaaToolkitDesktopWindowHandle] Library.toolkit().MaaToolkitMacOSCheckPermission.restype = MaaBool - Library.toolkit().MaaToolkitMacOSCheckPermission.argtypes = [ - MaaMacOSPermission - ] + Library.toolkit().MaaToolkitMacOSCheckPermission.argtypes = [MaaMacOSPermission] Library.toolkit().MaaToolkitMacOSRequestPermission.restype = MaaBool - Library.toolkit().MaaToolkitMacOSRequestPermission.argtypes = [ - MaaMacOSPermission - ] + Library.toolkit().MaaToolkitMacOSRequestPermission.argtypes = [MaaMacOSPermission] Library.toolkit().MaaToolkitMacOSRevealPermissionSettings.restype = MaaBool - Library.toolkit().MaaToolkitMacOSRevealPermissionSettings.argtypes = [ - MaaMacOSPermission - ] + Library.toolkit().MaaToolkitMacOSRevealPermissionSettings.argtypes = [MaaMacOSPermission] diff --git a/source/binding/Python/pyproject.toml b/source/binding/Python/pyproject.toml index 8c59ca535c..922dd50dda 100644 --- a/source/binding/Python/pyproject.toml +++ b/source/binding/Python/pyproject.toml @@ -14,3 +14,34 @@ build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["maa"] + +[tool.ruff] +target-version = "py39" +# 120 (not maa-project's 100): the binding's docstrings are bilingual +# (Chinese/English) and several enums document methods as markdown tables, +# both of which legitimately exceed 100 columns. +line-length = 120 + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B"] + +# `maa/define.py` is an intentional aggregation hub consumed via +# `from .define import *` across the package. F403/F405 only describe that +# wildcard-import pattern (not correctness bugs); the public surface is +# documented via `__all__` in define.py. Converting to explicit imports is a +# possible follow-up if a lint-pure style is desired. +[tool.ruff.lint.per-file-ignores] +"maa/*.py" = ["F403", "F405"] +"maa/agent/*.py" = ["F403", "F405"] + +[tool.pyright] +include = ["maa"] +pythonVersion = "3.9" +typeCheckingMode = "strict" +reportMissingTypeStubs = "none" +# The buffer/job/handle objects expose `_handle` (and a few `_gen_*` helpers) +# that are private to external users (hence the underscore) but accessed across +# tightly-coupled modules within the package by design. pyright's +# reportPrivateUsage can't model package-private access, so it produces false +# positives for this intentional internal FFI coupling. +reportPrivateUsage = "none"