Skip to content

Commit f22f66e

Browse files
committed
Record incoming cvat webhooks into the queue, drop create:job event listening
1 parent feffb94 commit f22f66e

File tree

4 files changed

+99
-146
lines changed

4 files changed

+99
-146
lines changed

packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ class LabelType(str, Enum, metaclass=BetterEnumMeta):
5454

5555
class WebhookEventType(str, Enum, metaclass=BetterEnumMeta):
5656
update_job = "update:job"
57-
create_job = "create:job"
5857
ping = "ping"
5958

6059

@@ -324,8 +323,7 @@ def create_cvat_webhook(project_id: int) -> models.WebhookRead:
324323
# enable_ssl=True,
325324
project_id=project_id,
326325
events=[
327-
models.EventsEnum("update:job"),
328-
models.EventsEnum("create:job"),
326+
models.EventsEnum(WebhookEventType.update_job.value),
329327
],
330328
) # WebhookWriteRequest
331329
try:

packages/examples/cvat/exchange-oracle/src/endpoints/cvat.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from fastapi import APIRouter, Header, Request
44

5-
from src.handlers.cvat_events import cvat_webhook_handler
5+
from src.handlers.cvat_events import cvat_webhook_request_handler
66
from src.schemas.cvat import CvatWebhook
77
from src.validators.signature import validate_cvat_signature
88

@@ -16,4 +16,4 @@ async def receive_cvat_webhook(
1616
x_signature_256: Annotated[str, Header()],
1717
):
1818
await validate_cvat_signature(request, x_signature_256)
19-
cvat_webhook_handler(cvat_webhook)
19+
cvat_webhook_request_handler(cvat_webhook)
Lines changed: 93 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,186 +1,141 @@
11
from dateutil.parser import parse as parse_aware_datetime
2+
from sqlalchemy.orm import Session
23

34
import src.cvat.api_calls as cvat_api
45
import src.models.cvat as models
56
import src.services.cvat as cvat_service
6-
from src import db
7-
from src.core.types import AssignmentStatuses, JobStatuses, ProjectStatuses
7+
from src.core.types import AssignmentStatuses, JobStatuses
88
from src.db import SessionLocal
9-
from src.db import errors as db_errors
9+
from src.db.utils import ForUpdateParams
1010
from src.log import ROOT_LOGGER_NAME
1111
from src.schemas.cvat import CvatWebhook
1212
from src.utils.logging import get_function_logger
1313

1414
module_logger_name = f"{ROOT_LOGGER_NAME}.cron.handler"
1515

1616

17-
def handle_update_job_event(payload: dict) -> None:
17+
def handle_update_job_event(payload: CvatWebhook, session: Session) -> None:
1818
logger = get_function_logger(module_logger_name)
1919

2020
if "state" not in payload.before_update:
2121
return
2222

2323
new_cvat_status = cvat_api.JobStatus(payload.job["state"])
2424

25-
with SessionLocal.begin() as session:
26-
job_id = payload.job["id"]
27-
jobs = cvat_service.get_jobs_by_cvat_id(session, [job_id], for_update=True)
28-
if not jobs:
29-
logger.warning(
30-
f"Received a job update webhook for an unknown job id {job_id}, ignoring "
31-
)
32-
return
33-
34-
job = jobs[0]
25+
job_id = payload.job["id"]
26+
jobs = cvat_service.get_jobs_by_cvat_id(session, [job_id], for_update=True)
27+
if not jobs:
28+
logger.warning(f"Received a job update webhook for an unknown job id {job_id}, ignoring ")
29+
return
3530

36-
if job.status != JobStatuses.in_progress:
37-
logger.warning(
38-
f"Received a job update webhook for a job id {job_id} "
39-
f"in the status {job.status}, ignoring "
40-
)
41-
return
31+
job = jobs[0]
4232

43-
# ignore updates for any assignments except the last one
44-
latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id(
45-
session, job_id, for_update=True
33+
if job.status != JobStatuses.in_progress:
34+
logger.warning(
35+
f"Received a job update webhook for a job id {job_id} "
36+
f"in the status {job.status}, ignoring "
4637
)
47-
if not latest_assignment:
48-
logger.warning(
49-
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
50-
"No assignments for this job, ignoring the update"
51-
)
52-
return
53-
54-
webhook_time = parse_aware_datetime(payload.job["updated_date"])
55-
webhook_assignee_id = (payload.job["assignee"] or {}).get("id")
56-
57-
matching_assignment = next(
58-
(
59-
a
60-
for a in [latest_assignment]
61-
if a.user.cvat_id == webhook_assignee_id
62-
if a.created_at < webhook_time
63-
),
64-
None,
38+
return
39+
40+
# ignore updates for any assignments except the last one
41+
latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id(
42+
session, job_id, for_update=ForUpdateParams(nowait=True)
43+
)
44+
if not latest_assignment:
45+
logger.warning(
46+
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
47+
"No assignments for this job, ignoring the update"
6548
)
49+
return
6650

67-
if not matching_assignment:
51+
webhook_time = parse_aware_datetime(payload.job["updated_date"])
52+
webhook_assignee_id = (payload.job["assignee"] or {}).get("id")
53+
54+
matching_assignment = next(
55+
(
56+
a
57+
for a in [latest_assignment]
58+
if a.user.cvat_id == webhook_assignee_id
59+
if a.created_at < webhook_time
60+
),
61+
None,
62+
)
63+
64+
if not matching_assignment:
65+
logger.warning(
66+
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
67+
"No matching assignment or the assignment is too old, ignoring the update"
68+
)
69+
elif matching_assignment.is_finished:
70+
if matching_assignment.status == AssignmentStatuses.created:
6871
logger.warning(
6972
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
70-
"No matching assignment or the assignment is too old, ignoring the update"
71-
)
72-
elif matching_assignment.is_finished:
73-
if matching_assignment.status == AssignmentStatuses.created:
74-
logger.warning(
75-
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
76-
"Assignment is expired, rejecting the update"
77-
)
78-
cvat_service.expire_assignment(session, matching_assignment.id)
79-
80-
if matching_assignment.id == latest_assignment.id:
81-
cvat_api.update_job_assignee(job.cvat_id, assignee_id=None)
82-
cvat_service.update_job_status(session, job.id, status=JobStatuses.new)
83-
84-
cvat_service.touch(session, models.Job, [job.id])
85-
else:
86-
logger.info(
87-
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
88-
"Assignment is already finished, ignoring the update"
89-
)
90-
elif (
91-
new_cvat_status == cvat_api.JobStatus.completed
92-
and matching_assignment.id == latest_assignment.id
93-
and matching_assignment.is_finished == False
94-
):
95-
logger.info(
96-
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
97-
"Completing the assignment"
98-
)
99-
cvat_service.complete_assignment(
100-
session, matching_assignment.id, completed_at=webhook_time
73+
"Assignment is expired, rejecting the update"
10174
)
102-
cvat_api.update_job_assignee(job.cvat_id, assignee_id=None)
103-
cvat_service.update_job_status(session, job.id, status=JobStatuses.completed)
75+
cvat_service.expire_assignment(session, matching_assignment.id)
76+
77+
if matching_assignment.id == latest_assignment.id:
78+
cvat_api.update_job_assignee(job.cvat_id, assignee_id=None)
79+
cvat_service.update_job_status(session, job.id, status=JobStatuses.new)
80+
10481
cvat_service.touch(session, models.Job, [job.id])
10582
else:
10683
logger.info(
10784
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
108-
"Ignoring the update"
85+
"Assignment is already finished, ignoring the update"
10986
)
87+
elif (
88+
new_cvat_status == cvat_api.JobStatus.completed
89+
and matching_assignment.id == latest_assignment.id
90+
and matching_assignment.is_finished == False
91+
):
92+
logger.info(
93+
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
94+
"Completing the assignment"
95+
)
96+
cvat_service.complete_assignment(session, matching_assignment.id, completed_at=webhook_time)
97+
cvat_api.update_job_assignee(job.cvat_id, assignee_id=None)
98+
cvat_service.update_job_status(session, job.id, status=JobStatuses.completed)
99+
cvat_service.touch(session, models.Job, [job.id])
100+
else:
101+
logger.info(
102+
f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. "
103+
"Ignoring the update"
104+
)
110105

111106

112-
def handle_create_job_event(payload: dict) -> None:
113-
logger = get_function_logger(module_logger_name)
114-
115-
with SessionLocal.begin() as session:
116-
if payload.job["type"] != "annotation":
117-
return
118-
119-
task_id = payload.job["task_id"]
120-
if not cvat_service.get_tasks_by_cvat_id(session, [task_id], for_update=True):
121-
logger.warning(
122-
f"Received a job creation webhook for an unknown task id {task_id}, ignoring "
123-
)
124-
return
125-
126-
jobs = cvat_service.get_jobs_by_cvat_id(session, [payload.job["id"]])
127-
128-
if not jobs:
129-
job_id = cvat_service.create_job(
130-
session,
131-
payload.job["id"],
132-
payload.job["task_id"],
133-
payload.job["project_id"],
134-
status=JobStatuses[payload.job["state"]],
135-
start_frame=payload.job["start_frame"],
136-
stop_frame=payload.job["stop_frame"],
137-
)
138-
cvat_service.touch(session, models.Job, [job_id])
139-
140-
escrow_creation = None
141-
with db.suppress(db_errors.LockNotAvailable):
142-
projects = cvat_service.get_projects_by_cvat_ids(
143-
session, project_cvat_ids=[payload.job["project_id"]], for_update=True
144-
)
145-
if not projects:
146-
return
107+
def cvat_webhook_handler(cvat_webhook: models.CvatWebhook, session: Session) -> None:
108+
parsed_webhook = CvatWebhook.model_validate_json(cvat_webhook.event_data)
109+
match cvat_webhook.event_type:
110+
case cvat_api.WebhookEventType.update_job.value:
111+
handle_update_job_event(parsed_webhook, session)
147112

148-
project = projects[0]
149113

150-
escrow_creation = cvat_service.get_escrow_creation_by_escrow_address(
151-
session,
152-
escrow_address=project.escrow_address,
153-
chain_id=project.chain_id,
154-
active=True,
155-
for_update=True,
156-
)
114+
def handle_update_job_event_request(payload: CvatWebhook) -> None:
115+
if payload.job["type"] != "annotation":
116+
# We're not interested in any other job types so far
117+
return
157118

158-
if not escrow_creation:
159-
return
119+
if "state" not in payload.before_update:
120+
# We're only interested in state updates
121+
return
160122

161-
created_jobs_count = cvat_service.count_jobs_by_escrow_address(
123+
with SessionLocal.begin() as session:
124+
cvat_service.incoming_webhooks.create_webhook(
162125
session,
163-
escrow_address=escrow_creation.escrow_address,
164-
chain_id=escrow_creation.chain_id,
165-
status=JobStatuses.new,
126+
cvat_project_id=payload.job["project_id"], # all oracle jobs have project
127+
cvat_task_id=payload.job["task_id"],
128+
cvat_job_id=payload.job["id"],
129+
event_type=payload.event,
130+
event_data=payload.model_dump_json(indent=None),
166131
)
167132

168-
if created_jobs_count != escrow_creation.total_jobs:
169-
return
170-
171-
cvat_service.update_project_statuses_by_escrow_address(
172-
session=session,
173-
escrow_address=escrow_creation.escrow_address,
174-
chain_id=escrow_creation.chain_id,
175-
status=ProjectStatuses.annotation,
176-
)
133+
# TODO: handle unknown job, task and project ids
177134

178135

179-
def cvat_webhook_handler(cvat_webhook: CvatWebhook) -> None:
136+
def cvat_webhook_request_handler(cvat_webhook: CvatWebhook) -> None:
180137
match cvat_webhook.event:
181138
case cvat_api.WebhookEventType.update_job.value:
182-
handle_update_job_event(cvat_webhook)
183-
case cvat_api.WebhookEventType.create_job.value:
184-
handle_create_job_event(cvat_webhook)
139+
handle_update_job_event_request(cvat_webhook)
185140
case cvat_api.WebhookEventType.ping.value:
186141
pass

packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ def build(self):
383383
user_guide=manifest.annotation.user_guide,
384384
)
385385

386-
# Setup webhooks for a project (update:task, update:job)
386+
# Setup webhooks for the project
387387
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
388388

389389
with SessionLocal.begin() as session:
@@ -1545,7 +1545,7 @@ def _create_on_cvat(self):
15451545
user_guide=self.manifest.annotation.user_guide,
15461546
)
15471547

1548-
# Setup webhooks for a project (update:task, update:job)
1548+
# Setup webhooks for the project
15491549
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
15501550

15511551
with SessionLocal.begin() as session:
@@ -2907,7 +2907,7 @@ def _task_params_label_key(ts):
29072907
# TODO: improve guide handling - split for different points
29082908
)
29092909

2910-
# Setup webhooks for a project (update:task, update:job)
2910+
# Setup webhooks for the project
29112911
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
29122912

29132913
project_id = db_service.create_project(

0 commit comments

Comments
 (0)