Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions PixivServer/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import PixivServer.auth
import PixivServer.routers
import PixivServer.routers.database
import PixivServer.routers.dlq
import PixivServer.routers.download_queue
import PixivServer.routers.health
import PixivServer.routers.metadata_queue
Expand Down Expand Up @@ -117,6 +118,11 @@ async def request_metrics_middleware(request: Request, call_next):
prefix="/api/database",
dependencies=auth_dependency,
)
app.include_router(
PixivServer.routers.dlq.router,
prefix="/api/queue/dead-letter",
dependencies=auth_dependency,
)
# app.include_router(
# PixivServer.routers.subscription.router,
# prefix="/api/subscription"
Expand Down
36 changes: 33 additions & 3 deletions PixivServer/config/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,43 @@

from PixivServer.config import rabbitmq

default_exchange = Exchange('pixivutil-exchange', type='direct', durable=True, delivery_mode=2)
LEGACY_MAIN_EXCHANGE_NAME = "pixivutil-exchange"
LEGACY_MAIN_QUEUE_NAME = "pixivutil-queue"

CELERY_QUEUES = (
Queue(name="pixivutil-queue", exchange=default_exchange, routing_key='pixivutil-queue', durable=True),
MAIN_EXCHANGE_NAME = "pixivutil-v1-exchange"
DLX_EXCHANGE_NAME = "pixivutil-v1-dlx"
MAIN_QUEUE_NAME = "pixivutil-v1-queue"
MAIN_ROUTING_KEY = MAIN_QUEUE_NAME
DEAD_LETTER_QUEUE_NAME = "pixivutil-v1-dead-letter"
QUEUE_MAX_PRIORITY = 3

default_exchange = Exchange(MAIN_EXCHANGE_NAME, type='direct', durable=True, delivery_mode=2)
dlx_exchange = Exchange(DLX_EXCHANGE_NAME, type='fanout', durable=True, delivery_mode=2)
main_queue = Queue(
name=MAIN_QUEUE_NAME,
exchange=default_exchange,
routing_key=MAIN_ROUTING_KEY,
durable=True,
queue_arguments={
'x-dead-letter-exchange': DLX_EXCHANGE_NAME,
'x-max-priority': QUEUE_MAX_PRIORITY,
},
)
dead_letter_queue = Queue(
name=DEAD_LETTER_QUEUE_NAME,
exchange=dlx_exchange,
routing_key='',
durable=True,
)

CELERY_QUEUES = (main_queue,)

BROKER_URL = rabbitmq.config.broker_url
CELERY_ACKS_LATE = True
CELERY_TASK_ACKS_LATE = True
CELERY_ACKS_ON_FAILURE_OR_TIMEOUT = False
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Keep broker-side queue priority behavior observable with a single worker.
# Without this, Celery can reserve multiple low-priority tasks before later high-priority tasks arrive.
CELERYD_PREFETCH_MULTIPLIER = 1
1 change: 1 addition & 0 deletions PixivServer/config/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ class RabbitConfig:

def __init__(self):
self.broker_url = os.getenv("RABBITMQ_BROKER_URL", "amqp://guest:guest@rabbitmq:5672")
self.management_url = os.getenv("RABBITMQ_MANAGEMENT_URL", "http://guest:guest@rabbitmq:15672")

config = RabbitConfig()
1 change: 1 addition & 0 deletions PixivServer/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

# --- Worker queue metrics (periodic) ---
QUEUE_DEPTH = Gauge("pixivutil_queue_depth", "Number of messages pending in the task queue")
DLQ_DEPTH = Gauge("pixivutil_dlq_depth", "Number of messages in the dead letter queue")

# --- Request metrics (per-request via middleware) ---
HTTP_REQUESTS_TOTAL = Counter(
Expand Down
11 changes: 11 additions & 0 deletions PixivServer/models/pixiv_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
Model layer for PixivUtil worker queue processing interface.
"""

from typing import Any, Protocol, cast

from celery.result import AsyncResult
from pixivutil_server_common.models import (
TagMetadataFilterMode,
TagSortOrder,
Expand All @@ -10,6 +13,14 @@
from pydantic import BaseModel


class CeleryTask(Protocol):
def apply_async(self, *, args: list[Any] | None = None, priority: int | None = None, **kwargs: Any) -> AsyncResult: ...


def as_celery_task(task: Any) -> CeleryTask:
return cast(CeleryTask, task)


class DownloadArtworkByIdRequest(BaseModel):
artwork_id: int

Expand Down
10 changes: 9 additions & 1 deletion PixivServer/repository/pixivutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class PixivUtilRepository:

def __init__(self):
self.db_path = pixivutil_config.db_path
self.connection: sqlite3.Connection = None
self.connection: sqlite3.Connection = None # pyright: ignore[reportAttributeAccessIssue] this will be handled during open.

def open(self):
self.connection = sqlite3.connect(self.db_path, timeout=30.0)
Expand All @@ -50,6 +50,7 @@ def get_member_data_by_id(self, member_id: int) -> PixivMemberPortfolio:
Raises:
KeyError: If member with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -155,6 +156,7 @@ def get_all_pixiv_member_ids(self) -> list[int]:
Returns:
List of member IDs. Empty list if no members found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT member_id FROM pixiv_master_member ORDER BY member_id ASC")
Expand All @@ -174,6 +176,7 @@ def get_all_pixiv_image_ids(self) -> list[int]:
Returns:
List of image IDs. Empty list if no images found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT image_id FROM pixiv_master_image ORDER BY image_id ASC")
Expand All @@ -193,6 +196,7 @@ def get_all_pixiv_tags(self) -> list[str]:
Returns:
List of tag IDs. Empty list if no tags found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT tag_id FROM pixiv_master_tag ORDER BY tag_id ASC")
Expand All @@ -212,6 +216,7 @@ def get_all_pixiv_series(self) -> list[str]:
Returns:
List of series IDs. Empty list if no series found.
"""
cursor = None
try:
cursor = self.connection.cursor()
cursor.execute("SELECT series_id FROM pixiv_master_series ORDER BY series_id ASC")
Expand All @@ -231,6 +236,7 @@ def get_tag_info_by_id(self, tag_id: str) -> PixivTagInfo:
Raises:
KeyError: If tag with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -305,6 +311,7 @@ def get_series_info_by_id(self, series_id: str) -> PixivSeriesInfo:
Raises:
KeyError: If series with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down Expand Up @@ -365,6 +372,7 @@ def get_image_data_by_id(self, image_id: int) -> PixivImageComplete:
Raises:
KeyError: If image with the given ID is not found.
"""
cursor = None
try:
cursor = self.connection.cursor()

Expand Down
94 changes: 55 additions & 39 deletions PixivServer/repository/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class SubscriptionRepository:

def __init__(self):
self.db_path = pixivutil_config.db_path
self.connection: sqlite3.Connection = None
self.connection: sqlite3.Connection = None # pyright: ignore[reportAttributeAccessIssue] this will be handled during open.

def open(self):
self.connection = sqlite3.connect(self.db_path)
Expand All @@ -35,57 +35,63 @@ def create_table(self):

def check_member_id_exist(self, member_id: int) -> bool:
result = False
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''SELECT 1 FROM pixiv_server_member_subscription WHERE member_id = ?''',
(member_id, )
)
result = c.fetchone()
result = cursor.fetchone()
except Exception as e:
logger.error(f'Failed to check existence of member ID: {member_id}')
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return result

def select_member_name_by_id(self, member_id: int) -> str:
result: str = None
def select_member_name_by_id(self, member_id: int) -> str | None:
result: str | None = None
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''SELECT name FROM pixiv_server_member_subscription WHERE member_id = ?''',
(member_id, )
)
result = c.fetchone()
result = cursor.fetchone()
except Exception as e:
logger.error(f'Failed to retrieve member name for ID {member_id}.')
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return result

def select_member_subscriptions(self) -> list[tuple[int, str]]:
results: list = None

results: list[tuple[int, str]] = []
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''SELECT member_id, name FROM pixiv_server_member_subscription ORDER BY member_id''',
)
results = c.fetchall()
results = cursor.fetchall()
except Exception as e:
logger.error('Failed to export member subscriptions: ', e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()

return results

def add_member_subscription(self, member_id: int, member_name: str) -> bool:
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''INSERT OR IGNORE INTO pixiv_server_member_subscription VALUES(?, ?, datetime('now'), datetime('now'))''',
(member_id, member_name, )
)
Expand All @@ -94,13 +100,15 @@ def add_member_subscription(self, member_id: int, member_name: str) -> bool:
logger.error(f'Failed to add member subscription: {member_id}', e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return True

def remove_member_subscription(self, member_id: int) -> bool:
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''DELETE FROM pixiv_server_member_subscription WHERE member_id = ?''',
(member_id, )
)
Expand All @@ -109,46 +117,51 @@ def remove_member_subscription(self, member_id: int) -> bool:
logger.error(f"Failed to remove member {member_id} from subscription: ", e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return True

def check_tag_name_exist(self, tag_id: str) -> str:
result = False
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''SELECT 1 FROM pixiv_server_tag_subscription WHERE tag_id = ?''',
(tag_id, )
)
result = c.fetchone()
result = cursor.fetchone()
except Exception as e:
logger.error(f"Failed to check existence of tag name: {tag_id}")
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return result

def select_tag_subscriptions(self) -> list[tuple[str]]:
results: list = None

results: list[tuple[str]] = []
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''SELECT tag_id FROM pixiv_server_tag_subscription''',
)
results = c.fetchall()
results = cursor.fetchall()
except Exception as e:
logger.error("Failed to export tag encoded subscriptions: ", e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()

return results

def add_tag_subscription(self, tag_id: str, bookmark_count: int) -> bool:
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''INSERT INTO pixiv_server_tag_subscription (tag_id, bookmark_count, created_date, last_modified_date)
VALUES(?, ?, datetime('now'), datetime('now'))
ON CONFLICT(tag_id)
Expand All @@ -164,13 +177,15 @@ def add_tag_subscription(self, tag_id: str, bookmark_count: int) -> bool:
logger.error(f'Failed to add encoded tag subscription: {tag_id}', e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return True

def remove_tag_subscription(self, tag_id: int) -> bool:
cursor = None
try:
c = self.connection.cursor()
c.execute(
cursor = self.connection.cursor()
cursor.execute(
'''DELETE FROM pixiv_server_tag_subscription WHERE tag_id = ?''',
(tag_id, )
)
Expand All @@ -179,7 +194,8 @@ def remove_tag_subscription(self, tag_id: int) -> bool:
logger.error(f"Failed to remove tag {tag_id} from subscription: ", e)
raise e
finally:
c.close()
if cursor is not None:
cursor.close()
return True

def close(self):
Expand Down
Loading