Skip to content

Commit e1a98f5

Browse files
committed
Move annotation downloading after successful validation
1 parent 34ae83a commit e1a98f5

File tree

20 files changed

+562
-137
lines changed

20 files changed

+562
-137
lines changed

packages/examples/cvat/exchange-oracle/src/core/annotation_meta.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
from pathlib import Path
2-
31
from pydantic import BaseModel
42

53
ANNOTATION_RESULTS_METAFILE_NAME = "annotation_meta.json"
@@ -9,7 +7,6 @@
97
class JobMeta(BaseModel):
108
job_id: int
119
task_id: int
12-
annotation_filename: Path
1310
annotator_wallet_address: str
1411
assignment_id: str
1512
start_frame: int

packages/examples/cvat/exchange-oracle/src/core/oracle_events.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ class ExchangeOracleEvent_JobFinished(OracleEvent):
4747
pass # escrow is enough for now
4848

4949

50+
class ExchangeOracleEvent_EscrowRecorded(OracleEvent):
51+
pass # escrow is enough for now
52+
53+
5054
class ExchangeOracleEvent_EscrowCleaned(OracleEvent):
5155
pass
5256

@@ -56,10 +60,12 @@ class ReputationOracleEvent_EscrowCompleted(OracleEvent):
5660

5761

5862
_event_type_map = {
63+
# TODO: make sender-dependent
5964
JobLauncherEventTypes.escrow_created: JobLauncherEvent_EscrowCreated,
6065
JobLauncherEventTypes.escrow_canceled: JobLauncherEvent_EscrowCanceled,
6166
RecordingOracleEventTypes.job_completed: RecordingOracleEvent_JobCompleted,
6267
RecordingOracleEventTypes.submission_rejected: RecordingOracleEvent_SubmissionRejected,
68+
ExchangeOracleEventTypes.escrow_recorded: ExchangeOracleEvent_EscrowRecorded,
6369
ExchangeOracleEventTypes.escrow_failed: ExchangeOracleEvent_EscrowFailed,
6470
ExchangeOracleEventTypes.job_finished: ExchangeOracleEvent_JobFinished,
6571
ExchangeOracleEventTypes.escrow_cleaned: ExchangeOracleEvent_EscrowCleaned,
@@ -68,6 +74,7 @@ class ReputationOracleEvent_EscrowCompleted(OracleEvent):
6874

6975

7076
def get_class_for_event_type(event_type: str) -> type[OracleEvent]:
77+
# TODO: make sender-dependent
7178
event_class = next((v for k, v in _event_type_map.items() if k == event_type), None)
7279

7380
if not event_class:

packages/examples/cvat/exchange-oracle/src/core/types.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ class ExchangeOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
6565
escrow_failed = "escrow_failed"
6666
job_finished = "job_finished"
6767
escrow_cleaned = "escrow_cleaned"
68+
escrow_recorded = "escrow_recorded"
6869

6970

7071
class JobLauncherEventTypes(str, Enum, metaclass=BetterEnumMeta):
@@ -78,7 +79,6 @@ class RecordingOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
7879

7980

8081
class ReputationOracleEventTypes(str, Enum, metaclass=BetterEnumMeta):
81-
# TODO: rename to ReputationOracleEventType
8282
escrow_completed = "escrow_completed"
8383

8484

packages/examples/cvat/exchange-oracle/src/crons/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
process_outgoing_job_launcher_webhooks,
1717
)
1818
from src.crons.webhooks.recording_oracle import (
19+
process_incoming_recording_oracle_webhook_job_completed,
1920
process_incoming_recording_oracle_webhooks,
2021
process_outgoing_recording_oracle_webhooks,
2122
)
@@ -41,6 +42,11 @@ def cron_record():
4142
"interval",
4243
seconds=Config.cron_config.process_recording_oracle_webhooks_int,
4344
)
45+
scheduler.add_job(
46+
process_incoming_recording_oracle_webhook_job_completed,
47+
"interval",
48+
seconds=Config.cron_config.process_recording_oracle_webhooks_int,
49+
)
4450
scheduler.add_job(
4551
process_outgoing_recording_oracle_webhooks,
4652
"interval",

packages/examples/cvat/exchange-oracle/src/crons/webhooks/recording_oracle.py

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from src.crons._cron_job import cron_job
2020
from src.crons.webhooks._common import handle_webhook, process_outgoing_webhooks
2121
from src.db.utils import ForUpdateParams
22+
from src.handlers.completed_escrows import handle_escrow_export
2223
from src.models.webhook import Webhook
2324

2425

@@ -30,6 +31,29 @@ def process_incoming_recording_oracle_webhooks(logger: logging.Logger, session:
3031
webhooks = oracle_db_service.inbox.get_pending_webhooks(
3132
session,
3233
OracleWebhookTypes.recording_oracle,
34+
event_type_not_in=[RecordingOracleEventTypes.job_completed],
35+
limit=CronConfig.process_recording_oracle_webhooks_chunk_size,
36+
for_update=ForUpdateParams(skip_locked=True),
37+
)
38+
39+
for webhook in webhooks:
40+
with handle_webhook(logger, session, webhook, queue=oracle_db_service.inbox):
41+
handle_recording_oracle_event(webhook, db_session=session, logger=logger)
42+
43+
44+
@cron_job
45+
def process_incoming_recording_oracle_webhook_job_completed(
46+
logger: logging.Logger, session: Session
47+
):
48+
"""
49+
Process incoming oracle webhooks of type job_completed
50+
We do it in a separate job as this is a long operation that should not block
51+
other message handling.
52+
"""
53+
webhooks = oracle_db_service.inbox.get_pending_webhooks(
54+
session,
55+
OracleWebhookTypes.recording_oracle,
56+
event_type_in=[RecordingOracleEventTypes.job_completed],
3357
limit=CronConfig.process_recording_oracle_webhooks_chunk_size,
3458
for_update=ForUpdateParams(skip_locked=True),
3559
)
@@ -70,6 +94,7 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
7094
)
7195
return
7296

97+
recorded_project_cvat_ids: set[int] = set()
7398
chunk_size = CronConfig.process_accepted_projects_chunk_size
7499
for ids_chunk in take_by(project_ids, chunk_size):
75100
projects_chunk = cvat_db_service.get_projects_by_cvat_ids(
@@ -89,15 +114,11 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
89114
)
90115
return
91116

92-
new_status = ProjectStatuses.recorded
93-
logger.info(
94-
"Changing project status to {} (escrow_address={}, project={})".format(
95-
new_status, webhook.escrow_address, project.cvat_id
96-
)
117+
recorded_project_cvat_ids.add(project.cvat_id)
118+
cvat_db_service.update_project_status(
119+
db_session, project.id, ProjectStatuses.recorded
97120
)
98121

99-
cvat_db_service.update_project_status(db_session, project.id, new_status)
100-
101122
cvat_db_service.touch_final_assignments(
102123
db_session,
103124
cvat_project_ids=[p.cvat_id for p in projects_chunk],
@@ -111,6 +132,27 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
111132
status=EscrowValidationStatuses.completed,
112133
)
113134

135+
logger.info(
136+
f"Escrow '{webhook.escrow_address}' is accepted, trying to export annotations..."
137+
)
138+
handle_escrow_export(
139+
logger=logger,
140+
session=db_session,
141+
escrow_address=webhook.escrow_address,
142+
chain_id=webhook.chain_id,
143+
)
144+
145+
if recorded_project_cvat_ids:
146+
# Print it after successful export so that the logs were consistent
147+
# in the case of export error
148+
logger.info(
149+
"Changing project statuses to {} (escrow_address={}, project={})".format(
150+
ProjectStatuses.recorded,
151+
webhook.escrow_address,
152+
sorted(recorded_project_cvat_ids),
153+
)
154+
)
155+
114156
case RecordingOracleEventTypes.submission_rejected:
115157
event = RecordingOracleEvent_SubmissionRejected.model_validate(webhook.event_data)
116158

packages/examples/cvat/exchange-oracle/src/handlers/completed_escrows.py

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
from src.chain.escrow import get_escrow_manifest, validate_escrow
1515
from src.core.annotation_meta import ANNOTATION_RESULTS_METAFILE_NAME, RESULTING_ANNOTATIONS_FILE
1616
from src.core.config import CronConfig, StorageConfig
17-
from src.core.oracle_events import ExchangeOracleEvent_JobFinished
17+
from src.core.oracle_events import (
18+
ExchangeOracleEvent_EscrowRecorded,
19+
ExchangeOracleEvent_JobFinished,
20+
)
1821
from src.core.storage import compose_results_bucket_filename
1922
from src.core.types import EscrowValidationStatuses, OracleWebhookTypes, TaskTypes
2023
from src.db import SessionLocal
@@ -68,9 +71,23 @@ def _export_escrow_annotations(
6871
) -> None:
6972
manifest = parse_manifest(get_escrow_manifest(chain_id, escrow_address))
7073

71-
logger.debug(f"Downloading results for the escrow ({escrow_address=})")
74+
escrow_creation = cvat_service.get_escrow_creation_by_escrow_address(
75+
session,
76+
escrow_address,
77+
chain_id,
78+
active=False,
79+
)
80+
if not escrow_creation:
81+
raise AssertionError(f"Can't find escrow creation for escrow '{escrow_address}'")
7282

7383
jobs = cvat_service.get_jobs_by_escrow_address(session, escrow_address, chain_id)
84+
if len(jobs) != escrow_creation.total_jobs:
85+
raise AssertionError(
86+
f"Unexpected number of jobs fetched for escrow "
87+
f"'{escrow_address}': {len(jobs)}, expected {escrow_creation.total_jobs}"
88+
)
89+
90+
logger.debug(f"Downloading results for the escrow ({escrow_address=})")
7491

7592
annotation_format = CVAT_EXPORT_FORMAT_MAPPING[manifest.annotation.type]
7693
# FUTURE-TODO: probably can be removed in the future since
@@ -79,7 +96,7 @@ def _export_escrow_annotations(
7996

8097
if manifest.annotation.type == TaskTypes.image_skeletons_from_boxes.value:
8198
# we'll have to merge annotations ourselves for skeletons
82-
# might want to make this the only behaviour in the future
99+
# might want to make this the only behavior in the future
83100
project_annotations_file = None
84101
project_images = None
85102
else:
@@ -116,13 +133,13 @@ def _export_escrow_annotations(
116133
manifest=manifest,
117134
project_images=project_images,
118135
)
119-
logger.debug(f"Uploading results for the escrow ({escrow_address=})")
136+
logger.debug(f"Uploading annotations for the escrow ({escrow_address=})")
120137

121138
_upload_annotations(
122139
annotation_files=(
123140
resulting_annotations_file_desc,
124141
*job_annotations.values(),
125-
prepare_annotation_metafile(jobs=jobs, job_annotations=job_annotations),
142+
prepare_annotation_metafile(jobs=jobs),
126143
),
127144
chain_id=chain_id,
128145
escrow_address=escrow_address,
@@ -133,7 +150,7 @@ def _export_escrow_annotations(
133150
escrow_address=escrow_address,
134151
chain_id=chain_id,
135152
type=OracleWebhookTypes.recording_oracle,
136-
event=ExchangeOracleEvent_JobFinished(),
153+
event=ExchangeOracleEvent_EscrowRecorded(),
137154
)
138155

139156
logger.info(
@@ -142,6 +159,38 @@ def _export_escrow_annotations(
142159
)
143160

144161

162+
def _request_escrow_validation(
163+
logger: logging.Logger,
164+
chain_id: int,
165+
escrow_address: str,
166+
escrow_projects: Sequence[Project],
167+
session: Session,
168+
) -> None:
169+
# TODO: lock escrow once there is such a DB object
170+
assert escrow_projects # unused, but must hold a lock
171+
172+
# TODO: maybe upload only current iteration jobs
173+
jobs = cvat_service.get_jobs_by_escrow_address(session, escrow_address, chain_id)
174+
175+
logger.debug(f"Uploading assignment info for the escrow ({escrow_address=})")
176+
177+
_upload_annotations(
178+
annotation_files=(prepare_annotation_metafile(jobs=jobs),),
179+
chain_id=chain_id,
180+
escrow_address=escrow_address,
181+
)
182+
183+
oracle_db_service.outbox.create_webhook(
184+
session,
185+
escrow_address=escrow_address,
186+
chain_id=chain_id,
187+
type=OracleWebhookTypes.recording_oracle,
188+
event=ExchangeOracleEvent_JobFinished(),
189+
)
190+
191+
logger.info(f"The escrow ({escrow_address=}) annotation is finished, " f"requesting validation")
192+
193+
145194
def _upload_annotations(
146195
annotation_files: Sequence[FileDescriptor], chain_id: int, escrow_address: str
147196
) -> None:
@@ -249,7 +298,7 @@ def _handle_escrow_validation(
249298
escrow_projects = cvat_service.get_projects_by_escrow_address(
250299
session, escrow_address, limit=None, for_update=ForUpdateParams(nowait=True)
251300
)
252-
_export_escrow_annotations(logger, chain_id, escrow_address, escrow_projects, session)
301+
_request_escrow_validation(logger, chain_id, escrow_address, escrow_projects, session)
253302

254303

255304
def handle_escrows_validations(logger: logging.Logger) -> None:
@@ -281,3 +330,17 @@ def handle_escrows_validations(logger: logging.Logger) -> None:
281330
increase_attempts=True, # increase attempts always to allow escrow rotation
282331
**update_kwargs,
283332
)
333+
334+
335+
def handle_escrow_export(
336+
logger: logging.Logger,
337+
session: Session,
338+
escrow_address: str,
339+
chain_id: int,
340+
):
341+
validate_escrow(chain_id, escrow_address)
342+
343+
escrow_projects = cvat_service.get_projects_by_escrow_address(
344+
session, escrow_address, limit=None, for_update=ForUpdateParams(nowait=True)
345+
)
346+
_export_escrow_annotations(logger, chain_id, escrow_address, escrow_projects, session)

packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def handle_create_job_event(payload: dict) -> None:
142142
session,
143143
escrow_address=project.escrow_address,
144144
chain_id=project.chain_id,
145+
active=True,
145146
for_update=True,
146147
)
147148

packages/examples/cvat/exchange-oracle/src/handlers/job_export.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ class FileDescriptor:
4343
file: io.RawIOBase | None
4444

4545

46-
def prepare_annotation_metafile(
47-
jobs: list[Job], job_annotations: dict[int, FileDescriptor]
48-
) -> FileDescriptor:
46+
def prepare_annotation_metafile(jobs: list[Job]) -> FileDescriptor:
4947
"""
5048
Prepares a task/project annotation descriptor file with annotator mapping.
5149
"""
@@ -54,7 +52,6 @@ def prepare_annotation_metafile(
5452
jobs=[
5553
JobMeta(
5654
job_id=job.cvat_id,
57-
annotation_filename=job_annotations[job.cvat_id].filename,
5855
annotator_wallet_address=job.latest_assignment.user_wallet_address,
5956
assignment_id=job.latest_assignment.id,
6057
task_id=job.cvat_task_id,

packages/examples/cvat/exchange-oracle/src/services/cvat.py

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -439,32 +439,26 @@ def create_escrow_creation(
439439
return escrow_creation_id
440440

441441

442-
def get_escrow_creation_by_id(
443-
session: Session,
444-
escrow_creation_id: str,
445-
*,
446-
for_update: bool | ForUpdateParams = False,
447-
) -> EscrowCreation | None:
448-
return (
449-
_maybe_for_update(session.query(EscrowCreation), enable=for_update)
450-
.where(EscrowCreation.id == escrow_creation_id, EscrowCreation.finished_at.is_(None))
451-
.first()
452-
)
453-
454-
455442
def get_escrow_creation_by_escrow_address(
456443
session: Session,
457444
escrow_address: str,
458445
chain_id: int,
459446
*,
447+
active: bool | None,
460448
for_update: bool | ForUpdateParams = False,
461449
) -> EscrowCreation | None:
450+
is_active_filter = []
451+
if active is True:
452+
is_active_filter = [EscrowCreation.finished_at.is_(None)]
453+
elif active is False:
454+
is_active_filter = [EscrowCreation.finished_at.is_not(None)]
455+
462456
return (
463457
_maybe_for_update(session.query(EscrowCreation), enable=for_update)
464458
.where(
465459
EscrowCreation.escrow_address == escrow_address,
466460
EscrowCreation.chain_id == chain_id,
467-
EscrowCreation.finished_at.is_(None),
461+
*is_active_filter,
468462
)
469463
.first()
470464
)

0 commit comments

Comments
 (0)