Skip to content

Commit c4cb826

Browse files
authored
[CVAT] CVAT webhook queue (#3558)
1 parent 4a244d8 commit c4cb826

File tree

20 files changed

+1121
-354
lines changed

20 files changed

+1121
-354
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""
2+
Add CVAT webhook queue
3+
4+
Revision ID: 216af22b5590
5+
Revises: c32b36a87539
6+
Create Date: 2025-09-09 20:08:23.474046
7+
8+
"""
9+
10+
import sqlalchemy as sa
11+
12+
from alembic import op
13+
14+
# revision identifiers, used by Alembic.
15+
revision = "216af22b5590"
16+
down_revision = "c32b36a87539"
17+
branch_labels = None
18+
depends_on = None
19+
20+
21+
def upgrade() -> None:
22+
# ### commands auto generated by Alembic ###
23+
op.create_table(
24+
"cvat_webhooks",
25+
sa.Column(
26+
"id",
27+
sa.UUID(as_uuid=False),
28+
server_default=sa.text("uuid_generate_v4()"),
29+
nullable=False,
30+
),
31+
sa.Column("cvat_project_id", sa.Integer(), nullable=False),
32+
sa.Column("cvat_task_id", sa.Integer(), nullable=False),
33+
sa.Column("cvat_job_id", sa.Integer(), nullable=False),
34+
sa.Column("status", sa.String(), server_default="pending", nullable=True),
35+
sa.Column("attempts", sa.Integer(), server_default="0", nullable=True),
36+
sa.Column(
37+
"created_at", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=True
38+
),
39+
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
40+
sa.Column(
41+
"wait_until", sa.DateTime(timezone=True), server_default=sa.text("now()"), nullable=True
42+
),
43+
sa.Column("event_type", sa.String(), nullable=False),
44+
sa.Column("event_data", sa.JSON(), nullable=True),
45+
sa.ForeignKeyConstraint(["cvat_job_id"], ["jobs.cvat_id"], ondelete="CASCADE"),
46+
sa.ForeignKeyConstraint(["cvat_project_id"], ["projects.cvat_id"], ondelete="CASCADE"),
47+
sa.ForeignKeyConstraint(["cvat_task_id"], ["tasks.cvat_id"], ondelete="CASCADE"),
48+
sa.PrimaryKeyConstraint("id"),
49+
)
50+
op.create_index(
51+
op.f("ix_cvat_webhooks_cvat_job_id"), "cvat_webhooks", ["cvat_job_id"], unique=False
52+
)
53+
op.create_index(
54+
op.f("ix_cvat_webhooks_cvat_project_id"), "cvat_webhooks", ["cvat_project_id"], unique=False
55+
)
56+
op.create_index(
57+
op.f("ix_cvat_webhooks_cvat_task_id"), "cvat_webhooks", ["cvat_task_id"], unique=False
58+
)
59+
op.create_index(op.f("ix_cvat_webhooks_id"), "cvat_webhooks", ["id"], unique=False)
60+
# ### end Alembic commands ###
61+
62+
63+
def downgrade() -> None:
64+
# ### commands auto generated by Alembic ###
65+
op.drop_index(op.f("ix_cvat_webhooks_id"), table_name="cvat_webhooks")
66+
op.drop_index(op.f("ix_cvat_webhooks_cvat_task_id"), table_name="cvat_webhooks")
67+
op.drop_index(op.f("ix_cvat_webhooks_cvat_project_id"), table_name="cvat_webhooks")
68+
op.drop_index(op.f("ix_cvat_webhooks_cvat_job_id"), table_name="cvat_webhooks")
69+
op.drop_table("cvat_webhooks")
70+
# ### end Alembic commands ###

packages/examples/cvat/exchange-oracle/debug.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ def patched_get_escrow(chain_id: int, escrow_address: str) -> EscrowData:
9898
logger.info(f"DEV: Using local manifest '{manifest_file}' for escrow '{escrow_address}'")
9999
return escrow
100100

101-
with mock.patch.object(EscrowUtils, "get_escrow", patched_get_escrow):
101+
with (
102+
mock.patch.object(EscrowUtils, "get_escrow", patched_get_escrow),
103+
mock.patch("src.chain.escrow.get_token_symbol", return_value="HMT"),
104+
mock.patch("src.chain.web3.get_token_symbol", return_value="HMT"),
105+
):
102106
yield
103107

104108

packages/examples/cvat/exchange-oracle/src/.env.template

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ PROCESS_RECORDING_ORACLE_WEBHOOKS_INT=
4949
PROCESS_RECORDING_ORACLE_WEBHOOKS_CHUNK_SIZE=
5050
PROCESS_REPUTATION_ORACLE_WEBHOOKS_CHUNK_SIZE=
5151
PROCESS_REPUTATION_ORACLE_WEBHOOKS_INT=
52+
PROCESS_CVAT_WEBHOOKS_WORKERS=
53+
PROCESS_CVAT_WEBHOOKS_INT=
54+
PROCESS_CVAT_WEBHOOKS_CHUNK_SIZE=
5255
TRACK_COMPLETED_PROJECTS_INT=
5356
TRACK_COMPLETED_TASKS_INT=
5457
TRACK_COMPLETED_ESCROWS_INT=

packages/examples/cvat/exchange-oracle/src/core/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,27 +126,42 @@ class CronConfig:
126126
process_job_launcher_webhooks_chunk_size = int(
127127
getenv("PROCESS_JOB_LAUNCHER_WEBHOOKS_CHUNK_SIZE", 5)
128128
)
129+
129130
process_recording_oracle_webhooks_int = int(getenv("PROCESS_RECORDING_ORACLE_WEBHOOKS_INT", 30))
130131
process_recording_oracle_webhooks_chunk_size = int(
131132
getenv("PROCESS_RECORDING_ORACLE_WEBHOOKS_CHUNK_SIZE", 5)
132133
)
134+
133135
process_reputation_oracle_webhooks_chunk_size = int(
134136
getenv("PROCESS_REPUTATION_ORACLE_WEBHOOKS_CHUNK_SIZE", 5)
135137
)
136138
process_reputation_oracle_webhooks_int = int(
137139
getenv("PROCESS_REPUTATION_ORACLE_WEBHOOKS_INT", 5)
138140
)
141+
142+
process_cvat_webhooks_workers = int(getenv("PROCESS_CVAT_WEBHOOKS_WORKERS", 10))
143+
"""
144+
The maximum number of parallel workers. Workers are added lazily, if the existing workers
145+
can't finish in time.
146+
"""
147+
process_cvat_webhooks_int = int(getenv("PROCESS_CVAT_WEBHOOKS_INT", 5))
148+
process_cvat_webhooks_chunk_size = int(getenv("PROCESS_CVAT_WEBHOOKS_CHUNK_SIZE", 10))
149+
139150
track_completed_projects_int = int(getenv("TRACK_COMPLETED_PROJECTS_INT", 30))
140151
track_completed_tasks_int = int(getenv("TRACK_COMPLETED_TASKS_INT", 30))
152+
141153
track_creating_tasks_int = int(getenv("TRACK_CREATING_TASKS_INT", 300))
142154
track_creating_tasks_chunk_size = getenv("TRACK_CREATING_TASKS_CHUNK_SIZE", 5)
155+
143156
track_assignments_int = int(getenv("TRACK_ASSIGNMENTS_INT", 5))
144157
track_assignments_chunk_size = int(getenv("TRACK_ASSIGNMENTS_CHUNK_SIZE", 10))
145158

146159
track_completed_escrows_int = int(getenv("TRACK_COMPLETED_ESCROWS_INT", 60))
147160
track_completed_escrows_chunk_size = int(getenv("TRACK_COMPLETED_ESCROWS_CHUNK_SIZE", 100))
161+
148162
track_escrow_validations_int = int(getenv("TRACK_ESCROW_VALIDATIONS_INT", 60))
149163
track_escrow_validations_chunk_size = int(getenv("TRACK_ESCROW_VALIDATIONS_CHUNK_SIZE", 1))
164+
150165
track_completed_escrows_max_downloading_retries = int(
151166
getenv("TRACK_COMPLETED_ESCROWS_MAX_DOWNLOADING_RETRIES", 10)
152167
)

packages/examples/cvat/exchange-oracle/src/core/types.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ class OracleWebhookStatuses(str, Enum, metaclass=BetterEnumMeta):
7575
failed = "failed"
7676

7777

78+
class CvatWebhookStatuses(str, Enum, metaclass=BetterEnumMeta):
79+
pending = "pending"
80+
completed = "completed"
81+
failed = "failed"
82+
83+
7884
class AssignmentStatuses(str, Enum, metaclass=BetterEnumMeta):
7985
"""
8086
State changes:

packages/examples/cvat/exchange-oracle/src/crons/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from src.core.config import Config
55
from src.crons.cvat.state_trackers import (
6+
process_incoming_cvat_webhooks,
67
track_assignments,
78
track_completed_escrows,
89
track_completed_projects,
@@ -57,6 +58,12 @@ def cron_record():
5758
"interval",
5859
seconds=Config.cron_config.process_reputation_oracle_webhooks_int,
5960
)
61+
scheduler.add_job(
62+
process_incoming_cvat_webhooks,
63+
trigger="interval",
64+
seconds=Config.cron_config.process_cvat_webhooks_int,
65+
max_instances=Config.cron_config.process_cvat_webhooks_workers,
66+
)
6067
scheduler.add_job(
6168
track_completed_projects,
6269
"interval",

packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py

Lines changed: 115 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from contextlib import suppress
23

34
from sqlalchemy.orm import Session
45

@@ -15,7 +16,30 @@
1516
from src.db import errors as db_errors
1617
from src.db.utils import ForUpdateParams
1718
from src.handlers.completed_escrows import handle_escrows_validations
19+
from src.handlers.cvat_events import cvat_webhook_handler
1820
from src.utils.logging import format_sequence
21+
from src.utils.time import utcnow
22+
23+
24+
@cron_job
25+
def process_incoming_cvat_webhooks(logger: logging.Logger, session: Session) -> None:
26+
webhooks = cvat_service.incoming_webhooks.get_pending_webhooks(
27+
session=session,
28+
limit=CronConfig.process_cvat_webhooks_chunk_size,
29+
for_update=ForUpdateParams(skip_locked=True),
30+
)
31+
32+
for webhook in webhooks:
33+
try:
34+
with session.begin_nested():
35+
cvat_webhook_handler(webhook, session)
36+
cvat_service.incoming_webhooks.handle_webhook_success(
37+
session, webhook_id=webhook.id
38+
)
39+
except Exception as e:
40+
with session.begin_nested():
41+
logger.exception(e)
42+
cvat_service.incoming_webhooks.handle_webhook_fail(session, webhook_id=webhook.id)
1943

2044

2145
@cron_job
@@ -53,18 +77,51 @@ def track_assignments(logger: logging.Logger) -> None:
5377
4. If a project or task state is not "annotation", cancels assignments
5478
"""
5579

80+
def _try_complete_assignment(
81+
session: Session,
82+
assignment: cvat_models.Assignment,
83+
) -> bool:
84+
"""
85+
Checks if we haven't received a notification, but the job might have been completed.
86+
87+
Returns: assignment completed
88+
"""
89+
90+
latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id(
91+
session, assignment.cvat_job_id
92+
)
93+
if not latest_assignment or latest_assignment.id != assignment.id:
94+
return False
95+
96+
try:
97+
cvat_job = cvat_api.get_job(assignment.cvat_job_id)
98+
except cvat_api.exceptions.NotFoundException:
99+
return False
100+
101+
if cvat_job.state != cvat_api.JobStatus.completed:
102+
return False
103+
104+
if not cvat_job.assignee or cvat_job.assignee.id != latest_assignment.user.cvat_id:
105+
return False
106+
107+
logger.info(f"Found completed job #{assignment.cvat_job_id}. Completing the assignment")
108+
cvat_service.complete_assignment(session, assignment.id, completed_at=utcnow())
109+
cvat_api.update_job_assignee(assignment.cvat_job_id, assignee_id=None)
110+
cvat_service.update_job_status(session, assignment.job.id, status=JobStatuses.completed)
111+
cvat_service.touch(session, cvat_models.Job, [assignment.job.id])
112+
return True
113+
56114
def _reset_job_after_assignment(session: Session, assignment: cvat_models.Assignment):
57115
latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id(
58116
session, assignment.cvat_job_id
59117
)
60-
if latest_assignment.id == assignment.id:
118+
if latest_assignment.id != assignment.id:
61119
# Avoid un-assigning if it's not the latest assignment
120+
return
62121

63-
cvat_api.update_job_assignee(
64-
assignment.cvat_job_id, assignee_id=None
65-
) # note that calling it in a loop can take too much time
66-
67-
cvat_service.update_job_status(session, assignment.job.id, status=JobStatuses.new)
122+
cvat_api.update_job_assignee(assignment.cvat_job_id, assignee_id=None)
123+
cvat_service.update_job_status(session, assignment.job.id, status=JobStatuses.new)
124+
cvat_service.touch(session, cvat_models.Job, [assignment.job.id])
68125

69126
with SessionLocal.begin() as session:
70127
assignments = cvat_service.get_unprocessed_expired_assignments(
@@ -74,18 +131,27 @@ def _reset_job_after_assignment(session: Session, assignment: cvat_models.Assign
74131
)
75132

76133
for assignment in assignments:
77-
logger.info(
78-
"Expiring the unfinished assignment {} (user {}, job id {})".format(
79-
assignment.id,
80-
assignment.user_wallet_address,
81-
assignment.cvat_job_id,
134+
with (
135+
session.begin_nested(),
136+
suppress(db_errors.LockNotAvailable),
137+
):
138+
cvat_service.get_jobs_by_cvat_id(
139+
session,
140+
cvat_ids=[assignment.cvat_job_id],
141+
for_update=ForUpdateParams(nowait=True),
82142
)
83-
)
84143

85-
cvat_service.expire_assignment(session, assignment.id)
86-
_reset_job_after_assignment(session, assignment)
144+
if not _try_complete_assignment(session, assignment):
145+
logger.info(
146+
"Expiring the unfinished assignment {} (user {}, job id {})".format(
147+
assignment.id,
148+
assignment.user_wallet_address,
149+
assignment.cvat_job_id,
150+
)
151+
)
87152

88-
cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments])
153+
cvat_service.expire_assignment(session, assignment.id)
154+
_reset_job_after_assignment(session, assignment)
89155

90156
with SessionLocal.begin() as session:
91157
assignments = cvat_service.get_unprocessed_cancelled_assignments(
@@ -95,16 +161,24 @@ def _reset_job_after_assignment(session: Session, assignment: cvat_models.Assign
95161
)
96162

97163
for assignment in assignments:
98-
logger.info(
99-
"Finalizing the canceled assignment {} (user {}, job id {})".format(
100-
assignment.id,
101-
assignment.user_wallet_address,
102-
assignment.cvat_job_id,
164+
with (
165+
session.begin_nested(),
166+
suppress(db_errors.LockNotAvailable),
167+
):
168+
cvat_service.get_jobs_by_cvat_id(
169+
session,
170+
cvat_ids=[assignment.cvat_job_id],
171+
for_update=ForUpdateParams(nowait=True),
103172
)
104-
)
105-
_reset_job_after_assignment(session, assignment)
106173

107-
cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments])
174+
logger.info(
175+
"Finalizing the canceled assignment {} (user {}, job id {})".format(
176+
assignment.id,
177+
assignment.user_wallet_address,
178+
assignment.cvat_job_id,
179+
)
180+
)
181+
_reset_job_after_assignment(session, assignment)
108182

109183
with SessionLocal.begin() as session:
110184
assignments = cvat_service.get_active_assignments(
@@ -115,19 +189,27 @@ def _reset_job_after_assignment(session: Session, assignment: cvat_models.Assign
115189

116190
for assignment in assignments:
117191
if assignment.job.project.status != ProjectStatuses.annotation:
118-
logger.warning(
119-
"Canceling the unfinished assignment {} (user {}, job id {}) - "
120-
"the project state is not annotation".format(
121-
assignment.id,
122-
assignment.user_wallet_address,
123-
assignment.cvat_job_id,
192+
with (
193+
session.begin_nested(),
194+
suppress(db_errors.LockNotAvailable),
195+
):
196+
cvat_service.get_jobs_by_cvat_id(
197+
session,
198+
cvat_ids=[assignment.cvat_job_id],
199+
for_update=ForUpdateParams(nowait=True),
124200
)
125-
)
126201

127-
cvat_service.cancel_assignment(session, assignment.id)
128-
_reset_job_after_assignment(session, assignment)
202+
logger.warning(
203+
"Canceling the unfinished assignment {} (user {}, job id {}) - "
204+
"the project state is not annotation".format(
205+
assignment.id,
206+
assignment.user_wallet_address,
207+
assignment.cvat_job_id,
208+
)
209+
)
129210

130-
cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments])
211+
cvat_service.cancel_assignment(session, assignment.id)
212+
_reset_job_after_assignment(session, assignment)
131213

132214

133215
@cron_job

0 commit comments

Comments
 (0)