Skip to content

Commit 6439095

Browse files
authored
[CVAT-M2] Add row locks during CVAT task creation (#1848)
* Add row locks in task creation
1 parent 1a730e9 commit 6439095

File tree

1 file changed

+52
-37
lines changed

1 file changed

+52
-37
lines changed

packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,25 +1063,25 @@ def _create_on_cvat(self):
10631063
oracle_bucket = self.oracle_data_bucket
10641064

10651065
# Register cloud storage on CVAT to pass user dataset
1066-
cloud_storage = cvat_api.create_cloudstorage(
1066+
cvat_cloud_storage = cvat_api.create_cloudstorage(
10671067
**_make_cvat_cloud_storage_params(oracle_bucket)
10681068
)
10691069

10701070
# Create a project
1071-
project = cvat_api.create_project(
1071+
cvat_project = cvat_api.create_project(
10721072
self.escrow_address,
10731073
labels=self._label_configuration,
10741074
user_guide=self.manifest.annotation.user_guide,
10751075
)
10761076

10771077
# Setup webhooks for a project (update:task, update:job)
1078-
webhook = cvat_api.create_cvat_webhook(project.id)
1078+
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
10791079

10801080
with SessionLocal.begin() as session:
1081-
db_service.create_project(
1081+
project_id = db_service.create_project(
10821082
session,
1083-
project.id,
1084-
cloud_storage.id,
1083+
cvat_project.id,
1084+
cvat_cloud_storage.id,
10851085
self.manifest.annotation.type,
10861086
self.escrow_address,
10871087
self.chain_id,
@@ -1090,28 +1090,32 @@ def _create_on_cvat(self):
10901090
bucket_host=input_data_bucket.host_url,
10911091
provider=input_data_bucket.provider,
10921092
),
1093-
cvat_webhook_id=webhook.id,
1093+
cvat_webhook_id=cvat_webhook.id,
10941094
)
1095+
1096+
db_service.get_project_by_id(session, project_id, for_update=True) # lock the row
10951097
db_service.add_project_images(
10961098
session,
1097-
project.id,
1099+
cvat_project.id,
10981100
[
10991101
compose_data_bucket_filename(self.escrow_address, self.chain_id, fn)
11001102
for fn in self._roi_filenames.values()
11011103
],
11021104
)
11031105

11041106
for job_filenames in self._job_layout:
1105-
task = cvat_api.create_task(project.id, self.escrow_address)
1107+
cvat_task = cvat_api.create_task(cvat_project.id, self.escrow_address)
11061108

11071109
with SessionLocal.begin() as session:
1108-
db_service.create_task(session, task.id, project.id, TaskStatuses[task.status])
1110+
task_id = db_service.create_task(
1111+
session, cvat_task.id, cvat_project.id, TaskStatuses[cvat_task.status]
1112+
)
11091113

11101114
# Actual task creation in CVAT takes some time, so it's done in an async process.
11111115
# The task will be created in DB once 'update:task' or 'update:job' webhook is received.
11121116
cvat_api.put_task_data(
1113-
task.id,
1114-
cloud_storage.id,
1117+
cvat_task.id,
1118+
cvat_cloud_storage.id,
11151119
filenames=[
11161120
compose_data_bucket_filename(self.escrow_address, self.chain_id, fn)
11171121
for fn in job_filenames
@@ -1120,7 +1124,8 @@ def _create_on_cvat(self):
11201124
)
11211125

11221126
with SessionLocal.begin() as session:
1123-
db_service.create_data_upload(session, cvat_task_id=task.id)
1127+
db_service.get_task_by_id(session, task_id, for_update=True) # lock the row
1128+
db_service.create_data_upload(session, cvat_task.id)
11241129

11251130
@classmethod
11261131
def _make_cloud_storage_client(cls, bucket_info: BucketAccessInfo) -> StorageClient:
@@ -2060,7 +2065,7 @@ def _create_on_cvat(self):
20602065
oracle_bucket = self.oracle_data_bucket
20612066

20622067
# Register cloud storage on CVAT to pass user dataset
2063-
cloud_storage = cvat_api.create_cloudstorage(
2068+
cvat_cloud_storage = cvat_api.create_cloudstorage(
20642069
**_make_cvat_cloud_storage_params(oracle_bucket)
20652070
)
20662071

@@ -2080,7 +2085,7 @@ def _create_on_cvat(self):
20802085
for point_label_spec in label_specs_by_skeleton[skeleton_label_id]:
20812086
# Create a project for each point label.
20822087
# CVAT doesn't support tasks with different labels in a project.
2083-
project = cvat_api.create_project(
2088+
cvat_project = cvat_api.create_project(
20842089
name="{} ({} {})".format(
20852090
self.escrow_address,
20862091
self.manifest.annotation.labels[skeleton_label_id].name,
@@ -2092,13 +2097,13 @@ def _create_on_cvat(self):
20922097
)
20932098

20942099
# Setup webhooks for a project (update:task, update:job)
2095-
webhook = cvat_api.create_cvat_webhook(project.id)
2100+
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
20962101

20972102
with SessionLocal.begin() as session:
2098-
db_service.create_project(
2103+
project_id = db_service.create_project(
20992104
session,
2100-
project.id,
2101-
cloud_storage.id,
2105+
cvat_project.id,
2106+
cvat_cloud_storage.id,
21022107
self.manifest.annotation.type,
21032108
self.escrow_address,
21042109
self.chain_id,
@@ -2107,33 +2112,38 @@ def _create_on_cvat(self):
21072112
bucket_host=input_data_bucket.host_url,
21082113
provider=input_data_bucket.provider,
21092114
),
2110-
cvat_webhook_id=webhook.id,
2115+
cvat_webhook_id=cvat_webhook.id,
21112116
)
2117+
2118+
db_service.get_project_by_id(
2119+
session, project_id, for_update=True
2120+
) # lock the row
21122121
db_service.add_project_images(
21132122
session,
2114-
project.id,
2123+
cvat_project.id,
21152124
list(set(chain.from_iterable(skeleton_label_filenames))),
21162125
)
21172126

21182127
for point_label_filenames in skeleton_label_filenames:
2119-
task = cvat_api.create_task(project.id, name=project.name)
2128+
cvat_task = cvat_api.create_task(cvat_project.id, name=cvat_project.name)
21202129

21212130
with SessionLocal.begin() as session:
2122-
db_service.create_task(
2123-
session, task.id, project.id, TaskStatuses[task.status]
2131+
task_id = db_service.create_task(
2132+
session, cvat_task.id, cvat_project.id, TaskStatuses[cvat_task.status]
21242133
)
21252134

21262135
# Actual task creation in CVAT takes some time, so it's done in an async process.
21272136
# The task will be created in DB once 'update:task' or 'update:job' webhook is received.
21282137
cvat_api.put_task_data(
2129-
task.id,
2130-
cloud_storage.id,
2138+
cvat_task.id,
2139+
cvat_cloud_storage.id,
21312140
filenames=point_label_filenames,
21322141
sort_images=False,
21332142
)
21342143

21352144
with SessionLocal.begin() as session:
2136-
db_service.create_data_upload(session, cvat_task_id=task.id)
2145+
db_service.get_task_by_id(session, task_id, for_update=True) # lock the row
2146+
db_service.create_data_upload(session, cvat_task.id)
21372147

21382148
@classmethod
21392149
def _make_cloud_storage_client(cls, bucket_info: BucketAccessInfo) -> StorageClient:
@@ -2293,19 +2303,19 @@ def create_task(escrow_address: str, chain_id: int) -> None:
22932303
cloud_storage = cvat_api.create_cloudstorage(**_make_cvat_cloud_storage_params(data_bucket))
22942304

22952305
# Create a project
2296-
project = cvat_api.create_project(
2306+
cvat_project = cvat_api.create_project(
22972307
escrow_address,
22982308
labels=label_configuration,
22992309
user_guide=manifest.annotation.user_guide,
23002310
)
23012311

23022312
# Setup webhooks for a project (update:task, update:job)
2303-
webhook = cvat_api.create_cvat_webhook(project.id)
2313+
cvat_webhook = cvat_api.create_cvat_webhook(cvat_project.id)
23042314

23052315
with SessionLocal.begin() as session:
2306-
db_service.create_project(
2316+
project_id = db_service.create_project(
23072317
session,
2308-
project.id,
2318+
cvat_project.id,
23092319
cloud_storage.id,
23102320
manifest.annotation.type,
23112321
escrow_address,
@@ -2315,27 +2325,32 @@ def create_task(escrow_address: str, chain_id: int) -> None:
23152325
bucket_host=data_bucket.host_url,
23162326
provider=data_bucket.provider,
23172327
),
2318-
cvat_webhook_id=webhook.id,
2328+
cvat_webhook_id=cvat_webhook.id,
23192329
)
2320-
db_service.add_project_images(session, project.id, data_filenames)
2330+
2331+
db_service.get_project_by_id(session, project_id, for_update=True) # lock the row
2332+
db_service.add_project_images(session, cvat_project.id, data_filenames)
23212333

23222334
for job_filenames in job_configuration:
2323-
task = cvat_api.create_task(project.id, escrow_address)
2335+
cvat_task = cvat_api.create_task(cvat_project.id, escrow_address)
23242336

23252337
with SessionLocal.begin() as session:
2326-
db_service.create_task(session, task.id, project.id, TaskStatuses[task.status])
2338+
task_id = db_service.create_task(
2339+
session, cvat_task.id, cvat_project.id, TaskStatuses[cvat_task.status]
2340+
)
23272341

23282342
# Actual task creation in CVAT takes some time, so it's done in an async process.
23292343
# The task will be created in DB once 'update:task' or 'update:job' webhook is received.
23302344
cvat_api.put_task_data(
2331-
task.id,
2345+
cvat_task.id,
23322346
cloud_storage.id,
23332347
filenames=[os.path.join(data_bucket.path, fn) for fn in job_filenames],
23342348
sort_images=False,
23352349
)
23362350

23372351
with SessionLocal.begin() as session:
2338-
db_service.create_data_upload(session, cvat_task_id=task.id)
2352+
db_service.get_task_by_id(session, task_id, for_update=True) # lock the row
2353+
db_service.create_data_upload(session, cvat_task.id)
23392354

23402355
elif manifest.annotation.type in [TaskTypes.image_boxes_from_points]:
23412356
with BoxesFromPointsTaskBuilder(manifest, escrow_address, chain_id) as task_builder:

0 commit comments

Comments
 (0)