Skip to content

Commit a8b0f9b

Browse files
authored
[CVAT] Fix moving projects to validation (#2852)
* Fix a possible deadlock in create_escrow_validations * Improve log message * Don't expect projects in validation status in completed escrows * Use limit from config * USe a separate env var * Polish some code * Mark escrow validation completed after RO rejection webhook * Fix potential deadlock on escrow validation inserts * Satisfy linter * Remove backward compatibility for env var * Update tests, skip ro validation webhooks if no active validations * Add db type check
1 parent efe591b commit a8b0f9b

File tree

11 files changed

+550
-383
lines changed

11 files changed

+550
-383
lines changed

packages/examples/cvat/exchange-oracle/src/.env.template

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ TRACK_COMPLETED_TASKS_INT=
4949
TRACK_COMPLETED_TASKS_CHUNK_SIZE=
5050
TRACK_COMPLETED_ESCROWS_INT=
5151
TRACK_COMPLETED_ESCROWS_CHUNK_SIZE=
52+
TRACK_ESCROW_VALIDATIONS_INT=
53+
TRACK_ESCROW_VALIDATIONS_CHUNK_SIZE=
5254
TRACK_CREATING_TASKS_INT=
5355
TRACK_CREATING_TASKS_CHUNK_SIZE=
5456
TRACK_ASSIGNMENTS_INT=

packages/examples/cvat/exchange-oracle/src/core/config.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,15 +142,12 @@ class CronConfig:
142142
"TRACK_COMPLETED_ESCROWS_INT", os.environ.get("RETRIEVE_ANNOTATIONS_INT", 60)
143143
)
144144
)
145-
track_escrow_validations_int = int(
146-
os.environ.get(
147-
"TRACK_COMPLETED_ESCROWS_INT", os.environ.get("RETRIEVE_ANNOTATIONS_INT", 60)
148-
)
145+
track_completed_escrows_chunk_size = int(
146+
os.environ.get("TRACK_COMPLETED_ESCROWS_CHUNK_SIZE", 100)
149147
)
150-
track_completed_escrows_chunk_size = os.environ.get(
151-
# backward compatibility
152-
"TRACK_COMPLETED_ESCROWS_CHUNK_SIZE",
153-
os.environ.get("RETRIEVE_ANNOTATIONS_CHUNK_SIZE", 5),
148+
track_escrow_validations_int = int(os.environ.get("TRACK_COMPLETED_ESCROWS_INT", 60))
149+
track_escrow_validations_chunk_size = int(
150+
os.environ.get("TRACK_ESCROW_VALIDATIONS_CHUNK_SIZE", 5)
154151
)
155152
track_completed_escrows_max_downloading_retries = int(
156153
os.environ.get("TRACK_COMPLETED_ESCROWS_MAX_DOWNLOADING_RETRIES", 10)

packages/examples/cvat/exchange-oracle/src/crons/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def cron_record():
6969
scheduler.add_job(
7070
track_escrow_validations,
7171
"interval",
72-
seconds=Config.cron_config.track_completed_escrows_int,
72+
seconds=Config.cron_config.track_escrow_validations_int,
7373
)
7474
scheduler.add_job(
7575
track_task_creation,

packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,13 @@ def track_assignments(logger: logging.Logger) -> None:
116116

117117
@cron_job
118118
def track_completed_escrows(logger: logging.Logger, session: Session) -> None:
119-
awaiting_validations = cvat_service.create_escrow_validations(session)
120-
if awaiting_validations:
121-
session.commit()
119+
new_validations = cvat_service.create_escrow_validations(
120+
session, limit=CronConfig.track_completed_escrows_chunk_size
121+
)
122+
if new_validations:
122123
logger.info(
123-
f"Got {len(awaiting_validations)} escrows "
124-
f"awaiting validation: {format_sequence(awaiting_validations)}"
124+
f"Got {len(new_validations)} escrows "
125+
f"awaiting validation: {format_sequence([(v[1], v[2]) for v in new_validations])}"
125126
)
126127

127128

packages/examples/cvat/exchange-oracle/src/crons/webhooks/recording_oracle.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,29 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
4444

4545
match webhook.event_type:
4646
case RecordingOracleEventTypes.job_completed:
47+
escrow_validation = cvat_db_service.get_escrow_validation_by_escrow_address(
48+
db_session, escrow_address=webhook.escrow_address, chain_id=webhook.chain_id
49+
)
50+
if (
51+
not escrow_validation
52+
or escrow_validation.status != EscrowValidationStatuses.in_progress
53+
):
54+
logger.error(
55+
"Unexpected event {} received for the escrow. "
56+
"There is no active validation now, ignoring (escrow_address={}) ".format(
57+
webhook.event_type,
58+
webhook.escrow_address,
59+
)
60+
)
61+
return
62+
4763
chunk_size = CronConfig.accepted_projects_chunk_size
4864
project_ids = cvat_db_service.get_project_cvat_ids_by_escrow_address(
4965
db_session, webhook.escrow_address
5066
)
5167
if not project_ids:
5268
logger.error(
53-
f"Unexpected event {webhook.event_type} received for an unknown project, "
69+
f"Unexpected event {webhook.event_type} received for an unknown escrow, "
5470
f"ignoring (escrow_address={webhook.escrow_address})"
5571
)
5672
return
@@ -98,6 +114,22 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
98114
case RecordingOracleEventTypes.submission_rejected:
99115
event = RecordingOracleEvent_SubmissionRejected.model_validate(webhook.event_data)
100116

117+
escrow_validation = cvat_db_service.get_escrow_validation_by_escrow_address(
118+
db_session, escrow_address=webhook.escrow_address, chain_id=webhook.chain_id
119+
)
120+
if (
121+
not escrow_validation
122+
or escrow_validation.status != EscrowValidationStatuses.in_progress
123+
):
124+
logger.error(
125+
"Unexpected event {} received for the escrow. "
126+
"There is no active validation now, ignoring (escrow_address={}) ".format(
127+
webhook.event_type,
128+
webhook.escrow_address,
129+
)
130+
)
131+
return
132+
101133
rejected_assignments = cvat_db_service.get_assignments_by_id(
102134
db_session, [t.assignment_id for t in event.assignments]
103135
)
@@ -152,6 +184,26 @@ def handle_recording_oracle_event(webhook: Webhook, *, db_session: Session, logg
152184
)
153185
cvat_db_service.update_project_status(db_session, project.id, new_status)
154186

187+
# Mark all the remaining projects completed
188+
cvat_db_service.update_project_statuses_by_escrow_address(
189+
db_session,
190+
escrow_address=webhook.escrow_address,
191+
chain_id=webhook.chain_id,
192+
status=ProjectStatuses.completed,
193+
included_statuses=[ProjectStatuses.validation],
194+
)
195+
196+
# TODO: need to update assignments,
197+
# but there is no special db state for validated assignments
198+
199+
# TODO: maybe delete instead
200+
cvat_db_service.update_escrow_validation(
201+
db_session,
202+
webhook.escrow_address,
203+
webhook.chain_id,
204+
status=EscrowValidationStatuses.completed,
205+
)
206+
155207
case _:
156208
raise AssertionError(f"Unknown recording oracle event {webhook.event_type}")
157209

packages/examples/cvat/exchange-oracle/src/db/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ class BaseUUID(Base):
4040
# At some point it would make sense to use UUID(as_uuid=True).
4141
UUID(as_uuid=False),
4242
primary_key=True,
43-
default=lambda: str(uuid4()),
4443
server_default=func.uuid_generate_v4(),
4544
sort_order=-1, # Make sure it's the first column.
4645
index=True,

packages/examples/cvat/exchange-oracle/src/handlers/completed_escrows.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from src.core.config import CronConfig, StorageConfig
1818
from src.core.oracle_events import ExchangeOracleEvent_JobFinished
1919
from src.core.storage import compose_results_bucket_filename
20-
from src.core.types import EscrowValidationStatuses, OracleWebhookTypes, ProjectStatuses, TaskTypes
20+
from src.core.types import EscrowValidationStatuses, OracleWebhookTypes, TaskTypes
2121
from src.db import SessionLocal
2222
from src.db import errors as db_errors
2323
from src.db.utils import ForUpdateParams
@@ -138,13 +138,6 @@ def _export_escrow_annotations(
138138
event=ExchangeOracleEvent_JobFinished(),
139139
)
140140

141-
cvat_service.update_project_statuses_by_escrow_address(
142-
session,
143-
escrow_address=escrow_address,
144-
chain_id=chain_id,
145-
status=ProjectStatuses.validation,
146-
)
147-
148141
logger.info(
149142
f"The escrow ({escrow_address=}) is completed, "
150143
f"resulting annotations are processed successfully"
@@ -270,8 +263,9 @@ def handle_escrows_validations(logger: logging.Logger) -> None:
270263
with SessionLocal.begin() as session:
271264
escrow_validations = cvat_service.prepare_escrows_for_validation(
272265
session,
273-
limit=CronConfig.track_completed_escrows_chunk_size,
266+
limit=CronConfig.track_escrow_validations_chunk_size,
274267
)
268+
275269
for escrow_address, chain_id in escrow_validations:
276270
with SessionLocal.begin() as session:
277271
# Need to work in separate transactions for each escrow, as a failing DB call
@@ -281,6 +275,7 @@ def handle_escrows_validations(logger: logging.Logger) -> None:
281275
if not handled:
282276
# either escrow is invalid, or we couldn't get lock for projects/validations
283277
continue
278+
284279
# change status so validation won't be attempted again
285280
cvat_service.update_escrow_validation(
286281
session,

packages/examples/cvat/exchange-oracle/src/models/cvat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class EscrowValidation(BaseUUID):
143143
chain_id = Column(Integer, Enum(Networks), nullable=False)
144144

145145
created_at = Column(DateTime(timezone=True), server_default=func.now())
146-
attempts = Column(Integer, default=0, server_default="0")
146+
attempts = Column(Integer, server_default="0")
147147
status = Column(String, Enum(EscrowValidationStatuses), nullable=False)
148148
projects: Mapped[list[Project]] = relationship(
149149
back_populates="escrow_validation",

0 commit comments

Comments
 (0)