Skip to content

Commit 6e6dc47

Browse files
karajan1001pmrowla
authored andcommitted
Add locks to process operations to prevent race conditions.
fix: #93 Our old solution on #53, can reduce possibility of race condition, but it still happens occasionally. 1. Add locks to write and read operations to prevent this kind of error. 2. Remove old file replacing solution. 3. Use reraise to simplify the code in `__getitem__`
1 parent 22b25a4 commit 6e6dc47

File tree

2 files changed

+14
-9
lines changed

2 files changed

+14
-9
lines changed

src/dvc_task/proc/manager.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@ def __iter__(self) -> Generator[str, None, None]:
4545
for name in os.listdir(self.wdir):
4646
yield name
4747

48+
@reraise(FileNotFoundError, KeyError)
4849
def __getitem__(self, key: str) -> "ProcessInfo":
4950
info_path = self._get_info_path(key)
50-
try:
51-
return ProcessInfo.load(info_path)
52-
except FileNotFoundError as exc:
53-
raise KeyError from exc
51+
return ProcessInfo.load(info_path)
5452

5553
@reraise(FileNotFoundError, KeyError)
5654
def __setitem__(self, key: str, value: "ProcessInfo"):

src/dvc_task/proc/process.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from funcy import cached_property
1313
from shortuuid import uuid
1414

15+
from ..contrib.kombu_filesystem import LOCK_EX, LOCK_SH, lock, unlock
1516
from ..utils import makedirs
1617
from .exceptions import TimeoutExpired
1718

@@ -37,18 +38,24 @@ def from_dict(cls, data: Dict[str, Any]) -> "ProcessInfo":
3738
def load(cls, filename: str) -> "ProcessInfo":
3839
"""Construct the process information from a file."""
3940
with open(filename, "r", encoding="utf-8") as fobj:
40-
return cls.from_dict(json.load(fobj))
41+
lock(fobj, LOCK_SH)
42+
try:
43+
return cls.from_dict(json.load(fobj))
44+
finally:
45+
unlock(fobj)
4146

4247
def asdict(self) -> Dict[str, Any]:
4348
"""Return this info as a dictionary."""
4449
return asdict(self)
4550

4651
def dump(self, filename: str) -> None:
4752
"""Dump the process information into a file."""
48-
temp_info_file = f"{filename}.{uuid()}"
49-
with open(temp_info_file, "w", encoding="utf-8") as fobj:
50-
json.dump(self.asdict(), fobj)
51-
os.replace(temp_info_file, filename)
53+
with open(filename, "w", encoding="utf-8") as fobj:
54+
lock(fobj, LOCK_EX)
55+
try:
56+
json.dump(self.asdict(), fobj)
57+
finally:
58+
unlock(fobj)
5259

5360

5461
class ManagedProcess(AbstractContextManager):

0 commit comments

Comments
 (0)