Skip to content

Commit 358a100

Browse files
committed
worker: support auto-running fsapp.clean on shutdown
1 parent ecfd4a9 commit 358a100

File tree

2 files changed

+16
-4
lines changed

2 files changed

+16
-4
lines changed

src/dvc_task/worker/temporary.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,27 @@ def __init__( # pylint: disable=too-many-arguments
3636
self.timeout = timeout
3737
self.config = kwargs
3838

39-
def start(self, name: str) -> None:
39+
def start(self, name: str, fsapp_clean: bool = False) -> None:
4040
"""Start the worker if it does not already exist.
4141
4242
Runs the Celery worker main thread in the current process.
4343
4444
Arguments:
4545
name: Celery worker name.
46+
fsapp_clean: Automatically cleanup FSApp broker on shutdown. Has no
47+
effect unless app is an FSApp instance.
4648
"""
4749
if os.name == "nt":
4850
# see https://github.com/celery/billiard/issues/247
4951
os.environ["FORKED_BY_MULTIPROCESSING"] = "1"
5052

5153
if not self.app.control.ping(destination=[name]):
52-
monitor = threading.Thread(target=self.monitor, daemon=True, args=(name,))
54+
monitor = threading.Thread(
55+
target=self.monitor,
56+
daemon=True,
57+
args=(name,),
58+
kwargs={"fsapp_clean": fsapp_clean},
59+
)
5360
monitor.start()
5461
config = dict(self.config)
5562
config["hostname"] = name
@@ -78,7 +85,7 @@ def _parse_config(config: Mapping[str, Any]) -> List[str]:
7885
argv.append("-E")
7986
return argv
8087

81-
def monitor(self, name: str) -> None:
88+
def monitor(self, name: str, fsapp_clean: bool = False) -> None:
8289
"""Monitor the worker and stop it when the queue is empty."""
8390
logger.debug("monitor: waiting for worker to start")
8491
nodename = default_nodename(name)
@@ -108,4 +115,7 @@ def _tasksets(nodes):
108115
logger.info("monitor: shutting down due to empty queue.")
109116
self.app.control.shutdown(destination=[nodename])
110117
break
118+
if fsapp_clean and isinstance(self.app, FSApp):
119+
logger.info("monitor: cleanup FSApp broker.")
120+
self.app.clean()
111121
logger.info("monitor: done")

tests/worker/test_temporary.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ def test_start(celery_app: Celery, mocker: MockerFixture):
2424
assert kwargs["pool"] == TaskPool
2525
assert kwargs["concurrency"] == 1
2626
assert kwargs["prefetch_multiplier"] == 1
27-
thread.assert_called_once_with(target=worker.monitor, daemon=True, args=(name,))
27+
thread.assert_called_once_with(
28+
target=worker.monitor, daemon=True, args=(name,), kwargs={"fsapp_clean": False}
29+
)
2830

2931

3032
@pytest.mark.flaky(

0 commit comments

Comments
 (0)