Skip to content

Commit af70c33

Browse files
committed
monkey patch for the concurrent problem.
1 parent c82ffa2 commit af70c33

File tree

1 file changed

+39
-28
lines changed

1 file changed

+39
-28
lines changed

src/dvc_task/contrib/kombu_filesystem.py

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
33
Contains classes which need to be backported in kombu <5.3.0 via monkeypatch.
44
"""
5+
56
import os
67
import shutil
78
import tempfile
89
import uuid
910
from collections import namedtuple
10-
from contextlib import contextmanager
1111
from pathlib import Path
1212
from queue import Empty
1313
from time import monotonic
@@ -46,7 +46,7 @@ def unlock(file):
4646
elif os.name == "posix":
4747

4848
import fcntl
49-
from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa
49+
from fcntl import LOCK_EX, LOCK_SH
5050

5151
def lock(file, flags):
5252
"""Create file lock."""
@@ -56,6 +56,11 @@ def unlock(file):
5656
"""Remove file lock."""
5757
fcntl.flock(file.fileno(), fcntl.LOCK_UN)
5858

59+
else:
60+
raise RuntimeError(
61+
"Filesystem plugin only defined for NT and POSIX platforms"
62+
)
63+
5964

6065
exchange_queue_t = namedtuple(
6166
"exchange_queue_t", ["routing_key", "pattern", "queue"]
@@ -67,41 +72,46 @@ class FilesystemChannel(virtual.Channel):
6772

6873
supports_fanout = True
6974

70-
@contextmanager
71-
def _get_exchange_file_obj(self, exchange, mode="rb"):
72-
file = self.control_folder / f"{exchange}.exchange"
73-
if "w" in mode:
74-
self.control_folder.mkdir(exist_ok=True)
75-
f_obj = file.open(mode)
76-
77-
try:
78-
if "w" in mode:
79-
lock(f_obj, LOCK_EX)
80-
yield f_obj
81-
except OSError:
82-
raise ChannelError(f"Cannot open {file}")
83-
finally:
84-
if "w" in mode:
85-
unlock(f_obj)
86-
f_obj.close()
87-
8875
def get_table(self, exchange):
76+
file = self.control_folder / f"{exchange}.exchange"
8977
try:
90-
with self._get_exchange_file_obj(exchange) as f_obj:
78+
f_obj = file.open("r")
79+
try:
80+
lock(f_obj, LOCK_SH)
9181
exchange_table = loads(bytes_to_str(f_obj.read()))
9282
return [exchange_queue_t(*q) for q in exchange_table]
83+
finally:
84+
unlock(f_obj)
85+
f_obj.close()
9386
except FileNotFoundError:
9487
return []
88+
except OSError:
89+
raise ChannelError(f"Cannot open {file}")
9590

9691
def _queue_bind(self, exchange, routing_key, pattern, queue):
97-
queues = self.get_table(exchange)
92+
file = self.control_folder / f"{exchange}.exchange"
93+
self.control_folder.mkdir(exist_ok=True)
9894
queue_val = exchange_queue_t(
9995
routing_key or "", pattern or "", queue or ""
10096
)
101-
if queue_val not in queues:
102-
queues.insert(0, queue_val)
103-
with self._get_exchange_file_obj(exchange, "wb") as f_obj:
104-
f_obj.write(str_to_bytes(dumps(queues)))
97+
try:
98+
if file.exists():
99+
f_obj = file.open("rb+", buffering=0)
100+
lock(f_obj, LOCK_EX)
101+
exchange_table = loads(bytes_to_str(f_obj.read()))
102+
queues = [exchange_queue_t(*q) for q in exchange_table]
103+
if queue_val not in queues:
104+
queues.insert(0, queue_val)
105+
f_obj.seek(0)
106+
f_obj.write(str_to_bytes(dumps(queues)))
107+
else:
108+
f_obj = file.open("wb", buffering=0)
109+
lock(f_obj, LOCK_EX)
110+
queues = [queue_val]
111+
f_obj.write(str_to_bytes(dumps(queues)))
112+
finally:
113+
unlock(f_obj)
114+
f_obj.close()
105115

106116
def _put_fanout(self, exchange, payload, routing_key, **kwargs):
107117
for q in self.get_table(exchange):
@@ -115,7 +125,7 @@ def _put(self, queue, payload, **kwargs):
115125
filename = os.path.join(self.data_folder_out, filename)
116126

117127
try:
118-
f = open(filename, "wb")
128+
f = open(filename, "wb", buffering=0)
119129
lock(f, LOCK_EX)
120130
f.write(str_to_bytes(dumps(payload)))
121131
except OSError:
@@ -148,7 +158,8 @@ def _get(self, queue):
148158
processed_folder,
149159
)
150160
except OSError:
151-
pass # file could be locked, or removed in meantime so ignore
161+
# file could be locked, or removed in meantime so ignore
162+
continue
152163

153164
filename = os.path.join(processed_folder, filename)
154165
try:

0 commit comments

Comments
 (0)