Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d0a8fc5
update server version to 2.4.4
psilabs-dev Feb 22, 2026
25b51d7
update PixivUtil2 to address https://github.com/psilabs-dev/PixivUtil…
psilabs-dev Feb 22, 2026
7daacca
stop pyright from complaining
psilabs-dev Feb 24, 2026
6c29501
refactor out metrics and add environment configurations
psilabs-dev Feb 24, 2026
3e764dc
update dockerignore to exclude new gui files
psilabs-dev Feb 24, 2026
2eb03e1
update PixivUtil2 to server mode version 0.2.0
psilabs-dev Feb 24, 2026
ef3aa95
refactor server and worker and add dev mode
psilabs-dev Feb 24, 2026
c0e2378
first draft for dead letter queue
psilabs-dev Feb 24, 2026
d8af72e
add tests for celery config
psilabs-dev Feb 24, 2026
ea98fe8
fixes
psilabs-dev Feb 24, 2026
e37b4f8
add integration tests
psilabs-dev Feb 24, 2026
67f82aa
add pixiv-api flag to integration tests
psilabs-dev Feb 24, 2026
3f51000
add dlq client tooling
psilabs-dev Feb 24, 2026
f109dc4
implement priority queue
psilabs-dev Feb 24, 2026
87b1cd9
implement priority and dlq v1 upgrade on worker init
psilabs-dev Feb 25, 2026
21752ea
implement priority queue function to all endpoints and clients
psilabs-dev Feb 25, 2026
711527d
implement dead letter queue logic for prod task endpoints
psilabs-dev Feb 25, 2026
fa640d3
update metrics for DLQ
psilabs-dev Feb 25, 2026
3502c31
clean up stuff
psilabs-dev Feb 25, 2026
fcff832
fix pyright typing for celery
psilabs-dev Feb 25, 2026
a58fd8e
address some pyright issues
psilabs-dev Feb 25, 2026
9813445
address ruff linting
psilabs-dev Feb 25, 2026
21a48cf
dlq logic
psilabs-dev Feb 25, 2026
2efcaf9
include protocol for PixivConfig
psilabs-dev Feb 25, 2026
ee25b03
add dlq documentation
psilabs-dev Feb 25, 2026
00eef6d
disable rabbitmq timeout
psilabs-dev Mar 5, 2026
5e65997
Revert "disable rabbitmq timeout"
psilabs-dev Mar 6, 2026
7cf4b47
remove integration tests
psilabs-dev Mar 8, 2026
fada1bb
remove integration test stuff
psilabs-dev Mar 8, 2026
f9b09f1
also clean up exchange
psilabs-dev Mar 8, 2026
be7606f
add todo
psilabs-dev Mar 8, 2026
3fa9d6f
Merge pull request #48 from psilabs-dev/dev-2.4.4/rabbit-integration
psilabs-dev Mar 8, 2026
42b7efc
add metrics warning logs
psilabs-dev Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ PixivUtil2/.pylintrc
PixivUtil2/.python-version
PixivUtil2/.travis.yml
PixivUtil2/Dockerfile
PixivUtil2/PixivUtilGUI.py
PixivUtil2/ISSUE_TEMPLATE.md
PixivUtil2/MANIFEST.in
PixivUtil2/changelog.txt
PixivUtil2/freeimage-3.15.4-win32.dll
PixivUtil2/icon2.ico
PixivUtil2/pyproject.toml
PixivUtil2/readme.md
PixivUtil2/README.md
PixivUtil2/requirements.txt
PixivUtil2/setup.py
PixivUtil2/uv.lock
Expand Down
141 changes: 19 additions & 122 deletions PixivServer/app.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,16 @@
import asyncio
import base64
import contextlib
import json
import logging
import os
import time
import traceback
import urllib.error
import urllib.request
from contextlib import asynccontextmanager
from pathlib import Path
from urllib.parse import quote, urlparse

import psutil
from fastapi import Depends, FastAPI, Request, Response

import PixivServer
import PixivServer.auth
import PixivServer.routers
import PixivServer.routers.database
import PixivServer.routers.dlq
import PixivServer.routers.download_queue
import PixivServer.routers.health
import PixivServer.routers.metadata_queue
Expand All @@ -28,132 +20,24 @@
# import PixivServer.routers.subscription
import PixivServer.service
import PixivServer.service.pixiv
from PixivServer.config.pixivutil import config as pixivutil_config
from PixivServer.config.rabbitmq import config as rabbitmq_config
from PixivServer.config.server import config as server_config
from PixivServer.metrics import (
DB_ARTWORKS,
DB_MEMBERS,
DB_PAGES,
DB_SERIES,
DB_TAGS,
DISK_DATABASE_BYTES,
DISK_DOWNLOADS_BYTES,
HTTP_REQUEST_DURATION,
HTTP_REQUEST_SIZE,
HTTP_REQUESTS_TOTAL,
HTTP_RESPONSE_SIZE,
QUEUE_DEPTH,
SERVER_INFO,
SYS_CPU_PERCENT,
SYS_DISK_TOTAL_BYTES,
SYS_DISK_USED_BYTES,
SYS_MEM_TOTAL_BYTES,
SYS_MEM_USED_BYTES,
)
from PixivServer.repository.pixivutil import PixivUtilRepository
from PixivServer.service.metrics import periodic_metrics_collector
from PixivServer.utils import get_version

logger = logging.getLogger('uvicorn.pixivutil')

_SYSTEM_COLLECT_INTERVAL = 15 # seconds
_DB_STAT_COLLECT_INTERVAL = 60 # seconds
_DISK_COLLECT_INTERVAL = 300 # seconds (directory walk may be slow on large collections)
_QUEUE_COLLECT_INTERVAL = 15 # seconds


def _collect_system_metrics() -> None:
cpu = psutil.cpu_percent(interval=1)
vm = psutil.virtual_memory()
disk = psutil.disk_usage("/")
SYS_CPU_PERCENT.set(cpu)
SYS_MEM_USED_BYTES.set(vm.used)
SYS_MEM_TOTAL_BYTES.set(vm.total)
SYS_DISK_USED_BYTES.set(disk.used)
SYS_DISK_TOTAL_BYTES.set(disk.total)


def _collect_db_stats() -> None:
repo = PixivUtilRepository()
repo.open()
try:
DB_MEMBERS.set(repo.count_members())
DB_ARTWORKS.set(repo.count_artworks())
DB_PAGES.set(repo.count_pages())
DB_TAGS.set(repo.count_tags())
DB_SERIES.set(repo.count_series())
finally:
repo.close()


def _collect_disk_metrics() -> None:
# Database file + WAL/SHM sidecars
db_path = pixivutil_config.db_path
db_bytes = 0
for suffix in ("", "-wal", "-shm"):
p = db_path + suffix
if os.path.isfile(p):
db_bytes += os.path.getsize(p)
DISK_DATABASE_BYTES.set(db_bytes)

# Downloads directory — recursive file size sum
downloads = Path(PixivServer.service.pixiv.service.downloads_folder)
total = 0
if downloads.is_dir():
for f in downloads.rglob("*"):
if f.is_file():
with contextlib.suppress(OSError):
total += f.stat().st_size
DISK_DOWNLOADS_BYTES.set(total)


def _collect_queue_depth() -> None:
parsed = urlparse(rabbitmq_config.broker_url)
user = parsed.username or "guest"
password = parsed.password or "guest"
host = parsed.hostname or "rabbitmq"
raw_vhost = parsed.path.lstrip("/")
vhost = raw_vhost if raw_vhost else "/"
encoded_vhost = quote(vhost, safe="")
url = f"http://{host}:15672/api/queues/{encoded_vhost}/pixivutil-queue"
credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
req = urllib.request.Request(url, headers={"Authorization": f"Basic {credentials}"})
try:
with urllib.request.urlopen(req, timeout=5) as resp:
data = json.loads(resp.read())
QUEUE_DEPTH.set(data.get("messages", 0))
except (urllib.error.URLError, OSError, ValueError):
pass # Management API unavailable; leave metric stale


async def _periodic_metrics_collector() -> None:
last_system = 0.0
last_db = 0.0
last_disk = 0.0
last_queue = 0.0
while True:
now = time.monotonic()
try:
if now - last_system >= _SYSTEM_COLLECT_INTERVAL:
await asyncio.to_thread(_collect_system_metrics)
last_system = time.monotonic()
if now - last_db >= _DB_STAT_COLLECT_INTERVAL:
await asyncio.to_thread(_collect_db_stats)
last_db = time.monotonic()
if now - last_disk >= _DISK_COLLECT_INTERVAL:
await asyncio.to_thread(_collect_disk_metrics)
last_disk = time.monotonic()
if now - last_queue >= _QUEUE_COLLECT_INTERVAL:
await asyncio.to_thread(_collect_queue_depth)
last_queue = time.monotonic()
except Exception: # noqa: BLE001
logger.warning(f"Metrics collector error: {traceback.format_exc()}")
await asyncio.sleep(1)


@asynccontextmanager
async def lifespan(_: FastAPI):
try:
logger.info("Setting up server.")
logger.info(f"Setting up server in {server_config.server_env} environment.")

# startup actions
await asyncio.sleep(5)
PixivServer.service.pixiv.service.open(validate_pixiv_login=False)
Expand All @@ -162,7 +46,7 @@ async def lifespan(_: FastAPI):
except Exception as e:
print(f"Encountered exception during application setup: {traceback.format_exc()}")
raise e
collector_task = asyncio.create_task(_periodic_metrics_collector())
collector_task = asyncio.create_task(periodic_metrics_collector())
yield
# shutdown actions
collector_task.cancel()
Expand Down Expand Up @@ -234,11 +118,24 @@ async def request_metrics_middleware(request: Request, call_next):
prefix="/api/database",
dependencies=auth_dependency,
)
app.include_router(
PixivServer.routers.dlq.router,
prefix="/api/queue/dead-letter",
dependencies=auth_dependency,
)
# app.include_router(
# PixivServer.routers.subscription.router,
# prefix="/api/subscription"
# )

if server_config.server_env == 'development':
import PixivServer.routers.dev
app.include_router(
PixivServer.routers.dev.router,
prefix="/api/dev",
dependencies=auth_dependency,
)


@app.get("/")
async def info():
Expand Down
36 changes: 33 additions & 3 deletions PixivServer/config/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,43 @@

from PixivServer.config import rabbitmq

default_exchange = Exchange('pixivutil-exchange', type='direct', durable=True, delivery_mode=2)
LEGACY_MAIN_EXCHANGE_NAME = "pixivutil-exchange"
LEGACY_MAIN_QUEUE_NAME = "pixivutil-queue"

CELERY_QUEUES = (
Queue(name="pixivutil-queue", exchange=default_exchange, routing_key='pixivutil-queue', durable=True),
MAIN_EXCHANGE_NAME = "pixivutil-v1-exchange"
DLX_EXCHANGE_NAME = "pixivutil-v1-dlx"
MAIN_QUEUE_NAME = "pixivutil-v1-queue"
MAIN_ROUTING_KEY = MAIN_QUEUE_NAME
DEAD_LETTER_QUEUE_NAME = "pixivutil-v1-dead-letter"
QUEUE_MAX_PRIORITY = 3

default_exchange = Exchange(MAIN_EXCHANGE_NAME, type='direct', durable=True, delivery_mode=2)
dlx_exchange = Exchange(DLX_EXCHANGE_NAME, type='fanout', durable=True, delivery_mode=2)
main_queue = Queue(
name=MAIN_QUEUE_NAME,
exchange=default_exchange,
routing_key=MAIN_ROUTING_KEY,
durable=True,
queue_arguments={
'x-dead-letter-exchange': DLX_EXCHANGE_NAME,
'x-max-priority': QUEUE_MAX_PRIORITY,
},
)
dead_letter_queue = Queue(
name=DEAD_LETTER_QUEUE_NAME,
exchange=dlx_exchange,
routing_key='',
durable=True,
)

CELERY_QUEUES = (main_queue,)

BROKER_URL = rabbitmq.config.broker_url
CELERY_ACKS_LATE = True
CELERY_TASK_ACKS_LATE = True
CELERY_ACKS_ON_FAILURE_OR_TIMEOUT = False
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Keep broker-side queue priority behavior observable with a single worker.
# Without this, Celery can reserve multiple low-priority tasks before later high-priority tasks arrive.
CELERYD_PREFETCH_MULTIPLIER = 1
1 change: 1 addition & 0 deletions PixivServer/config/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ class RabbitConfig:

def __init__(self):
self.broker_url = os.getenv("RABBITMQ_BROKER_URL", "amqp://guest:guest@rabbitmq:5672")
self.management_url = os.getenv("RABBITMQ_MANAGEMENT_URL", "http://guest:guest@rabbitmq:15672")

config = RabbitConfig()
6 changes: 6 additions & 0 deletions PixivServer/config/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from typing import Literal


class ServerConfig:
Expand All @@ -8,4 +9,9 @@ def __init__(self):
api_key = os.getenv("PIXIVUTIL_SERVER_API_KEY")
self.api_key = api_key if api_key else None

server_env = os.getenv("PIXIVUTIL_SERVER_ENV", "production")
if server_env != "production" and server_env != "development":
raise ValueError(f"Unrecognized environment: {server_env}")
self.server_env: Literal["production", "development"] = server_env

config = ServerConfig()
1 change: 1 addition & 0 deletions PixivServer/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# --- Worker queue metrics (periodic) ---
QUEUE_DEPTH = Gauge("pixivutil_queue_depth", "Number of messages pending in the task queue")
DLQ_DEPTH = Gauge("pixivutil_dlq_depth", "Number of messages in the dead letter queue")

# --- Request metrics (per-request via middleware) ---
HTTP_REQUESTS_TOTAL = Counter(
Expand Down
11 changes: 11 additions & 0 deletions PixivServer/models/pixiv_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Model layer for PixivUtil worker queue processing interface.
"""

from typing import Any, Protocol, cast

from celery.result import AsyncResult
from pixivutil_server_common.models import (
TagMetadataFilterMode,
TagSortOrder,
Expand All @@ -10,6 +13,14 @@
from pydantic import BaseModel


class CeleryTask(Protocol):
def apply_async(self, *, args: list[Any] | None = None, priority: int | None = None, **kwargs: Any) -> AsyncResult: ...


def as_celery_task(task: Any) -> CeleryTask:
return cast(CeleryTask, task)


class DownloadArtworkByIdRequest(BaseModel):
artwork_id: int

Expand Down
10 changes: 9 additions & 1 deletion PixivServer/repository/pixivutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PixivUtilRepository:

def __init__(self):
self.db_path = pixivutil_config.db_path
self.connection: sqlite3.Connection = None
self.connection: sqlite3.Connection = None # pyright: ignore[reportAttributeAccessIssue] this will be handled during open.

def open(self):
self.connection = sqlite3.connect(self.db_path, timeout=30.0)
Expand All @@ -50,6 +50,7 @@ def get_member_data_by_id(self, member_id: int) -> PixivMemberPortfolio:
Raises:
KeyError: If member with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -155,6 +156,7 @@ def get_all_pixiv_member_ids(self) -> list[int]:
Returns:
List of member IDs. Empty list if no members found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT member_id FROM pixiv_master_member ORDER BY member_id ASC")
Expand All @@ -174,6 +176,7 @@ def get_all_pixiv_image_ids(self) -> list[int]:
Returns:
List of image IDs. Empty list if no images found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT image_id FROM pixiv_master_image ORDER BY image_id ASC")
Expand All @@ -193,6 +196,7 @@ def get_all_pixiv_tags(self) -> list[str]:
Returns:
List of tag IDs. Empty list if no tags found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT tag_id FROM pixiv_master_tag ORDER BY tag_id ASC")
Expand All @@ -212,6 +216,7 @@ def get_all_pixiv_series(self) -> list[str]:
Returns:
List of series IDs. Empty list if no series found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT series_id FROM pixiv_master_series ORDER BY series_id ASC")
Expand All @@ -231,6 +236,7 @@ def get_tag_info_by_id(self, tag_id: str) -> PixivTagInfo:
Raises:
KeyError: If tag with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -305,6 +311,7 @@ def get_series_info_by_id(self, series_id: str) -> PixivSeriesInfo:
Raises:
KeyError: If series with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -365,6 +372,7 @@ def get_image_data_by_id(self, image_id: int) -> PixivImageComplete:
Raises:
KeyError: If image with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down
Loading