diff --git a/.dockerignore b/.dockerignore index d70a602..795a1d1 100644 --- a/.dockerignore +++ b/.dockerignore @@ -30,6 +30,7 @@ PixivUtil2/.pylintrc PixivUtil2/.python-version PixivUtil2/.travis.yml PixivUtil2/Dockerfile +PixivUtil2/PixivUtilGUI.py PixivUtil2/ISSUE_TEMPLATE.md PixivUtil2/MANIFEST.in PixivUtil2/changelog.txt @@ -37,6 +38,7 @@ PixivUtil2/freeimage-3.15.4-win32.dll PixivUtil2/icon2.ico PixivUtil2/pyproject.toml PixivUtil2/readme.md +PixivUtil2/README.md PixivUtil2/requirements.txt PixivUtil2/setup.py PixivUtil2/uv.lock diff --git a/PixivServer/app.py b/PixivServer/app.py index 3b18382..49f1550 100644 --- a/PixivServer/app.py +++ b/PixivServer/app.py @@ -1,24 +1,16 @@ import asyncio -import base64 -import contextlib -import json import logging -import os import time import traceback -import urllib.error -import urllib.request from contextlib import asynccontextmanager -from pathlib import Path -from urllib.parse import quote, urlparse -import psutil from fastapi import Depends, FastAPI, Request, Response import PixivServer import PixivServer.auth import PixivServer.routers import PixivServer.routers.database +import PixivServer.routers.dlq import PixivServer.routers.download_queue import PixivServer.routers.health import PixivServer.routers.metadata_queue @@ -28,132 +20,24 @@ # import PixivServer.routers.subscription import PixivServer.service import PixivServer.service.pixiv -from PixivServer.config.pixivutil import config as pixivutil_config -from PixivServer.config.rabbitmq import config as rabbitmq_config +from PixivServer.config.server import config as server_config from PixivServer.metrics import ( - DB_ARTWORKS, - DB_MEMBERS, - DB_PAGES, - DB_SERIES, - DB_TAGS, - DISK_DATABASE_BYTES, - DISK_DOWNLOADS_BYTES, HTTP_REQUEST_DURATION, HTTP_REQUEST_SIZE, HTTP_REQUESTS_TOTAL, HTTP_RESPONSE_SIZE, - QUEUE_DEPTH, SERVER_INFO, - SYS_CPU_PERCENT, - SYS_DISK_TOTAL_BYTES, - SYS_DISK_USED_BYTES, - SYS_MEM_TOTAL_BYTES, - SYS_MEM_USED_BYTES, ) -from 
PixivServer.repository.pixivutil import PixivUtilRepository +from PixivServer.service.metrics import periodic_metrics_collector from PixivServer.utils import get_version logger = logging.getLogger('uvicorn.pixivutil') -_SYSTEM_COLLECT_INTERVAL = 15 # seconds -_DB_STAT_COLLECT_INTERVAL = 60 # seconds -_DISK_COLLECT_INTERVAL = 300 # seconds (directory walk may be slow on large collections) -_QUEUE_COLLECT_INTERVAL = 15 # seconds - - -def _collect_system_metrics() -> None: - cpu = psutil.cpu_percent(interval=1) - vm = psutil.virtual_memory() - disk = psutil.disk_usage("/") - SYS_CPU_PERCENT.set(cpu) - SYS_MEM_USED_BYTES.set(vm.used) - SYS_MEM_TOTAL_BYTES.set(vm.total) - SYS_DISK_USED_BYTES.set(disk.used) - SYS_DISK_TOTAL_BYTES.set(disk.total) - - -def _collect_db_stats() -> None: - repo = PixivUtilRepository() - repo.open() - try: - DB_MEMBERS.set(repo.count_members()) - DB_ARTWORKS.set(repo.count_artworks()) - DB_PAGES.set(repo.count_pages()) - DB_TAGS.set(repo.count_tags()) - DB_SERIES.set(repo.count_series()) - finally: - repo.close() - - -def _collect_disk_metrics() -> None: - # Database file + WAL/SHM sidecars - db_path = pixivutil_config.db_path - db_bytes = 0 - for suffix in ("", "-wal", "-shm"): - p = db_path + suffix - if os.path.isfile(p): - db_bytes += os.path.getsize(p) - DISK_DATABASE_BYTES.set(db_bytes) - - # Downloads directory — recursive file size sum - downloads = Path(PixivServer.service.pixiv.service.downloads_folder) - total = 0 - if downloads.is_dir(): - for f in downloads.rglob("*"): - if f.is_file(): - with contextlib.suppress(OSError): - total += f.stat().st_size - DISK_DOWNLOADS_BYTES.set(total) - - -def _collect_queue_depth() -> None: - parsed = urlparse(rabbitmq_config.broker_url) - user = parsed.username or "guest" - password = parsed.password or "guest" - host = parsed.hostname or "rabbitmq" - raw_vhost = parsed.path.lstrip("/") - vhost = raw_vhost if raw_vhost else "/" - encoded_vhost = quote(vhost, safe="") - url = 
f"http://{host}:15672/api/queues/{encoded_vhost}/pixivutil-queue" - credentials = base64.b64encode(f"{user}:{password}".encode()).decode() - req = urllib.request.Request(url, headers={"Authorization": f"Basic {credentials}"}) - try: - with urllib.request.urlopen(req, timeout=5) as resp: - data = json.loads(resp.read()) - QUEUE_DEPTH.set(data.get("messages", 0)) - except (urllib.error.URLError, OSError, ValueError): - pass # Management API unavailable; leave metric stale - - -async def _periodic_metrics_collector() -> None: - last_system = 0.0 - last_db = 0.0 - last_disk = 0.0 - last_queue = 0.0 - while True: - now = time.monotonic() - try: - if now - last_system >= _SYSTEM_COLLECT_INTERVAL: - await asyncio.to_thread(_collect_system_metrics) - last_system = time.monotonic() - if now - last_db >= _DB_STAT_COLLECT_INTERVAL: - await asyncio.to_thread(_collect_db_stats) - last_db = time.monotonic() - if now - last_disk >= _DISK_COLLECT_INTERVAL: - await asyncio.to_thread(_collect_disk_metrics) - last_disk = time.monotonic() - if now - last_queue >= _QUEUE_COLLECT_INTERVAL: - await asyncio.to_thread(_collect_queue_depth) - last_queue = time.monotonic() - except Exception: # noqa: BLE001 - logger.warning(f"Metrics collector error: {traceback.format_exc()}") - await asyncio.sleep(1) - - @asynccontextmanager async def lifespan(_: FastAPI): try: - logger.info("Setting up server.") + logger.info(f"Setting up server in {server_config.server_env} environment.") + # startup actions await asyncio.sleep(5) PixivServer.service.pixiv.service.open(validate_pixiv_login=False) @@ -162,7 +46,7 @@ async def lifespan(_: FastAPI): except Exception as e: print(f"Encountered exception during application setup: {traceback.format_exc()}") raise e - collector_task = asyncio.create_task(_periodic_metrics_collector()) + collector_task = asyncio.create_task(periodic_metrics_collector()) yield # shutdown actions collector_task.cancel() @@ -234,11 +118,24 @@ async def 
request_metrics_middleware(request: Request, call_next): prefix="/api/database", dependencies=auth_dependency, ) +app.include_router( + PixivServer.routers.dlq.router, + prefix="/api/queue/dead-letter", + dependencies=auth_dependency, +) # app.include_router( # PixivServer.routers.subscription.router, # prefix="/api/subscription" # ) +if server_config.server_env == 'development': + import PixivServer.routers.dev + app.include_router( + PixivServer.routers.dev.router, + prefix="/api/dev", + dependencies=auth_dependency, + ) + @app.get("/") async def info(): diff --git a/PixivServer/config/celery.py b/PixivServer/config/celery.py index a950fa0..87a0296 100644 --- a/PixivServer/config/celery.py +++ b/PixivServer/config/celery.py @@ -2,13 +2,43 @@ from PixivServer.config import rabbitmq -default_exchange = Exchange('pixivutil-exchange', type='direct', durable=True, delivery_mode=2) +LEGACY_MAIN_EXCHANGE_NAME = "pixivutil-exchange" +LEGACY_MAIN_QUEUE_NAME = "pixivutil-queue" -CELERY_QUEUES = ( - Queue(name="pixivutil-queue", exchange=default_exchange, routing_key='pixivutil-queue', durable=True), +MAIN_EXCHANGE_NAME = "pixivutil-v1-exchange" +DLX_EXCHANGE_NAME = "pixivutil-v1-dlx" +MAIN_QUEUE_NAME = "pixivutil-v1-queue" +MAIN_ROUTING_KEY = MAIN_QUEUE_NAME +DEAD_LETTER_QUEUE_NAME = "pixivutil-v1-dead-letter" +QUEUE_MAX_PRIORITY = 3 + +default_exchange = Exchange(MAIN_EXCHANGE_NAME, type='direct', durable=True, delivery_mode=2) +dlx_exchange = Exchange(DLX_EXCHANGE_NAME, type='fanout', durable=True, delivery_mode=2) +main_queue = Queue( + name=MAIN_QUEUE_NAME, + exchange=default_exchange, + routing_key=MAIN_ROUTING_KEY, + durable=True, + queue_arguments={ + 'x-dead-letter-exchange': DLX_EXCHANGE_NAME, + 'x-max-priority': QUEUE_MAX_PRIORITY, + }, +) +dead_letter_queue = Queue( + name=DEAD_LETTER_QUEUE_NAME, + exchange=dlx_exchange, + routing_key='', + durable=True, ) +CELERY_QUEUES = (main_queue,) + BROKER_URL = rabbitmq.config.broker_url CELERY_ACKS_LATE = True 
CELERY_TASK_ACKS_LATE = True +CELERY_ACKS_ON_FAILURE_OR_TIMEOUT = False CELERY_TASK_REJECT_ON_WORKER_LOST = True + +# Keep broker-side queue priority behavior observable with a single worker. +# Without this, Celery can reserve multiple low-priority tasks before later high-priority tasks arrive. +CELERYD_PREFETCH_MULTIPLIER = 1 diff --git a/PixivServer/config/rabbitmq.py b/PixivServer/config/rabbitmq.py index c86d3e9..6525fda 100644 --- a/PixivServer/config/rabbitmq.py +++ b/PixivServer/config/rabbitmq.py @@ -5,5 +5,6 @@ class RabbitConfig: def __init__(self): self.broker_url = os.getenv("RABBITMQ_BROKER_URL", "amqp://guest:guest@rabbitmq:5672") + self.management_url = os.getenv("RABBITMQ_MANAGEMENT_URL", "http://guest:guest@rabbitmq:15672") config = RabbitConfig() diff --git a/PixivServer/config/server.py b/PixivServer/config/server.py index cbbf641..8ed690c 100644 --- a/PixivServer/config/server.py +++ b/PixivServer/config/server.py @@ -1,4 +1,5 @@ import os +from typing import Literal class ServerConfig: @@ -8,4 +9,9 @@ def __init__(self): api_key = os.getenv("PIXIVUTIL_SERVER_API_KEY") self.api_key = api_key if api_key else None + server_env = os.getenv("PIXIVUTIL_SERVER_ENV", "production") + if server_env != "production" and server_env != "development": + raise ValueError(f"Unrecognized environment: {server_env}") + self.server_env: Literal["production", "development"] = server_env + config = ServerConfig() diff --git a/PixivServer/metrics.py b/PixivServer/metrics.py index 2d2aa6c..6943b6e 100644 --- a/PixivServer/metrics.py +++ b/PixivServer/metrics.py @@ -23,6 +23,7 @@ # --- Worker queue metrics (periodic) --- QUEUE_DEPTH = Gauge("pixivutil_queue_depth", "Number of messages pending in the task queue") +DLQ_DEPTH = Gauge("pixivutil_dlq_depth", "Number of messages in the dead letter queue") # --- Request metrics (per-request via middleware) --- HTTP_REQUESTS_TOTAL = Counter( diff --git a/PixivServer/models/pixiv_worker.py b/PixivServer/models/pixiv_worker.py 
index 4165827..825d06c 100644 --- a/PixivServer/models/pixiv_worker.py +++ b/PixivServer/models/pixiv_worker.py @@ -2,6 +2,9 @@ Model layer for PixivUtil worker queue processing interface. """ +from typing import Any, Protocol, cast + +from celery.result import AsyncResult from pixivutil_server_common.models import ( TagMetadataFilterMode, TagSortOrder, @@ -10,6 +13,14 @@ from pydantic import BaseModel +class CeleryTask(Protocol): + def apply_async(self, *, args: list[Any] | None = None, priority: int | None = None, **kwargs: Any) -> AsyncResult: ... + + +def as_celery_task(task: Any) -> CeleryTask: + return cast(CeleryTask, task) + + class DownloadArtworkByIdRequest(BaseModel): artwork_id: int diff --git a/PixivServer/repository/pixivutil.py b/PixivServer/repository/pixivutil.py index 5f62715..144e06a 100644 --- a/PixivServer/repository/pixivutil.py +++ b/PixivServer/repository/pixivutil.py @@ -27,7 +27,7 @@ class PixivUtilRepository: def __init__(self): self.db_path = pixivutil_config.db_path - self.connection: sqlite3.Connection = None + self.connection: sqlite3.Connection = None # pyright: ignore[reportAttributeAccessIssue] this will be handled during open. def open(self): self.connection = sqlite3.connect(self.db_path, timeout=30.0) @@ -50,6 +50,7 @@ def get_member_data_by_id(self, member_id: int) -> PixivMemberPortfolio: Raises: KeyError: If member with the given ID is not found. """ + cursor = None try: cursor = self.connection.cursor() @@ -155,6 +156,7 @@ def get_all_pixiv_member_ids(self) -> list[int]: Returns: List of member IDs. Empty list if no members found. """ + cursor = None try: cursor = self.connection.cursor() cursor.execute("SELECT member_id FROM pixiv_master_member ORDER BY member_id ASC") @@ -174,6 +176,7 @@ def get_all_pixiv_image_ids(self) -> list[int]: Returns: List of image IDs. Empty list if no images found. 
""" + cursor = None try: cursor = self.connection.cursor() cursor.execute("SELECT image_id FROM pixiv_master_image ORDER BY image_id ASC") @@ -193,6 +196,7 @@ def get_all_pixiv_tags(self) -> list[str]: Returns: List of tag IDs. Empty list if no tags found. """ + cursor = None try: cursor = self.connection.cursor() cursor.execute("SELECT tag_id FROM pixiv_master_tag ORDER BY tag_id ASC") @@ -212,6 +216,7 @@ def get_all_pixiv_series(self) -> list[str]: Returns: List of series IDs. Empty list if no series found. """ + cursor = None try: cursor = self.connection.cursor() cursor.execute("SELECT series_id FROM pixiv_master_series ORDER BY series_id ASC") @@ -231,6 +236,7 @@ def get_tag_info_by_id(self, tag_id: str) -> PixivTagInfo: Raises: KeyError: If tag with the given ID is not found. """ + cursor = None try: cursor = self.connection.cursor() @@ -305,6 +311,7 @@ def get_series_info_by_id(self, series_id: str) -> PixivSeriesInfo: Raises: KeyError: If series with the given ID is not found. """ + cursor = None try: cursor = self.connection.cursor() @@ -365,6 +372,7 @@ def get_image_data_by_id(self, image_id: int) -> PixivImageComplete: Raises: KeyError: If image with the given ID is not found. """ + cursor = None try: cursor = self.connection.cursor() diff --git a/PixivServer/repository/subscription.py b/PixivServer/repository/subscription.py index ecc452a..7b977d4 100644 --- a/PixivServer/repository/subscription.py +++ b/PixivServer/repository/subscription.py @@ -9,7 +9,7 @@ class SubscriptionRepository: def __init__(self): self.db_path = pixivutil_config.db_path - self.connection: sqlite3.Connection = None + self.connection: sqlite3.Connection = None # pyright: ignore[reportAttributeAccessIssue] this will be handled during open. 
def open(self): self.connection = sqlite3.connect(self.db_path) @@ -35,57 +35,63 @@ def create_table(self): def check_member_id_exist(self, member_id: int) -> bool: result = False + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''SELECT 1 FROM pixiv_server_member_subscription WHERE member_id = ?''', (member_id, ) ) - result = c.fetchone() + result = cursor.fetchone() except Exception as e: logger.error(f'Failed to check existence of member ID: {member_id}') raise e finally: - c.close() + if cursor is not None: + cursor.close() return result - def select_member_name_by_id(self, member_id: int) -> str: - result: str = None + def select_member_name_by_id(self, member_id: int) -> str | None: + result: str | None = None + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''SELECT name FROM pixiv_server_member_subscription WHERE member_id = ?''', (member_id, ) ) - result = c.fetchone() + result = cursor.fetchone() except Exception as e: logger.error(f'Failed to retrieve member name for ID {member_id}.') raise e finally: - c.close() + if cursor is not None: + cursor.close() return result def select_member_subscriptions(self) -> list[tuple[int, str]]: - results: list = None - + results: list[tuple[int, str]] = [] + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''SELECT member_id, name FROM pixiv_server_member_subscription ORDER BY member_id''', ) - results = c.fetchall() + results = cursor.fetchall() except Exception as e: logger.error('Failed to export member subscriptions: ', e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return results def add_member_subscription(self, member_id: int, member_name: str) -> bool: + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( 
'''INSERT OR IGNORE INTO pixiv_server_member_subscription VALUES(?, ?, datetime('now'), datetime('now'))''', (member_id, member_name, ) ) @@ -94,13 +100,15 @@ def add_member_subscription(self, member_id: int, member_name: str) -> bool: logger.error(f'Failed to add member subscription: {member_id}', e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return True def remove_member_subscription(self, member_id: int) -> bool: + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''DELETE FROM pixiv_server_member_subscription WHERE member_id = ?''', (member_id, ) ) @@ -109,46 +117,51 @@ def remove_member_subscription(self, member_id: int) -> bool: logger.error(f"Failed to remove member {member_id} from subscription: ", e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return True def check_tag_name_exist(self, tag_id: str) -> str: result = False + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''SELECT 1 FROM pixiv_server_tag_subscription WHERE tag_id = ?''', (tag_id, ) ) - result = c.fetchone() + result = cursor.fetchone() except Exception as e: logger.error(f"Failed to check existence of tag name: {tag_id}") raise e finally: - c.close() + if cursor is not None: + cursor.close() return result def select_tag_subscriptions(self) -> list[tuple[str]]: - results: list = None - + results: list[tuple[str]] = [] + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''SELECT tag_id FROM pixiv_server_tag_subscription''', ) - results = c.fetchall() + results = cursor.fetchall() except Exception as e: logger.error("Failed to export tag encoded subscriptions: ", e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return results def add_tag_subscription(self, tag_id: str, bookmark_count: int) -> bool: + cursor = None 
try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''INSERT INTO pixiv_server_tag_subscription (tag_id, bookmark_count, created_date, last_modified_date) VALUES(?, ?, datetime('now'), datetime('now')) ON CONFLICT(tag_id) @@ -164,13 +177,15 @@ def add_tag_subscription(self, tag_id: str, bookmark_count: int) -> bool: logger.error(f'Failed to add encoded tag subscription: {tag_id}', e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return True def remove_tag_subscription(self, tag_id: int) -> bool: + cursor = None try: - c = self.connection.cursor() - c.execute( + cursor = self.connection.cursor() + cursor.execute( '''DELETE FROM pixiv_server_tag_subscription WHERE tag_id = ?''', (tag_id, ) ) @@ -179,7 +194,8 @@ def remove_tag_subscription(self, tag_id: int) -> bool: logger.error(f"Failed to remove tag {tag_id} from subscription: ", e) raise e finally: - c.close() + if cursor is not None: + cursor.close() return True def close(self): diff --git a/PixivServer/routers/dev.py b/PixivServer/routers/dev.py new file mode 100644 index 0000000..00e21ab --- /dev/null +++ b/PixivServer/routers/dev.py @@ -0,0 +1,80 @@ +import logging + +from celery.result import AsyncResult +from fastapi import APIRouter, HTTPException, Query, Response +from fastapi.responses import JSONResponse + +from PixivServer.config.celery import QUEUE_MAX_PRIORITY +from PixivServer.models.pixiv_worker import DownloadArtworkByIdRequest +from PixivServer.worker.dev import ( + dev_download_artworks_by_id, + dev_priority_probe_task, + get_dev_task_state, + get_priority_probe_state, +) + +logger = logging.getLogger('uvicorn.pixivutil') +router = APIRouter() + + +@router.post("/artwork/{artwork_id}") +async def dev_queue_download_artwork_by_id(artwork_id: str) -> Response: + """ + (test) Download Pixiv image by ID. 
+ """ + logger.info(f"Downloading Pixiv artwork by image ID: {artwork_id}.") + request = DownloadArtworkByIdRequest(artwork_id=int(artwork_id)) + artwork_title, member_name = "artwork title", "member title" + task: AsyncResult = dev_download_artworks_by_id.delay(request.model_dump()) + return JSONResponse({ + "task_id": task.id, + 'artwork_id': artwork_id, + "artwork_title": artwork_title, + "member_name": member_name, + }) + + +@router.get("/task/{task_id}") +async def dev_task_status(task_id: str) -> Response: + """ + (test) Return dev worker task attempt history and terminal status. + """ + state = get_dev_task_state(task_id) + if state is None: + raise HTTPException(status_code=404, detail=f"Dev task state not found: {task_id}") + return JSONResponse(state) + + +@router.post("/priority/{label}") +async def dev_queue_priority_probe_task( + label: str, + priority: int = Query(default=2, ge=1, le=QUEUE_MAX_PRIORITY + 1), # +1: allow one example of an over-limit value for clamping tests + sleep_ms: int = Query(default=1000, ge=0, le=10000), +) -> Response: + """ + (test) Enqueue a no-op task with explicit broker priority and predictable runtime. + """ + task: AsyncResult = dev_priority_probe_task.apply_async( + kwargs={ + "request_dict": { + "label": label, + "priority": priority, + "sleep_ms": sleep_ms, + }, + }, + priority=priority, + ) + return JSONResponse({ + "task_id": task.id, + "label": label, + "priority": priority, + "sleep_ms": sleep_ms, + }) + + +@router.get("/priority") +async def dev_priority_probe_status() -> Response: + """ + (test) Return execution ordering state for dev priority probe tasks. 
+ """ + return JSONResponse(get_priority_probe_state()) diff --git a/PixivServer/routers/dlq.py b/PixivServer/routers/dlq.py new file mode 100644 index 0000000..0a9c8b4 --- /dev/null +++ b/PixivServer/routers/dlq.py @@ -0,0 +1,281 @@ +import asyncio +import logging +from typing import Any + +from fastapi import APIRouter, HTTPException, Response +from fastapi.responses import JSONResponse +from kombu import Connection + +from PixivServer.config import rabbitmq +from PixivServer.config.celery import ( + MAIN_ROUTING_KEY, + dead_letter_queue, + default_exchange, +) +from PixivServer.worker import pixiv_worker + +logger = logging.getLogger('uvicorn.pixivutil') +router = APIRouter() + + +def _extract_task_payload_from_celery_body(body: Any) -> dict: + # Celery protocol v2 JSON body is typically [args, kwargs, embed]. + if not isinstance(body, list) or len(body) < 2: + return {} + args = body[0] + kwargs = body[1] + if isinstance(args, list) and len(args) == 1 and isinstance(args[0], dict): + return args[0] + if isinstance(kwargs, dict) and "request_dict" in kwargs and isinstance(kwargs["request_dict"], dict): + return kwargs["request_dict"] + if isinstance(kwargs, dict): + return kwargs + return {} + + +def _normalize_dead_letter_payload(body: Any, headers: dict | None = None) -> dict | None: + if isinstance(body, dict): + if "dead_letter_id" in body and "task_name" in body: + return { + "dead_letter_id": str(body["dead_letter_id"]), + "task_name": str(body["task_name"]), + "payload": body.get("payload", {}) if isinstance(body.get("payload", {}), dict) else {}, + } + if "task_name" in body and "payload" in body: + # Backfill a stable identifier for older custom format if present. 
+ return { + "dead_letter_id": str(body.get("dead_letter_id") or body.get("task_id") or ""), + "task_name": str(body["task_name"]), + "payload": body.get("payload", {}) if isinstance(body.get("payload", {}), dict) else {}, + } + + headers = headers or {} + task_name = headers.get("task") + task_id = headers.get("id") or headers.get("task_id") + if isinstance(task_name, str): + return { + "dead_letter_id": str(task_id or ""), + "task_name": task_name, + "payload": _extract_task_payload_from_celery_body(body), + } + return None + + +def _drain(conn) -> list: + bound = dead_letter_queue.bind(conn) + bound.declare() + msgs = [] + while True: + msg = bound.get(no_ack=False) + if msg is None: + break + msgs.append(msg) + return msgs + + +def _kombu_message_to_dlq_record(msg) -> dict | None: + headers = msg.headers if isinstance(msg.headers, dict) else {} + return _normalize_dead_letter_payload(msg.payload, headers=headers) + + +def _get_registered_task(task_name: str | None): + if not task_name: + return None + # Skip Celery internals/builtins even if registered. + if task_name.startswith("celery."): + return None + task = pixiv_worker.tasks.get(task_name) + return task if task is not None else None + + +def _is_native_celery_message(msg) -> bool: + headers = msg.headers if isinstance(msg.headers, dict) else {} + return isinstance(headers.get("task"), str) + + +def _clean_republish_headers(headers: dict) -> dict: + # Drop broker-added dead-letter metadata and Celery execution state so replay acts like a fresh enqueue. 
+ ignored = { + "x-death", + "x-first-death-exchange", + "x-first-death-queue", + "x-first-death-reason", + "x-last-death-exchange", + "x-last-death-queue", + "x-last-death-reason", + "retries", + "eta", + "expires", + } + return {k: v for k, v in headers.items() if k not in ignored} + + +def _republish_native_celery_message(conn, msg) -> str | None: + if not _is_native_celery_message(msg): + return None + + headers = msg.headers if isinstance(msg.headers, dict) else {} + task_name = headers.get("task") + if not isinstance(task_name, str): + return None + + props = msg.properties if isinstance(msg.properties, dict) else {} + publish_props = {} + for key in ( + "correlation_id", + "reply_to", + "priority", + "message_id", + "timestamp", + "type", + "app_id", + ): + value = props.get(key) + if value is not None: + publish_props[key] = value + + with conn.Producer() as producer: + raw_body = msg.body + if isinstance(raw_body, str): + raw_body = raw_body.encode(msg.content_encoding or "utf-8") + if isinstance(raw_body, memoryview): + raw_body = raw_body.tobytes() + producer.publish( + raw_body, + exchange=default_exchange, + routing_key=MAIN_ROUTING_KEY, + headers=_clean_republish_headers(headers), + content_type=msg.content_type, + content_encoding=msg.content_encoding, + delivery_mode=2, + **publish_props, + ) + return task_name + + +def _resume_message(conn, msg, body: dict) -> str | None: + task_name = _republish_native_celery_message(conn, msg) + if task_name is not None: + return task_name + + task_fn = _get_registered_task(body.get("task_name")) + if task_fn is None: + return None + task_fn.delay(body.get("payload", {})) + return body.get("task_name") + + +@router.get("/") +async def list_dead_letter_messages() -> Response: + """ + List all messages currently in the dead letter queue. 
+ """ + def _run() -> list[dict]: + messages: list[dict] = [] + with Connection(rabbitmq.config.broker_url) as conn: + for msg in _drain(conn): + try: + normalized = _kombu_message_to_dlq_record(msg) + if normalized is not None: + messages.append(normalized) + finally: + msg.reject(requeue=True) + return messages + + return JSONResponse(await asyncio.to_thread(_run)) + + +@router.post("/resume") +async def resume_all_dead_letter_messages() -> Response: + """ + Requeue all dead letter messages to the main queue. + Messages with unrecognised task names are left in the dead letter queue. + """ + def _run() -> int: + count = 0 + with Connection(rabbitmq.config.broker_url) as conn: + for msg in _drain(conn): + body = _kombu_message_to_dlq_record(msg) + if body is None: + logger.warning("Unparseable DLQ message, leaving in queue") + msg.reject(requeue=True) + continue + resumed_task_name = _resume_message(conn, msg, body) + if resumed_task_name is not None: + msg.ack() + count += 1 + else: + logger.warning(f"Unknown task name in DLQ message, leaving in queue: {body.get('task_name')}") + msg.reject(requeue=True) + return count + + count = await asyncio.to_thread(_run) + return JSONResponse({"requeued": count}) + + +@router.post("/{dead_letter_id}/resume") +async def resume_dead_letter_message(dead_letter_id: str) -> Response: + """ + Requeue a specific dead letter message to the main queue by its dead_letter_id. 
+ """ + def _run() -> str | None: + """Returns task_name on success, None if not found, 'unknown' if task unrecognised.""" + with Connection(rabbitmq.config.broker_url) as conn: + for msg in _drain(conn): + body = _kombu_message_to_dlq_record(msg) + if body is None: + msg.reject(requeue=True) + continue + if body.get("dead_letter_id") == dead_letter_id: + resumed_task_name = _resume_message(conn, msg, body) + if resumed_task_name is None: + msg.reject(requeue=True) + return "unknown" + msg.ack() + return resumed_task_name + msg.reject(requeue=True) + return None + + result = await asyncio.to_thread(_run) + if result is None: + raise HTTPException(status_code=404, detail=f"Dead letter message not found: {dead_letter_id}") + if result == "unknown": + raise HTTPException(status_code=422, detail=f"Task name not recognised for dead letter message: {dead_letter_id}") + return JSONResponse({"dead_letter_id": dead_letter_id, "requeued": True, "task_name": result}) + + +@router.delete("/") +async def drop_all_dead_letter_messages() -> Response: + """ + Purge all messages from the dead letter queue. + """ + def _run() -> int: + with Connection(rabbitmq.config.broker_url) as conn: + return dead_letter_queue.bind(conn).purge() + + count = await asyncio.to_thread(_run) + return JSONResponse({"dropped": count}) + + +@router.delete("/{dead_letter_id}") +async def drop_dead_letter_message(dead_letter_id: str) -> Response: + """ + Drop a specific dead letter message by its dead_letter_id. 
+ """ + def _run() -> bool: + with Connection(rabbitmq.config.broker_url) as conn: + for msg in _drain(conn): + body = _kombu_message_to_dlq_record(msg) + if body is None: + msg.reject(requeue=True) + continue + if body.get("dead_letter_id") == dead_letter_id: + msg.ack() + return True + msg.reject(requeue=True) + return False + + found = await asyncio.to_thread(_run) + if not found: + raise HTTPException(status_code=404, detail=f"Dead letter message not found: {dead_letter_id}") + return JSONResponse({"dead_letter_id": dead_letter_id, "dropped": True}) diff --git a/PixivServer/routers/download_queue.py b/PixivServer/routers/download_queue.py index e5fa051..e9f20be 100644 --- a/PixivServer/routers/download_queue.py +++ b/PixivServer/routers/download_queue.py @@ -5,10 +5,11 @@ from datetime import timedelta from celery.result import AsyncResult -from fastapi import APIRouter, Response +from fastapi import APIRouter, Query, Response from fastapi.responses import JSONResponse from pixivutil_server_common.models import TagSortOrder, TagTypeMode +from PixivServer.config.celery import QUEUE_MAX_PRIORITY from PixivServer.models.pixiv_worker import ( DeleteArtworkByIdRequest, DownloadArtworkByIdRequest, @@ -17,16 +18,17 @@ ) from PixivServer.repository.pixivutil import PixivUtilRepository from PixivServer.utils import is_valid_date -from PixivServer.worker import ( - delete_artwork_by_id, - download_artworks_by_id, - download_artworks_by_member_id, - download_artworks_by_tag, +from PixivServer.worker.download import ( + delete_artwork_by_id_task, + download_artworks_by_id_task, + download_artworks_by_member_id_task, + download_artworks_by_tag_task, ) logger = logging.getLogger('uvicorn.pixivutil') router = APIRouter() + def get_artwork_and_member_name_from_db(artwork_id: int) -> tuple[str | None, str | None]: repository = PixivUtilRepository() try: @@ -58,14 +60,17 @@ def get_member_name_from_db(member_id: int) -> str | None: @router.post("/artwork/{artwork_id}") -async 
def queue_download_artwork_by_id(artwork_id: str) -> Response: +async def queue_download_artwork_by_id( + artwork_id: str, + priority: int = Query(default=QUEUE_MAX_PRIORITY, ge=1, le=QUEUE_MAX_PRIORITY), +) -> Response: """ Download Pixiv image by ID. """ logger.info(f"Downloading Pixiv artwork by image ID: {artwork_id}.") request = DownloadArtworkByIdRequest(artwork_id=int(artwork_id)) artwork_title, member_name = get_artwork_and_member_name_from_db(request.artwork_id) - task: AsyncResult = download_artworks_by_id.delay(request.model_dump()) + task: AsyncResult = download_artworks_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({ "task_id": task.id, 'artwork_id': artwork_id, @@ -74,14 +79,17 @@ async def queue_download_artwork_by_id(artwork_id: str) -> Response: }) @router.post("/member/{member_id}") -async def queue_download_artworks_by_member_id(member_id: str) -> Response: +async def queue_download_artworks_by_member_id( + member_id: str, + priority: int = Query(default=2, ge=1, le=QUEUE_MAX_PRIORITY), +) -> Response: """ Download Pixiv image by member ID. """ logger.info(f"Downloading Pixiv artworks by member ID: {member_id}.") request = DownloadArtworksByMemberIdRequest(member_id=int(member_id)) member_name = get_member_name_from_db(request.member_id) - task: AsyncResult = download_artworks_by_member_id.delay(request.model_dump()) + task: AsyncResult = download_artworks_by_member_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({ "task_id": task.id, 'member_id': member_id, @@ -98,6 +106,7 @@ async def queue_download_artworks_by_tag( start_date: str | None = None, end_date: str | None = None, lookback_days: int | None = None, + priority: int = Query(default=1, ge=1, le=QUEUE_MAX_PRIORITY), ) -> Response: """ Download Pixiv images that have a given tag. 
@@ -134,14 +143,18 @@ async def queue_download_artworks_by_tag( start_date=start_date, end_date=end_date, ) - task: AsyncResult = download_artworks_by_tag.delay(request.model_dump()) + task: AsyncResult = download_artworks_by_tag_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({ 'task_id': task.id, 'tag': decoded_tag, }) @router.delete("/artwork/{artwork_id}") -async def queue_delete_artwork_by_id(artwork_id: str, delete_metadata: bool = True) -> Response: +async def queue_delete_artwork_by_id( + artwork_id: str, + delete_metadata: bool = True, + priority: int = Query(default=2, ge=1, le=QUEUE_MAX_PRIORITY), +) -> Response: """ Delete Pixiv image by ID from database and filesystem. @@ -154,7 +167,7 @@ async def queue_delete_artwork_by_id(artwork_id: str, delete_metadata: bool = Tr """ logger.info(f"Deleting Pixiv artwork by image ID: {artwork_id} (delete_metadata={delete_metadata}).") request = DeleteArtworkByIdRequest(artwork_id=int(artwork_id), delete_metadata=delete_metadata) - task: AsyncResult = delete_artwork_by_id.delay(request.model_dump()) + task: AsyncResult = delete_artwork_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({ "task_id": task.id, 'artwork_id': artwork_id, diff --git a/PixivServer/routers/metadata_queue.py b/PixivServer/routers/metadata_queue.py index 017b267..8472f2b 100644 --- a/PixivServer/routers/metadata_queue.py +++ b/PixivServer/routers/metadata_queue.py @@ -2,21 +2,22 @@ import urllib.parse from celery.result import AsyncResult -from fastapi import APIRouter +from fastapi import APIRouter, Query from fastapi.responses import JSONResponse from pixivutil_server_common.models import TagMetadataFilterMode +from PixivServer.config.celery import QUEUE_MAX_PRIORITY from PixivServer.models.pixiv_worker import ( DownloadArtworkMetadataByIdRequest, DownloadMemberMetadataByIdRequest, DownloadSeriesMetadataByIdRequest, DownloadTagMetadataByIdRequest, ) -from 
PixivServer.worker import ( - download_artwork_metadata_by_id, - download_member_metadata_by_id, - download_series_metadata_by_id, - download_tag_metadata_by_id, +from PixivServer.worker.metadata import ( + download_artwork_metadata_by_id_task, + download_member_metadata_by_id_task, + download_series_metadata_by_id_task, + download_tag_metadata_by_id_task, ) logger = logging.getLogger("uvicorn.pixivutil") @@ -24,7 +25,10 @@ @router.post("/member/{member_id}") -async def queue_download_member_metadata_by_id(member_id: str) -> JSONResponse: +async def queue_download_member_metadata_by_id( + member_id: str, + priority: int = Query(default=2, ge=1, le=QUEUE_MAX_PRIORITY), +) -> JSONResponse: """ Queue download of member metadata by ID. """ @@ -36,12 +40,15 @@ async def queue_download_member_metadata_by_id(member_id: str) -> JSONResponse: member_id_int = int(member_id) logger.info(f"Queueing member metadata download by ID: {member_id_int}.") request = DownloadMemberMetadataByIdRequest(member_id=member_id_int) - task: AsyncResult = download_member_metadata_by_id.delay(request.model_dump()) + task: AsyncResult = download_member_metadata_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({"task_id": task.id, "member_id": member_id_int}) @router.post("/artwork/{artwork_id}") -async def queue_download_artwork_metadata_by_id(artwork_id: str) -> JSONResponse: +async def queue_download_artwork_metadata_by_id( + artwork_id: str, + priority: int = Query(default=2, ge=1, le=QUEUE_MAX_PRIORITY), +) -> JSONResponse: """ Queue download of artwork metadata by ID. 
""" @@ -53,12 +60,15 @@ async def queue_download_artwork_metadata_by_id(artwork_id: str) -> JSONResponse artwork_id_int = int(artwork_id) logger.info(f"Queueing artwork metadata download by ID: {artwork_id_int}.") request = DownloadArtworkMetadataByIdRequest(artwork_id=artwork_id_int) - task: AsyncResult = download_artwork_metadata_by_id.delay(request.model_dump()) + task: AsyncResult = download_artwork_metadata_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({"task_id": task.id, "artwork_id": artwork_id_int}) @router.post("/series/{series_id}") -async def queue_download_series_metadata_by_id(series_id: str) -> JSONResponse: +async def queue_download_series_metadata_by_id( + series_id: str, + priority: int = Query(default=1, ge=1, le=QUEUE_MAX_PRIORITY), +) -> JSONResponse: """ Queue download of series metadata by ID. """ @@ -70,7 +80,7 @@ async def queue_download_series_metadata_by_id(series_id: str) -> JSONResponse: series_id_int = int(series_id) logger.info(f"Queueing series metadata download by ID: {series_id_int}.") request = DownloadSeriesMetadataByIdRequest(series_id=series_id_int) - task: AsyncResult = download_series_metadata_by_id.delay(request.model_dump()) + task: AsyncResult = download_series_metadata_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse({"task_id": task.id, "series_id": series_id_int}) @@ -78,6 +88,7 @@ async def queue_download_series_metadata_by_id(series_id: str) -> JSONResponse: async def queue_download_tag_metadata_by_id( tag: str, filter_mode: TagMetadataFilterMode = "none", + priority: int = Query(default=1, ge=1, le=QUEUE_MAX_PRIORITY), ) -> JSONResponse: """ Queue download of tag metadata by tag ID/name. 
@@ -89,7 +100,7 @@ async def queue_download_tag_metadata_by_id( request = DownloadTagMetadataByIdRequest( tag=decoded_tag, filter_mode=filter_mode ) - task: AsyncResult = download_tag_metadata_by_id.delay(request.model_dump()) + task: AsyncResult = download_tag_metadata_by_id_task.apply_async(args=[request.model_dump()], priority=priority) return JSONResponse( {"task_id": task.id, "tag": decoded_tag, "filter_mode": request.filter_mode} ) diff --git a/PixivServer/service/metrics.py b/PixivServer/service/metrics.py new file mode 100644 index 0000000..6a001c3 --- /dev/null +++ b/PixivServer/service/metrics.py @@ -0,0 +1,153 @@ +import asyncio +import base64 +import contextlib +import json +import logging +import os +import time +import traceback +import urllib.error +import urllib.request +from pathlib import Path +from urllib.parse import quote, urlparse + +import psutil + +import PixivServer + +# import PixivServer.routers.subscription +import PixivServer.service +import PixivServer.service.pixiv +from PixivServer.config.celery import DEAD_LETTER_QUEUE_NAME, MAIN_QUEUE_NAME +from PixivServer.config.pixivutil import config as pixivutil_config +from PixivServer.config.rabbitmq import config as rabbitmq_config +from PixivServer.metrics import ( + DB_ARTWORKS, + DB_MEMBERS, + DB_PAGES, + DB_SERIES, + DB_TAGS, + DISK_DATABASE_BYTES, + DISK_DOWNLOADS_BYTES, + DLQ_DEPTH, + QUEUE_DEPTH, + SYS_CPU_PERCENT, + SYS_DISK_TOTAL_BYTES, + SYS_DISK_USED_BYTES, + SYS_MEM_TOTAL_BYTES, + SYS_MEM_USED_BYTES, +) +from PixivServer.repository.pixivutil import PixivUtilRepository + +logger = logging.getLogger('uvicorn.pixivutil') + +_SYSTEM_COLLECT_INTERVAL = 15 # seconds +_DB_STAT_COLLECT_INTERVAL = 60 # seconds +_DISK_COLLECT_INTERVAL = 300 # seconds (directory walk may be slow on large collections) +_QUEUE_COLLECT_INTERVAL = 15 # seconds + + +def _collect_system_metrics() -> None: + cpu = psutil.cpu_percent(interval=1) + vm = psutil.virtual_memory() + disk = psutil.disk_usage("/") + 
def _collect_disk_metrics() -> None:
    """Publish on-disk size gauges for the database files and the downloads tree.

    Sets ``DISK_DATABASE_BYTES`` to the combined size of the configured
    database file plus its ``-wal`` / ``-shm`` sidecars, and
    ``DISK_DOWNLOADS_BYTES`` to the recursive sum of regular-file sizes under
    the Pixiv service's downloads folder (0 when that folder does not exist).

    Both measurements are best effort: a file that cannot be stat'ed is
    skipped instead of aborting the whole collection.
    """
    # Database file + WAL/SHM sidecars
    db_path = pixivutil_config.db_path
    db_bytes = 0
    for suffix in ("", "-wal", "-shm"):
        p = db_path + suffix
        # A file can disappear between the isfile() check and getsize()
        # (presumably the sidecars come and go with database activity), so
        # tolerate OSError here exactly like the downloads walk below does.
        with contextlib.suppress(OSError):
            if os.path.isfile(p):
                db_bytes += os.path.getsize(p)
    DISK_DATABASE_BYTES.set(db_bytes)

    # Downloads directory — recursive file size sum
    downloads = Path(PixivServer.service.pixiv.service.downloads_folder)
    total = 0
    if downloads.is_dir():
        for f in downloads.rglob("*"):
            if f.is_file():
                # Files may be removed while the tree is being walked.
                with contextlib.suppress(OSError):
                    total += f.stat().st_size
    DISK_DOWNLOADS_BYTES.set(total)
async def periodic_metrics_collector() -> None:
    """Refresh every metric group forever, each on its own interval.

    The loop wakes up once per second and runs each group whose interval has
    elapsed. Collectors are blocking functions, so each one is executed via
    ``asyncio.to_thread`` to keep the event loop responsive.

    Failure handling:
      * Every collector is isolated in its own ``try`` so that one failing
        collector (e.g. the database being unavailable) does not prevent the
        groups scheduled after it from running.
      * A group's timestamp is advanced even when a collector fails, so a
        persistently failing collector is retried at its normal interval
        instead of once per second.
      * Only ``Exception`` is caught; task cancellation still propagates and
        ends the loop.

    This coroutine never returns normally.
    """
    # (group name, interval in seconds, collectors run in order for that group)
    schedule = (
        ("system", _SYSTEM_COLLECT_INTERVAL, (_collect_system_metrics,)),
        ("db", _DB_STAT_COLLECT_INTERVAL, (_collect_db_stats,)),
        ("disk", _DISK_COLLECT_INTERVAL, (_collect_disk_metrics,)),
        ("queue", _QUEUE_COLLECT_INTERVAL, (_collect_queue_depth, _collect_dlq_depth)),
    )
    # -inf guarantees every group is due on the first pass, regardless of the
    # absolute value returned by the monotonic clock.
    last_run = {name: float("-inf") for name, _, _ in schedule}

    while True:
        now = time.monotonic()
        for name, interval, collectors in schedule:
            if now - last_run[name] < interval:
                continue
            for collect in collectors:
                try:
                    await asyncio.to_thread(collect)
                except Exception:  # noqa: BLE001
                    logger.warning(f"Metrics collector error: {traceback.format_exc()}")
            # Stamp after the run so slow collectors do not shorten their own interval.
            last_run[name] = time.monotonic()
        await asyncio.sleep(1)
def _raise_metadata_process_image_failure(
    self,
    *,
    artwork_id: int,
    process_result: int,
    previous_error_list_len: int,
    previous_error_code: int,
) -> None:
    """Turn a non-OK ``process_image`` result into an exception.

    Returns silently when ``process_result`` equals
    ``PixivConstant.PIXIVUTIL_OK``. Otherwise the entries added to the
    module-level ``__errorList`` since ``previous_error_list_len`` are
    searched, newest first, for a dict whose ``"exception"`` value is a
    ``PixivException``.

    Raises:
        ConnectionError: the recorded ``PixivException`` (or, when none was
            recorded, a changed module ``ERROR_CODE``) indicates a
            network/server failure, or the failure could not be classified.
        RuntimeError: a ``PixivException`` with any other error code was
            recorded; it is chained as the cause.
    """
    if process_result == PixivConstant.PIXIVUTIL_OK:
        return

    # Looked up through globals() by string rather than by bare name —
    # presumably to avoid private-name mangling inside the class body.
    recorded = globals()["__errorList"]
    if len(recorded) >= previous_error_list_len:
        fresh_entries = recorded[previous_error_list_len:]
    else:
        # The list is shorter than the snapshot; consider everything in it.
        fresh_entries = recorded

    # Most recent PixivException recorded for this call, if any.
    cause: PixivException | None = next(
        (
            candidate
            for candidate in (
                entry.get("exception") for entry in reversed(fresh_entries) if isinstance(entry, dict)
            )
            if isinstance(candidate, PixivException)
        ),
        None,
    )

    if cause is not None:
        if cause.errorCode in (PixivException.DOWNLOAD_FAILED_NETWORK, PixivException.SERVER_ERROR):
            raise ConnectionError(
                f"Artwork metadata fetch failed due to network/server error for artwork_id={artwork_id}"
            ) from cause
        raise RuntimeError(
            f"Artwork metadata fetch failed for artwork_id={artwork_id} "
            f"(pixiv_error_code={cause.errorCode}, result={process_result})"
        ) from cause

    # No exception object was recorded: fall back to the module error code,
    # but only trust it if it changed during this call.
    observed_code = ERROR_CODE
    if observed_code != previous_error_code:
        error_code = observed_code
    else:
        error_code = -1

    if error_code in (PixivException.DOWNLOAD_FAILED_NETWORK, PixivException.SERVER_ERROR):
        raise ConnectionError(
            f"Artwork metadata fetch failed due to network/server error for artwork_id={artwork_id} "
            f"(pixiv_error_code={error_code}, result={process_result})"
        )
    # Metadata fetch failures can be swallowed by PixivUtil2 without a stable error code.
    # Treat unclassified failures as transient so worker retry/DLQ policy can recover them.
    raise ConnectionError(
        f"Artwork metadata fetch failed with unclassified error for artwork_id={artwork_id} "
        f"(pixiv_error_code={error_code}, result={process_result})"
    )
DownloadArtworkMetadataByIdRequest, - DownloadArtworksByMemberIdRequest, - DownloadArtworksByTagsRequest, - DownloadMemberMetadataByIdRequest, - DownloadSeriesMetadataByIdRequest, - DownloadTagMetadataByIdRequest, -) - -logger = logging.getLogger(__name__) - -pixiv_worker = Celery(__name__) -pixiv_worker.config_from_object('PixivServer.config.celery') - -def __job_sleep(): - """ - Sleep a random interval between 1-5s for all jobs. - Synchronous/blocking sleep. - """ - time_to_sleep = random.uniform(1, 5) - time.sleep(time_to_sleep) - return 0 - -@worker_init.connect -def on_worker_init(*args, **kwargs): - PixivServer.service.pixiv.service.open() - return - -@worker_shutdown.connect -def on_worker_shutdown(*args, **kwargs): - PixivServer.service.pixiv.service.close() - return - -@setup_logging.connect -def config_loggers(*args, **kwargs): - return - -# @celery.on_after_configure.connect -# def setup_periodic_tasks(sender, **kwargs): -# sender.add_periodic_task(worker_config.subscription_time_seconds, run_artist_subscription_job.s(), name='Artist subscription job') -# sender.add_periodic_task(worker_config.subscription_time_seconds, run_tag_subscription_job.s(), name='Tag subscription job') - -# @celery.task(name='run_artist_subscription_job') -# def run_artist_subscription_job(): -# logger.info('Running scheduled member subscription job...') -# new_artworks_by_member_names = subscription_service.run_member_subscription_job() -# member_names = list(new_artworks_by_member_names.keys()) -# if member_names: -# message = '[Scheduled job]: Downloaded new artworks from: ' + ', '.join(member_names) -# return True - -# @celery.task(name='run_tag_subscription_job') -# def run_tag_subscription_job(): -# ''' -# Since this is calling process_tags directly cannot extract logs. 
-# ''' -# logger.info('Running scheduled tag subscription job...') -# subscription_service.run_tag_subscription_job() - -@pixiv_worker.task(name="download_artworks_by_id", queue='pixivutil-queue') -def download_artworks_by_id(request_dict: dict): - try: - request = DownloadArtworkByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artwork by ID: {request.artwork_id}.") - PixivServer.service.pixiv.service.download_artwork_by_id(request) - return True - except Exception as e: - logger.error(f"Error in download_artworks_by_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - -@pixiv_worker.task(name="download_artworks_by_member_id", queue='pixivutil-queue') -def download_artworks_by_member_id(request_dict: dict): - try: - request = DownloadArtworksByMemberIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artworks by member ID: {request.member_id}.") - PixivServer.service.pixiv.service.download_artworks_by_member_id(request) - return True - except Exception as e: - logger.error(f"Error in download_artworks_by_member_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - -@pixiv_worker.task(name="download_artworks_by_tag", queue='pixivutil-queue') -def download_artworks_by_tag(request_dict: dict): - try: - request = DownloadArtworksByTagsRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artwork by tag: {request.tags}. 
Bookmark minimum: {request.bookmark_count}") - PixivServer.service.pixiv.service.download_artworks_by_tag(request) - return True - except Exception as e: - logger.error(f"Error in download_artworks_by_tag worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - -@pixiv_worker.task(name="delete_artwork_by_id", queue='pixivutil-queue') -def delete_artwork_by_id(request_dict: dict): - try: - request = DeleteArtworkByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Deleting artwork by ID: {request.artwork_id}.") - PixivServer.service.pixiv.service.delete_artwork_by_id(request) - return True - except Exception as e: - logger.error(f"Error in delete_artwork_by_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - - -@pixiv_worker.task(name="download_member_metadata_by_id", queue='pixivutil-queue') -def download_member_metadata_by_id(request_dict: dict): - try: - request = DownloadMemberMetadataByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log( - "info", - f"Downloading member metadata by ID: {request.member_id}.", - ) - PixivServer.service.pixiv.service.download_member_metadata_by_id(request) - return True - except Exception as e: - logger.error(f"Error in download_member_metadata_by_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - - -@pixiv_worker.task(name="download_artwork_metadata_by_id", queue='pixivutil-queue') -def download_artwork_metadata_by_id(request_dict: dict): - try: - request = DownloadArtworkMetadataByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log( - "info", - f"Downloading artwork metadata by ID: {request.artwork_id}.", - ) - PixivServer.service.pixiv.service.download_artwork_metadata_by_id(request) - return True - except Exception as e: - logger.error(f"Error in download_artwork_metadata_by_id worker: {str(e)}") - 
logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - - -@pixiv_worker.task(name="download_series_metadata_by_id", queue='pixivutil-queue') -def download_series_metadata_by_id(request_dict: dict): - try: - request = DownloadSeriesMetadataByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log( - "info", - f"Downloading series metadata by ID: {request.series_id}.", - ) - PixivServer.service.pixiv.service.download_series_metadata_by_id(request) - return True - except Exception as e: - logger.error(f"Error in download_series_metadata_by_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() - - -@pixiv_worker.task(name="download_tag_metadata_by_id", queue='pixivutil-queue') -def download_tag_metadata_by_id(request_dict: dict): - try: - request = DownloadTagMetadataByIdRequest(**request_dict) - PixivServer.service.pixiv.PixivHelper.print_and_log( - "info", - f"Downloading tag metadata: {request.tag} (filter_mode={request.filter_mode}).", - ) - PixivServer.service.pixiv.service.download_tag_metadata_by_id(request) - return True - except Exception as e: - logger.error(f"Error in download_tag_metadata_by_id worker: {str(e)}") - logger.error(traceback.format_exc()) - raise - finally: - __job_sleep() diff --git a/PixivServer/worker/__init__.py b/PixivServer/worker/__init__.py new file mode 100644 index 0000000..7f149c8 --- /dev/null +++ b/PixivServer/worker/__init__.py @@ -0,0 +1,99 @@ +import logging + +from celery import Celery +from celery.signals import setup_logging, worker_init, worker_shutdown +from kombu import Exchange, Queue + +import PixivServer +import PixivServer.service +import PixivServer.service.pixiv +from PixivServer.config.celery import ( + LEGACY_MAIN_EXCHANGE_NAME, + LEGACY_MAIN_QUEUE_NAME, + dead_letter_queue, + main_queue, +) +from PixivServer.config.server import config as server_config + +logger = logging.getLogger(__name__) + +pixiv_worker = Celery(__name__) 
def _cleanup_legacy_queue(conn, queue_name: str) -> None:
    """Best-effort purge and deletion of a legacy queue on ``conn``.

    The queue is first purged and then deleted unconditionally
    (``if_unused=False, if_empty=False``). Each step is attempted
    independently; a failure is logged as a warning and never raised, except
    that errors whose text contains ``NOT_FOUND`` are ignored silently.
    """
    legacy_queue = Queue(name=queue_name, durable=True).bind(conn)

    # Step 1: drop any messages still sitting in the queue.
    try:
        legacy_queue.purge()
        logger.warning(f"Purged legacy queue during v1 broker cutover: {queue_name}")
    except Exception as purge_error:  # noqa: BLE001
        already_gone = "NOT_FOUND" in str(purge_error)
        if not already_gone:
            logger.warning(f"Failed to purge legacy queue {queue_name}: {purge_error}")

    # Step 2: remove the queue itself, even if the purge above failed.
    try:
        legacy_queue.delete(if_unused=False, if_empty=False)
        logger.warning(f"Deleted legacy queue during v1 broker cutover: {queue_name}")
    except Exception as delete_error:  # noqa: BLE001
        already_gone = "NOT_FOUND" in str(delete_error)
        if not already_gone:
            logger.warning(f"Failed to delete legacy queue {queue_name}: {delete_error}")
def is_network_exception(exc: BaseException) -> bool:
    """Return True when ``exc`` should be treated as a network failure.

    A ``PixivException`` is classified solely by its ``errorCode``
    (``DOWNLOAD_FAILED_NETWORK`` or ``SERVER_ERROR``). Any other exception
    counts when it is a ``ConnectionError``, ``TimeoutError`` or ``URLError``.
    """
    if not isinstance(exc, PixivException):
        return isinstance(exc, (ConnectionError, TimeoutError, URLError))

    network_codes = (PixivException.DOWNLOAD_FAILED_NETWORK, PixivException.SERVER_ERROR)
    return exc.errorCode in network_codes
def _load_task_states() -> dict[str, dict[str, Any]]:
    """Read the per-task state mapping from ``_TASK_STATE_PATH``.

    Returns an empty dict when the file is missing, is not valid JSON (a
    warning is logged in that case), or does not contain a JSON object.
    Entries whose key is not a string or whose value is not a dict are
    dropped from the result.
    """
    try:
        raw_text = _TASK_STATE_PATH.read_text()
        parsed = json.loads(raw_text)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError:
        logger.warning("(dev) Could not parse task state file; resetting")
        return {}

    if not isinstance(parsed, dict):
        return {}

    # Keep only well-formed "task id -> state dict" entries.
    return {
        task_id: task_state
        for task_id, task_state in parsed.items()
        if isinstance(task_id, str) and isinstance(task_state, dict)
    }
def _load_priority_probe_state() -> dict[str, Any]:
    """Read the priority-probe state from ``_PRIORITY_STATE_PATH``.

    The result always has exactly the keys ``"started"`` (list),
    ``"completed"`` (list) and ``"tasks"`` (dict). A missing file, invalid
    JSON (logged as a warning) or a non-object document yields the empty
    state; a field of the wrong type is replaced by its empty default.
    """

    def _empty_state() -> dict[str, Any]:
        # Built fresh on every call so callers can mutate the result freely.
        return {"started": [], "completed": [], "tasks": {}}

    try:
        document = json.loads(_PRIORITY_STATE_PATH.read_text())
    except FileNotFoundError:
        return _empty_state()
    except json.JSONDecodeError:
        logger.warning("(dev) Could not parse priority probe state file; resetting")
        return _empty_state()

    if not isinstance(document, dict):
        return _empty_state()

    state = _empty_state()
    for field, expected_type in (("started", list), ("completed", list), ("tasks", dict)):
        stored = document.get(field)
        if isinstance(stored, expected_type):
            state[field] = stored
    return state
def _record_priority_probe_completed(task_id: str) -> None:
    """Mark a priority-probe task as completed in the persisted probe state.

    Loads the state, and — if ``task_id`` has a dict entry under ``"tasks"`` —
    appends the entry's string ``"label"`` to the ``"completed"`` list, sets
    the entry's ``"status"`` to ``"completed"`` and stores the index of the
    last item of the completed list as ``"completed_index"``. The state is
    written back in every case, including when the task is unknown.
    """
    snapshot = _load_priority_probe_state()

    finished = snapshot.get("completed")
    finished = finished if isinstance(finished, list) else []

    registry = snapshot.get("tasks")
    registry = registry if isinstance(registry, dict) else {}

    entry = registry.get(task_id)
    if isinstance(entry, dict):
        label = entry.get("label")
        if isinstance(label, str):
            finished.append(label)
        # Index is taken after the (possible) append above.
        entry.update({"status": "completed", "completed_index": len(finished) - 1})
        registry[task_id] = entry

    snapshot["completed"] = finished
    snapshot["tasks"] = registry
    _save_priority_probe_state(snapshot)
+@shared_task(bind=True, name="dev_download_artworks_by_id", queue=MAIN_QUEUE_NAME) +def dev_download_artworks_by_id(self, request_dict: dict): + request = DownloadArtworkByIdRequest(**request_dict) + task_id = str(self.request.id) + attempt = self.request.retries + 1 + max_attempts = _SIMULATED_MAX_RETRIES + 1 + _record_task_attempt(task_id, request.artwork_id, attempt) + logger.error(f"(dev) Attempt {attempt}/{max_attempts} for artwork_id={request.artwork_id}") + if attempt < max_attempts: + raise self.retry( + exc=ConnectionError("Simulated network failure"), + countdown=_SIMULATED_RETRY_COUNTDOWN, + ) + if _SUCCESS_SENTINEL_PATH.exists(): + _record_terminal_state(task_id, "succeeded") + logger.error(f"(dev) Sentinel found at {_SUCCESS_SENTINEL_PATH}; succeeding on resumed run") + return True + _record_terminal_state(task_id, "failed") + logger.error(f"(dev) Max retries exceeded for artwork_id={request.artwork_id}, raising terminal failure for broker DLQ") + raise ConnectionError("Simulated terminal failure after retries") + + +@shared_task(bind=True, name="dev_priority_probe_task", queue=MAIN_QUEUE_NAME) +def dev_priority_probe_task(self, request_dict: dict): + task_id = str(self.request.id) + label = str(request_dict.get("label", task_id)) + priority = int(request_dict.get("priority", 2)) + sleep_ms = int(request_dict.get("sleep_ms", 1000)) + + logger.error(f"(dev-priority) starting label={label} priority={priority} task_id={task_id}") + _record_priority_probe_started(task_id, label, priority) + time.sleep(max(sleep_ms, 0) / 1000) + _record_priority_probe_completed(task_id) + logger.error(f"(dev-priority) completed label={label} priority={priority} task_id={task_id}") + return True diff --git a/PixivServer/worker/download.py b/PixivServer/worker/download.py new file mode 100644 index 0000000..454061f --- /dev/null +++ b/PixivServer/worker/download.py @@ -0,0 +1,102 @@ +import logging +import traceback + +from celery import shared_task + +import 
PixivServer.service.pixiv +from PixivServer.config.celery import MAIN_QUEUE_NAME +from PixivServer.models.pixiv_worker import ( + DeleteArtworkByIdRequest, + DownloadArtworkByIdRequest, + DownloadArtworksByMemberIdRequest, + DownloadArtworksByTagsRequest, + as_celery_task, +) +from PixivServer.worker.common import ( + NETWORK_MAX_RETRIES, + NETWORK_RETRY_COUNTDOWN, + is_network_exception, + job_sleep, +) + +logger = logging.getLogger(__name__) + + +@shared_task(bind=True, name="download_artworks_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_artworks_by_id(self, request_dict: dict): + try: + request = DownloadArtworkByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artwork by ID: {request.artwork_id}.") + PixivServer.service.pixiv.service.download_artwork_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_artworks_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: non-network errors return False (acked as SUCCESS) to avoid DLQ routing. + # Use per-task acks_on_failure_or_timeout=True + Reject(requeue=False) for network + # exhaustion so non-network errors can raise normally and show as FAILURE. 
+ return False + finally: + job_sleep() + + +@shared_task(bind=True, name="download_artworks_by_member_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_artworks_by_member_id(self, request_dict: dict): + try: + request = DownloadArtworksByMemberIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artworks by member ID: {request.member_id}.") + PixivServer.service.pixiv.service.download_artworks_by_member_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_artworks_by_member_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_artworks_by_id for non-network error handling fix. + return False + finally: + job_sleep() + + +@shared_task(bind=True, name="download_artworks_by_tag", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_artworks_by_tag(self, request_dict: dict): + try: + request = DownloadArtworksByTagsRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Downloading artwork by tag: {request.tags}. Bookmark minimum: {request.bookmark_count}") + PixivServer.service.pixiv.service.download_artworks_by_tag(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_artworks_by_tag worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_artworks_by_id for non-network error handling fix. 
+ return False + finally: + job_sleep() + + +@shared_task(bind=True, name="delete_artwork_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def delete_artwork_by_id(self, request_dict: dict): + try: + request = DeleteArtworkByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log("info", f"Deleting artwork by ID: {request.artwork_id}.") + PixivServer.service.pixiv.service.delete_artwork_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in delete_artwork_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_artworks_by_id for non-network error handling fix. + return False + finally: + job_sleep() + + +download_artworks_by_id_task = as_celery_task(download_artworks_by_id) +download_artworks_by_member_id_task = as_celery_task(download_artworks_by_member_id) +download_artworks_by_tag_task = as_celery_task(download_artworks_by_tag) +delete_artwork_by_id_task = as_celery_task(delete_artwork_by_id) diff --git a/PixivServer/worker/metadata.py b/PixivServer/worker/metadata.py new file mode 100644 index 0000000..88d7607 --- /dev/null +++ b/PixivServer/worker/metadata.py @@ -0,0 +1,114 @@ +import logging +import traceback + +from celery import shared_task + +import PixivServer.service.pixiv +from PixivServer.config.celery import MAIN_QUEUE_NAME +from PixivServer.models.pixiv_worker import ( + DownloadArtworkMetadataByIdRequest, + DownloadMemberMetadataByIdRequest, + DownloadSeriesMetadataByIdRequest, + DownloadTagMetadataByIdRequest, + as_celery_task, +) +from PixivServer.worker.common import ( + NETWORK_MAX_RETRIES, + NETWORK_RETRY_COUNTDOWN, + is_network_exception, + job_sleep, +) + +logger = logging.getLogger(__name__) + + +@shared_task(bind=True, name="download_member_metadata_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def 
download_member_metadata_by_id(self, request_dict: dict): + try: + request = DownloadMemberMetadataByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log( + "info", + f"Downloading member metadata by ID: {request.member_id}.", + ) + PixivServer.service.pixiv.service.download_member_metadata_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_member_metadata_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: non-network errors return False (acked as SUCCESS) to avoid DLQ routing. + # Use per-task acks_on_failure_or_timeout=True + Reject(requeue=False) for network + # exhaustion so non-network errors can raise normally and show as FAILURE. + return False + finally: + job_sleep() + + +@shared_task(bind=True, name="download_artwork_metadata_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_artwork_metadata_by_id(self, request_dict: dict): + try: + request = DownloadArtworkMetadataByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log( + "info", + f"Downloading artwork metadata by ID: {request.artwork_id}.", + ) + PixivServer.service.pixiv.service.download_artwork_metadata_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_artwork_metadata_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_member_metadata_by_id for non-network error handling fix. 
+ return False + finally: + job_sleep() + + +@shared_task(bind=True, name="download_series_metadata_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_series_metadata_by_id(self, request_dict: dict): + try: + request = DownloadSeriesMetadataByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log( + "info", + f"Downloading series metadata by ID: {request.series_id}.", + ) + PixivServer.service.pixiv.service.download_series_metadata_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_series_metadata_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_member_metadata_by_id for non-network error handling fix. + return False + finally: + job_sleep() + + +@shared_task(bind=True, name="download_tag_metadata_by_id", queue=MAIN_QUEUE_NAME, max_retries=NETWORK_MAX_RETRIES) +def download_tag_metadata_by_id(self, request_dict: dict): + try: + request = DownloadTagMetadataByIdRequest(**request_dict) + PixivServer.service.pixiv.PixivHelper.print_and_log( + "info", + f"Downloading tag metadata: {request.tag} (filter_mode={request.filter_mode}).", + ) + PixivServer.service.pixiv.service.download_tag_metadata_by_id(request) + return True + except Exception as e: # noqa: BLE001 + logger.error(f"Error in download_tag_metadata_by_id worker: {str(e)}") + logger.error(traceback.format_exc()) + if is_network_exception(e): + raise self.retry(exc=e, countdown=NETWORK_RETRY_COUNTDOWN) + # TODO: see download_member_metadata_by_id for non-network error handling fix. 
+ return False + finally: + job_sleep() + + +download_member_metadata_by_id_task = as_celery_task(download_member_metadata_by_id) +download_artwork_metadata_by_id_task = as_celery_task(download_artwork_metadata_by_id) +download_series_metadata_by_id_task = as_celery_task(download_series_metadata_by_id) +download_tag_metadata_by_id_task = as_celery_task(download_tag_metadata_by_id) diff --git a/PixivServerCommon/pixivutil_server_common/models.py b/PixivServerCommon/pixivutil_server_common/models.py index a0b1f41..c887d54 100644 --- a/PixivServerCommon/pixivutil_server_common/models.py +++ b/PixivServerCommon/pixivutil_server_common/models.py @@ -5,6 +5,31 @@ from pydantic import BaseModel, Field +class DeadLetterMessage(BaseModel): + dead_letter_id: str + task_name: str + payload: dict + + +class DeadLetterResumeAllResponse(BaseModel): + requeued: int + + +class DeadLetterResumeResponse(BaseModel): + dead_letter_id: str + requeued: bool + task_name: str + + +class DeadLetterDropAllResponse(BaseModel): + dropped: int + + +class DeadLetterDropResponse(BaseModel): + dead_letter_id: str + dropped: bool + + class QueueTaskResponse(BaseModel): task_id: str artwork_id: str | int | None = None diff --git a/PixivUtil2 b/PixivUtil2 index 2943173..68b6d7e 160000 --- a/PixivUtil2 +++ b/PixivUtil2 @@ -1 +1 @@ -Subproject commit 2943173256420eb191bf7be013fb674d52cca89d +Subproject commit 68b6d7e37e9448f746de49edce89d6c1865ff472 diff --git a/PixivUtilClient/pixivutil_client/client.py b/PixivUtilClient/pixivutil_client/client.py index 233f1da..382d681 100644 --- a/PixivUtilClient/pixivutil_client/client.py +++ b/PixivUtilClient/pixivutil_client/client.py @@ -8,6 +8,11 @@ from pixivutil_client.exceptions import PixivAPIError, PixivTransportError from pixivutil_client.models import ( + DeadLetterDropAllResponse, + DeadLetterDropResponse, + DeadLetterMessage, + DeadLetterResumeAllResponse, + DeadLetterResumeResponse, PixivImageComplete, PixivMemberPortfolio, PixivSeriesInfo, @@ 
-123,12 +128,18 @@ async def health_pixiv(self) -> str: payload = await self._request("GET", "/api/health/pixiv") return str(payload) - async def queue_download_artwork(self, artwork_id: int) -> QueueTaskResponse: - payload = await self._request("POST", f"/api/queue/download/artwork/{artwork_id}") + async def queue_download_artwork(self, artwork_id: int, *, priority: int | None = None) -> QueueTaskResponse: + params: dict[str, Any] = {} + if priority is not None: + params["priority"] = priority + payload = await self._request("POST", f"/api/queue/download/artwork/{artwork_id}", params=params or None) return QueueTaskResponse.model_validate(payload) - async def queue_download_member(self, member_id: int) -> QueueTaskResponse: - payload = await self._request("POST", f"/api/queue/download/member/{member_id}") + async def queue_download_member(self, member_id: int, *, priority: int | None = None) -> QueueTaskResponse: + params: dict[str, Any] = {} + if priority is not None: + params["priority"] = priority + payload = await self._request("POST", f"/api/queue/download/member/{member_id}", params=params or None) return QueueTaskResponse.model_validate(payload) async def queue_download_tag( @@ -142,6 +153,7 @@ async def queue_download_tag( start_date: str | None = None, end_date: str | None = None, lookback_days: int | None = None, + priority: int | None = None, ) -> QueueTaskResponse: params: dict[str, Any] = { "sort_order": sort_order, @@ -156,41 +168,66 @@ async def queue_download_tag( params["end_date"] = end_date if lookback_days is not None: params["lookback_days"] = lookback_days + if priority is not None: + params["priority"] = priority encoded_tag = quote(tag, safe="") payload = await self._request("POST", f"/api/queue/download/tag/{encoded_tag}", params=params) return QueueTaskResponse.model_validate(payload) - async def queue_delete_artwork(self, artwork_id: int, delete_metadata: bool = True) -> QueueTaskResponse: + async def queue_delete_artwork( + self, + 
artwork_id: int, + delete_metadata: bool = True, + *, + priority: int | None = None, + ) -> QueueTaskResponse: + params: dict[str, Any] = {"delete_metadata": str(delete_metadata).lower()} + if priority is not None: + params["priority"] = priority payload = await self._request( "DELETE", f"/api/queue/download/artwork/{artwork_id}", - params={"delete_metadata": str(delete_metadata).lower()}, + params=params, ) return QueueTaskResponse.model_validate(payload) - async def queue_metadata_artwork(self, artwork_id: int) -> QueueTaskResponse: - payload = await self._request("POST", f"/api/queue/metadata/artwork/{artwork_id}") + async def queue_metadata_artwork(self, artwork_id: int, *, priority: int | None = None) -> QueueTaskResponse: + params: dict[str, Any] = {} + if priority is not None: + params["priority"] = priority + payload = await self._request("POST", f"/api/queue/metadata/artwork/{artwork_id}", params=params or None) return QueueTaskResponse.model_validate(payload) - async def queue_metadata_member(self, member_id: int) -> QueueTaskResponse: - payload = await self._request("POST", f"/api/queue/metadata/member/{member_id}") + async def queue_metadata_member(self, member_id: int, *, priority: int | None = None) -> QueueTaskResponse: + params: dict[str, Any] = {} + if priority is not None: + params["priority"] = priority + payload = await self._request("POST", f"/api/queue/metadata/member/{member_id}", params=params or None) return QueueTaskResponse.model_validate(payload) - async def queue_metadata_series(self, series_id: int) -> QueueTaskResponse: - payload = await self._request("POST", f"/api/queue/metadata/series/{series_id}") + async def queue_metadata_series(self, series_id: int, *, priority: int | None = None) -> QueueTaskResponse: + params: dict[str, Any] = {} + if priority is not None: + params["priority"] = priority + payload = await self._request("POST", f"/api/queue/metadata/series/{series_id}", params=params or None) return 
QueueTaskResponse.model_validate(payload) async def queue_metadata_tag( self, tag: str, filter_mode: TagMetadataFilterMode = "none", + *, + priority: int | None = None, ) -> QueueTaskResponse: + params: dict[str, Any] = {"filter_mode": filter_mode} + if priority is not None: + params["priority"] = priority encoded_tag = quote(tag, safe="") payload = await self._request( "POST", f"/api/queue/metadata/tag/{encoded_tag}", - params={"filter_mode": filter_mode}, + params=params, ) return QueueTaskResponse.model_validate(payload) @@ -243,3 +280,25 @@ async def reset_database(self) -> str: async def reset_downloads(self) -> str: payload = await self._request("DELETE", "/api/server/downloads") return str(payload) + + async def list_dead_letter_messages(self) -> list[DeadLetterMessage]: + payload = await self._request("GET", "/api/queue/dead-letter/") + return [DeadLetterMessage.model_validate(item) for item in payload] + + async def resume_all_dead_letter_messages(self) -> DeadLetterResumeAllResponse: + payload = await self._request("POST", "/api/queue/dead-letter/resume") + return DeadLetterResumeAllResponse.model_validate(payload) + + async def resume_dead_letter_message(self, dead_letter_id: str) -> DeadLetterResumeResponse: + encoded_dead_letter_id = quote(dead_letter_id, safe="") + payload = await self._request("POST", f"/api/queue/dead-letter/{encoded_dead_letter_id}/resume") + return DeadLetterResumeResponse.model_validate(payload) + + async def drop_all_dead_letter_messages(self) -> DeadLetterDropAllResponse: + payload = await self._request("DELETE", "/api/queue/dead-letter/") + return DeadLetterDropAllResponse.model_validate(payload) + + async def drop_dead_letter_message(self, dead_letter_id: str) -> DeadLetterDropResponse: + encoded_dead_letter_id = quote(dead_letter_id, safe="") + payload = await self._request("DELETE", f"/api/queue/dead-letter/{encoded_dead_letter_id}") + return DeadLetterDropResponse.model_validate(payload) diff --git 
a/PixivUtilClient/pixivutil_client/models.py b/PixivUtilClient/pixivutil_client/models.py index c710e1e..14571fc 100644 --- a/PixivUtilClient/pixivutil_client/models.py +++ b/PixivUtilClient/pixivutil_client/models.py @@ -3,6 +3,11 @@ """ from pixivutil_server_common.models import ( + DeadLetterDropAllResponse, + DeadLetterDropResponse, + DeadLetterMessage, + DeadLetterResumeAllResponse, + DeadLetterResumeResponse, PixivDateInfo, PixivImageComplete, PixivImageToSeries, @@ -23,6 +28,11 @@ ) __all__ = [ + "DeadLetterDropAllResponse", + "DeadLetterDropResponse", + "DeadLetterMessage", + "DeadLetterResumeAllResponse", + "DeadLetterResumeResponse", "PixivDateInfo", "PixivImageComplete", "PixivImageToSeries", diff --git a/PixivUtilClient/tests/test_client.py b/PixivUtilClient/tests/test_client.py index 85bcfe6..1187ec7 100644 --- a/PixivUtilClient/tests/test_client.py +++ b/PixivUtilClient/tests/test_client.py @@ -21,9 +21,45 @@ async def auth_echo(request: web.Request) -> web.Response: async def failure(_: web.Request) -> web.Response: return web.json_response({"error": "bad request"}, status=400) + async def dlq_list(_: web.Request) -> web.Response: + return web.json_response( + [ + { + "dead_letter_id": "abc-123", + "task_name": "download_artworks_by_id", + "payload": {"artwork_id": 42}, + } + ] + ) + + async def dlq_resume_all(_: web.Request) -> web.Response: + return web.json_response({"requeued": 2}) + + async def dlq_resume_one(request: web.Request) -> web.Response: + dead_letter_id = request.match_info["dead_letter_id"] + return web.json_response( + { + "dead_letter_id": dead_letter_id, + "requeued": True, + "task_name": "download_artworks_by_id", + } + ) + + async def dlq_drop_all(_: web.Request) -> web.Response: + return web.json_response({"dropped": 3}) + + async def dlq_drop_one(request: web.Request) -> web.Response: + dead_letter_id = request.match_info["dead_letter_id"] + return web.json_response({"dead_letter_id": dead_letter_id, "dropped": True}) + 
app.router.add_get("/api/database/members", plain_json) app.router.add_post("/api/queue/download/artwork/123", auth_echo) app.router.add_get("/boom", failure) + app.router.add_get("/api/queue/dead-letter/", dlq_list) + app.router.add_post("/api/queue/dead-letter/resume", dlq_resume_all) + app.router.add_post("/api/queue/dead-letter/{dead_letter_id}/resume", dlq_resume_one) + app.router.add_delete("/api/queue/dead-letter/", dlq_drop_all) + app.router.add_delete("/api/queue/dead-letter/{dead_letter_id}", dlq_drop_one) runner = web.AppRunner(app) await runner.setup() @@ -93,3 +129,26 @@ def request(self, method: str, url: str, **kwargs: Any) -> FakeContextManager: payload = await client._request("GET", "/probe") assert payload["ok"] is True assert captured["kwargs"]["ssl"] is False + + +@pytest.mark.asyncio +async def test_dead_letter_queue_client_methods(server_url: str) -> None: + async with PixivAsyncClient(server_url) as client: + messages = await client.list_dead_letter_messages() + assert len(messages) == 1 + assert messages[0].dead_letter_id == "abc-123" + assert messages[0].payload["artwork_id"] == 42 + + resumed_all = await client.resume_all_dead_letter_messages() + assert resumed_all.requeued == 2 + + resumed_one = await client.resume_dead_letter_message("abc-123") + assert resumed_one.dead_letter_id == "abc-123" + assert resumed_one.requeued is True + + dropped_all = await client.drop_all_dead_letter_messages() + assert dropped_all.dropped == 3 + + dropped_one = await client.drop_dead_letter_message("abc-123") + assert dropped_one.dead_letter_id == "abc-123" + assert dropped_one.dropped is True diff --git a/README.md b/README.md index 953dd54..249a154 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,11 @@ For example, the server supports the following endpoints: - Get tag metadata by ID - Get series metadata by ID +#### [Dead letter queue (DLQ)](/docs/api/dlq.md) + +API endpoints to inspect, replay, and purge failed worker messages in the dead +letter 
queue. + #### [Download queueing](/docs/api/download.md) API endpoints to queue content (artwork) downloads for worker from server. @@ -105,6 +110,8 @@ Authorization: Bearer ## Architecture and Development +When running PixivUtil server in development, set `PIXIVUTIL_SERVER_ENV=development`. This will enable debug logging level. + PixivUtil server is a Python project based on PixivUtil2 as its API client engine. PixivUtil2 is a separate git repository added to this as a submodule. PixivUtil server as a service consists of 3 microservices: the PixivUtil API server, PixivUtil worker, and RabbitMQ queue. The server receives API requests from the user/client, and passes them as long-running jobs to a single-process worker which handles them one at a time via the queue, controlling API volumes and avoiding rate limit violations. diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..96d8c54 --- /dev/null +++ b/conftest.py @@ -0,0 +1,26 @@ +import pytest + + +def pytest_addoption(parser: pytest.Parser): + parser.addoption( + "--pixiv-api", + action="store_true", + default=False, + help="Run tests that make Pixiv API calls.", + ) + + +def pytest_configure(config: pytest.Config): + config.addinivalue_line( + "markers", + "pixiv_api: test performs Pixiv API calls and is skipped unless --pixiv-api is provided.", + ) + + +def pytest_collection_modifyitems(config: pytest.Config, items: list[pytest.Item]): + if config.getoption("--pixiv-api"): + return + skip_pixiv_api = pytest.mark.skip(reason="need --pixiv-api option enabled") + for item in items: + if "pixiv_api" in item.keywords: + item.add_marker(skip_pixiv_api) diff --git a/docker-compose.yml b/docker-compose.yml index 0be6051..df14b52 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -27,12 +27,13 @@ services: build: . container_name: pixivutil-worker # entrypoint: ["uv", "run"] # use this if you want to keep a non-root user. 
- command: ["celery", "-A", "PixivServer.worker.pixiv_worker", "worker", "--concurrency=1", "--loglevel=info", "-B"] + command: ["celery", "-A", "PixivServer.worker.pixiv_worker", "worker", "--concurrency=1", "--loglevel=debug", "-B"] environment: - PUID=${PUID:-1000} - PGID=${PGID:-1000} - CELERYBEAT_SCHEDULE=/workdir/.pixivUtil2/celerybeat-schedule - PIXIVUTIL_COOKIE=$PIXIVUTIL_COOKIE + - PIXIVUTIL_SERVER_ENV=development - RABBITMQ_BROKER_URL=amqp://guest:guest@rabbitmq:5672 volumes: - pixivutil-data:/workdir/.pixivUtil2 @@ -45,12 +46,13 @@ services: build: . container_name: pixivutil-server # entrypoint: ["uv", "run"] # use this if you want to keep a non-root user. - command: ["uvicorn", "PixivServer.app:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"] + command: ["uvicorn", "PixivServer.app:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "debug"] environment: - PUID=${PUID:-1000} - PGID=${PGID:-1000} - PIXIVUTIL_COOKIE=$PIXIVUTIL_COOKIE - PIXIVUTIL_SERVER_API_KEY=$PIXIVUTIL_SERVER_API_KEY + - PIXIVUTIL_SERVER_ENV=development - RABBITMQ_BROKER_URL=amqp://guest:guest@rabbitmq:5672 volumes: - pixivutil-data:/workdir/.pixivUtil2 diff --git a/docs/api/dlq.md b/docs/api/dlq.md new file mode 100644 index 0000000..1602a4c --- /dev/null +++ b/docs/api/dlq.md @@ -0,0 +1,59 @@ +# Dead Letter Queue API + +Authentication: +- Requires `Authorization: Bearer ` when `PIXIVUTIL_SERVER_API_KEY` is set. +- If `PIXIVUTIL_SERVER_API_KEY` is unset/empty, authentication is disabled. + +Dead letter queue (DLQ) endpoints manage failed worker messages that were moved +to the broker dead letter queue after retry exhaustion or terminal failure. + +`GET /api/queue/dead-letter/` + +List all messages currently in the dead letter queue. 
+ +Response item shape: +- `dead_letter_id`: message/task identifier (string) +- `task_name`: registered Celery task name (string) +- `payload`: original task payload (object) + +`POST /api/queue/dead-letter/resume` + +Requeue all resumable dead letter messages back to the main worker queue. + +Response: +- `requeued`: number of messages requeued + +Notes: +- Messages with unknown/unregistered task names are left in the DLQ. +- Unparseable messages are left in the DLQ. + +`POST /api/queue/dead-letter/{dead_letter_id}/resume` + +Requeue a specific dead letter message by `dead_letter_id`. + +Response: +- `dead_letter_id`: requested DLQ message id +- `requeued`: `true` when message was requeued +- `task_name`: task name that was requeued + +Errors: +- `404`: dead letter message not found +- `422`: task name is not recognized and cannot be resumed + +`DELETE /api/queue/dead-letter/` + +Purge all messages from the dead letter queue. + +Response: +- `dropped`: number of messages removed + +`DELETE /api/queue/dead-letter/{dead_letter_id}` + +Drop a specific dead letter message by `dead_letter_id`. 
+ +Response: +- `dead_letter_id`: requested DLQ message id +- `dropped`: `true` when message was removed + +Errors: +- `404`: dead letter message not found diff --git a/pyproject.toml b/pyproject.toml index 061b099..e7eeb89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ license = { file = "LICENSE" } authors = [ { name = "psilabs-dev" } ] -version = "2.4.3" +version = "2.4.4" dependencies = [ "aiofiles>=25.1.0", "celery>=5.5.3", @@ -41,8 +41,14 @@ pixivutil2 = [ [tool.pyright] pythonVersion = "3.12" include = ["PixivServer", "PixivServerCommon", "PixivUtilClient", "tests"] -exclude = ["PixivUtil2"] +exclude = [ + "PixivUtil2", + "**/node_modules", + "**/__pycache__", + "**/.*", ".venv", +] reportUnusedCoroutine = "error" +extraPaths = ["PixivUtil2"] [tool.ruff] exclude = [ diff --git a/tests/test_celery_config.py b/tests/test_celery_config.py new file mode 100644 index 0000000..7fd2b4b --- /dev/null +++ b/tests/test_celery_config.py @@ -0,0 +1,34 @@ +from celery import Celery + +from PixivServer.config.celery import ( + DLX_EXCHANGE_NAME, + MAIN_QUEUE_NAME, + QUEUE_MAX_PRIORITY, +) + + +def test_celery_failure_is_rejected_for_rabbitmq_dlq(): + app = Celery("pixivutil-test") + app.config_from_object("PixivServer.config.celery") + + assert app.conf.task_acks_late is True + assert app.conf.task_acks_on_failure_or_timeout is False + + +def test_main_queue_declares_dead_letter_exchange(): + app = Celery("pixivutil-test") + app.config_from_object("PixivServer.config.celery") + + queues = {queue.name: queue for queue in app.conf.CELERY_QUEUES} + assert MAIN_QUEUE_NAME in queues + assert queues[MAIN_QUEUE_NAME].queue_arguments == { + "x-dead-letter-exchange": DLX_EXCHANGE_NAME, + "x-max-priority": QUEUE_MAX_PRIORITY, + } + + +def test_worker_prefetch_multiplier_is_one_for_priority_fairness(): + app = Celery("pixivutil-test") + app.config_from_object("PixivServer.config.celery") + + assert app.conf.worker_prefetch_multiplier == 1 diff --git a/uv.lock 
b/uv.lock index 410baaf..9cc0790 100644 --- a/uv.lock +++ b/uv.lock @@ -896,7 +896,7 @@ wheels = [ [[package]] name = "pixivutil-server" -version = "2.4.3" +version = "2.4.4" source = { editable = "." } dependencies = [ { name = "aiofiles" },