Skip to content

Commit 54c8509

Browse files
authored
fix: Move workflow history cleanup to periodic task (baserow#5166)
* Move workflow history cleanup to a separate periodic task.
* Add tests
* Add changelog
* Add error handling so that one workflow clean-up doesn't crash the entire periodic task.
* Keep _mark_failure_for_timed_out_history() inside before_run()
* History clean-up logic should use bulk query/deletion
* Add tests
* Update docstring
* Add more tests
* Lint fix
* Move mark_failure_for_timed_out_history into periodic task
* Rename clear_old_automation_history to automation_periodic_cleanup
* Fixes post rebase
* Fix fk reference: should point to original_workflow_id
1 parent 9ff4063 commit 54c8509

9 files changed

Lines changed: 355 additions & 175 deletions

File tree

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ DATABASE_NAME=baserow
7272
# BASEROW_AUTOMATION_WORKFLOW_TIMEOUT_HOURS=
7373
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=
7474
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=
75+
# BASEROW_AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES=
7576
# BASEROW_EXTRA_ALLOWED_HOSTS=
7677
# ADDITIONAL_APPS=
7778
# ADDITIONAL_MODULES=

backend/src/baserow/config/settings/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,9 @@ def __setitem__(self, key, value):
931931
AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES = int(
932932
os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES", 200)
933933
)
934+
AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES = int(
935+
os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES", 60)
936+
)
934937

935938
TRASH_PAGE_SIZE_LIMIT = 200 # How many trash entries can be requested at once.
936939

backend/src/baserow/contrib/automation/workflows/handler.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from django.contrib.auth.models import AbstractUser
88
from django.core.files.storage import Storage
99
from django.db import transaction
10-
from django.db.models import Q, QuerySet
10+
from django.db.models import OuterRef, Q, QuerySet, Subquery
1111
from django.utils import timezone
1212

1313
from celery.canvas import Signature, chain
@@ -854,40 +854,42 @@ def toggle_test_run(
854854
# except if we are updating the trigger sample data by itself
855855
self.async_start_workflow(workflow)
856856

857-
def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
857+
def clear_old_history(self) -> None:
858858
"""
859-
Clear any old history entries related to the workflow.
859+
Clears any old history entries across all workflows.
860860
861-
It will delete any history entries that are older than MAX_HISTORY_DAYS and only
862-
keep the most recent MAX_HISTORY_ENTRIES entries.
863-
864-
TODO: refactor this once https://github.com/baserow/baserow/pull/5166
865-
is merged in.
861+
It will delete any history entries that are older than MAX_HISTORY_DAYS
862+
and only keep the most recent MAX_HISTORY_ENTRIES entries.
866863
"""
867864

865+
# Delete all history entries older than max days
868866
oldest_history_date = timezone.now() - timedelta(
869867
days=settings.AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS
870868
)
871-
original_workflow.workflow_histories.exclude(
869+
AutomationWorkflowHistory.objects.exclude(
872870
status=HistoryStatusChoices.STARTED
873871
).filter(started_on__lt=oldest_history_date).delete()
874872

875-
history_ids_to_keep = list(
876-
original_workflow.workflow_histories.order_by("-started_on").values_list(
877-
"id", flat=True
878-
)[: settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES]
873+
# For each workflow, delete history entries older than its newest max entries
874+
max_entries = settings.AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES
875+
cutoff_date = Subquery(
876+
AutomationWorkflowHistory.objects.filter(
877+
original_workflow_id=OuterRef("original_workflow_id")
878+
)
879+
.order_by("-started_on")
880+
# A Subquery must return a single value, so we return
881+
# the started_on of the oldest history entry from the
882+
# latest max_entries entries.
883+
.values("started_on")[max_entries - 1 : max_entries]
879884
)
880-
original_workflow.workflow_histories.exclude(
885+
AutomationWorkflowHistory.objects.exclude(
881886
status=HistoryStatusChoices.STARTED
882-
).exclude(id__in=history_ids_to_keep).delete()
887+
).filter(started_on__lt=cutoff_date).delete()
883888

884889
# Clean up published automations that no longer have any history entries
885-
active_published = self.get_published_workflow(
886-
original_workflow, with_cache=False
887-
)
888890
empty_published = (
889891
Automation.objects.filter(
890-
published_from=original_workflow,
892+
published_from__isnull=False,
891893
)
892894
.exclude(workflows__cloned_workflow_histories__isnull=False)
893895
# _ensure_published_for_run() is called to potentially create a
@@ -898,14 +900,19 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
898900
created_on__gte=timezone.now() - timedelta(seconds=5),
899901
)
900902
)
901-
if active_published:
902-
empty_published = empty_published.exclude(id=active_published.automation_id)
903+
904+
# Exclude any automation that is currently the active published workflow
905+
active_published_ids = list(
906+
AutomationWorkflow.objects.filter(
907+
state=WorkflowState.LIVE,
908+
).values_list("automation_id", flat=True)
909+
)
910+
if active_published_ids:
911+
empty_published = empty_published.exclude(id__in=active_published_ids)
903912

904913
empty_published.delete()
905914

906-
def _mark_failure_for_timed_out_history(
907-
self, original_workflow: AutomationWorkflow
908-
) -> None:
915+
def mark_failure_for_timed_out_history(self) -> None:
909916
"""
910917
If a history entry is still not finished after a certain duration, this execution
911918
is marked as failed.
@@ -919,7 +926,7 @@ def _mark_failure_for_timed_out_history(
919926
error = "This workflow took too long and was timed out."
920927

921928
workflow_history_ids = list(
922-
original_workflow.workflow_histories.filter(
929+
AutomationWorkflowHistory.objects.filter(
923930
status=HistoryStatusChoices.STARTED,
924931
started_on__lt=max_history_date,
925932
).values_list("id", flat=True)
@@ -1107,13 +1114,6 @@ def before_run(self, workflow: AutomationWorkflow) -> None:
11071114
# another execution
11081115
self.reset_workflow_temporary_states(original_workflow)
11091116

1110-
# If we have history entries that are too old it probably means something
1111-
# went wrong with Celery so we mark these entries as failed.
1112-
self._mark_failure_for_timed_out_history(original_workflow)
1113-
1114-
# We remove old history entries to avoid storing too many entries.
1115-
self._clear_old_history(original_workflow)
1116-
11171117
self._check_too_many_errors(workflow)
11181118

11191119
self._check_is_rate_limited(workflow)

backend/src/baserow/contrib/automation/workflows/tasks.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from datetime import timedelta
12
from typing import Optional
23

4+
from django.conf import settings
35
from django.utils import timezone
46

57
from celery.canvas import Signature
@@ -74,3 +76,22 @@ def handle_workflow_dispatch_done(
7476
sender=None,
7577
workflow_history=history,
7678
)
79+
80+
81+
@app.task(queue="automation_workflow")
82+
def automation_periodic_cleanup():
83+
from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler
84+
85+
handler = AutomationWorkflowHandler()
86+
handler.mark_failure_for_timed_out_history()
87+
handler.clear_old_history()
88+
89+
90+
@app.on_after_finalize.connect
91+
def setup_periodic_automation_tasks(sender, **kwargs):
92+
sender.add_periodic_task(
93+
timedelta(
94+
minutes=settings.AUTOMATION_WORKFLOW_HISTORY_CLEANUP_INTERVAL_MINUTES
95+
),
96+
automation_periodic_cleanup.s(),
97+
)

backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py

Lines changed: 10 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,80 +1403,6 @@ def test_toggle_simulate_mode_on_immediate(
14031403
mock_async_start_workflow.assert_called_once()
14041404

14051405

1406-
@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=7)
1407-
@pytest.mark.django_db
1408-
def test_clear_old_history_deletes_history_older_than_max_days(data_fixture):
1409-
workflow = data_fixture.create_automation_workflow()
1410-
1411-
with freeze_time("2025-02-01 12:00:00"):
1412-
old_history = data_fixture.create_automation_workflow_history(
1413-
workflow=workflow,
1414-
status=HistoryStatusChoices.SUCCESS,
1415-
)
1416-
1417-
with freeze_time("2025-02-02 12:00:00"):
1418-
recent_history = data_fixture.create_automation_workflow_history(
1419-
workflow=workflow,
1420-
status=HistoryStatusChoices.SUCCESS,
1421-
)
1422-
1423-
# This is 8 days after old_history was created, so it should be deleted.
1424-
with freeze_time("2025-02-09 12:00:00"):
1425-
AutomationWorkflowHandler()._clear_old_history(workflow)
1426-
1427-
assert workflow.workflow_histories.filter(id=old_history.id).exists() is False
1428-
assert workflow.workflow_histories.filter(id=recent_history.id).exists() is True
1429-
1430-
1431-
@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=3)
1432-
@pytest.mark.django_db
1433-
def test_clear_old_history_keeps_only_max_entries(data_fixture):
1434-
workflow = data_fixture.create_automation_workflow()
1435-
1436-
histories = []
1437-
day = 10
1438-
for i in range(5):
1439-
day += i
1440-
with freeze_time(f"2025-02-{day} 12:00:00"):
1441-
histories.append(
1442-
data_fixture.create_automation_workflow_history(
1443-
workflow=workflow,
1444-
status=HistoryStatusChoices.SUCCESS,
1445-
)
1446-
)
1447-
1448-
with freeze_time(f"2025-02-16 12:00:00"):
1449-
AutomationWorkflowHandler()._clear_old_history(workflow)
1450-
1451-
assert workflow.workflow_histories.all().count() == 3
1452-
1453-
# The two oldest should be deleted
1454-
for history in histories[:2]:
1455-
assert workflow.workflow_histories.filter(id=history.id).exists() is False
1456-
1457-
# The three newest should be kept
1458-
for history in histories[2:]:
1459-
assert workflow.workflow_histories.filter(id=history.id).exists() is True
1460-
1461-
1462-
@override_settings(
1463-
AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=3,
1464-
AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=1,
1465-
)
1466-
@pytest.mark.django_db
1467-
def test_clear_old_history_keeps_entries(data_fixture):
1468-
workflow = data_fixture.create_automation_workflow()
1469-
1470-
with freeze_time("2025-02-01 12:00:00"):
1471-
history = data_fixture.create_automation_workflow_history(workflow=workflow)
1472-
1473-
with freeze_time("2025-02-02 12:00:00"):
1474-
AutomationWorkflowHandler()._clear_old_history(workflow)
1475-
1476-
# history is within limits, so it should be kept
1477-
assert workflow.workflow_histories.filter(id=history.id).exists() is True
1478-
1479-
14801406
@pytest.mark.django_db
14811407
@patch(f"{WORKFLOWS_MODULE}.handler.AutomationWorkflowHandler.before_run")
14821408
@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
@@ -1755,28 +1681,23 @@ def test_async_start_workflow_unexpected_error_creates_history(
17551681

17561682
@override_settings(AUTOMATION_WORKFLOW_TIMEOUT_HOURS=1)
17571683
@pytest.mark.django_db
1758-
def test_before_run_marks_timed_out_started_history_as_failed(data_fixture):
1759-
original_workflow = data_fixture.create_automation_workflow()
1760-
published_workflow = data_fixture.create_automation_workflow(
1761-
state=WorkflowState.LIVE
1762-
)
1763-
published_workflow.automation.published_from = original_workflow
1764-
published_workflow.automation.save()
1684+
def test_mark_failure_for_timed_out_history(data_fixture):
1685+
workflow = data_fixture.create_automation_workflow()
17651686

1766-
with freeze_time("2026-03-10 10:00:00"):
1687+
with freeze_time("2026-04-16 12:00:00"):
17671688
timed_out_history = data_fixture.create_automation_workflow_history(
1768-
workflow=original_workflow,
1689+
workflow=workflow,
17691690
status=HistoryStatusChoices.STARTED,
17701691
)
17711692
node_history = AutomationNodeHistory.objects.create(
17721693
workflow_history=timed_out_history,
1773-
node=original_workflow.get_trigger(),
1694+
node=workflow.get_trigger(),
17741695
started_on=timed_out_history.started_on,
17751696
status=HistoryStatusChoices.STARTED,
17761697
)
17771698

1778-
with freeze_time("2026-03-10 12:00:00"):
1779-
AutomationWorkflowHandler().before_run(published_workflow)
1699+
with freeze_time("2026-04-16 13:00:01"):
1700+
AutomationWorkflowHandler().mark_failure_for_timed_out_history()
17801701

17811702
error_message = "This workflow took too long and was timed out."
17821703

@@ -1817,60 +1738,6 @@ def test_async_start_workflow_unknown_exception(
18171738
)
18181739

18191740

1820-
@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=2)
1821-
@pytest.mark.django_db
1822-
def test_clear_old_history_excludes_started_workflows_max_entries(data_fixture):
1823-
workflow = data_fixture.create_automation_workflow()
1824-
1825-
# Create three history entries
1826-
with freeze_time("2026-03-10 12:00:00"):
1827-
started_history = data_fixture.create_automation_workflow_history(
1828-
workflow=workflow, status=HistoryStatusChoices.STARTED
1829-
)
1830-
1831-
with freeze_time("2026-03-10 13:00:00"):
1832-
data_fixture.create_automation_workflow_history(
1833-
workflow=workflow, status=HistoryStatusChoices.SUCCESS
1834-
)
1835-
1836-
with freeze_time("2026-03-10 14:00:00"):
1837-
data_fixture.create_automation_workflow_history(
1838-
workflow=workflow, status=HistoryStatusChoices.SUCCESS
1839-
)
1840-
1841-
# Although max entries is 2 and the oldest history should be deleted,
1842-
# the oldest one is still kept because its status is STARTED.
1843-
with freeze_time("2026-03-10 15:00:00"):
1844-
AutomationWorkflowHandler()._clear_old_history(workflow)
1845-
1846-
assert workflow.workflow_histories.filter(id=started_history.id).exists() is True
1847-
assert workflow.workflow_histories.count() == 3
1848-
1849-
1850-
@override_settings(AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=1)
1851-
@pytest.mark.django_db
1852-
def test_clear_old_history_excludes_started_workflows_max_days(data_fixture):
1853-
workflow = data_fixture.create_automation_workflow()
1854-
1855-
with freeze_time("2026-03-10 12:00:00"):
1856-
history_1 = data_fixture.create_automation_workflow_history(
1857-
workflow=workflow, status=HistoryStatusChoices.STARTED
1858-
)
1859-
1860-
with freeze_time("2026-03-11 12:00:00"):
1861-
history_2 = data_fixture.create_automation_workflow_history(
1862-
workflow=workflow, status=HistoryStatusChoices.SUCCESS
1863-
)
1864-
1865-
# After 2 days, both history entries are older than MAX_DAYS, but since
1866-
# history_1 hasn't finished yet it shouldn't be deleted.
1867-
with freeze_time("2026-03-13 12:00:00"):
1868-
AutomationWorkflowHandler()._clear_old_history(workflow)
1869-
1870-
assert workflow.workflow_histories.filter(id=history_1.id).exists() is True
1871-
assert workflow.workflow_histories.filter(id=history_2.id).exists() is False
1872-
1873-
18741741
@pytest.mark.django_db
18751742
def test_ensure_published_for_run_creates_new_clone(data_fixture):
18761743
workflow = data_fixture.create_automation_workflow()
@@ -2006,14 +1873,14 @@ def test_clear_old_history_deletes_orphaned_automations(data_fixture):
20061873

20071874
# 12 hours later but within 1 day, so history survives
20081875
with freeze_time("2026-04-21 00:00:00"):
2009-
handler._clear_old_history(workflow)
1876+
handler.clear_old_history()
20101877

20111878
assert Automation.objects.filter(id=clone_automation_id).exists()
20121879

20131880
# 2 days later, so history should have been deleted, and the cloned
20141881
# automation should be pruned as well.
20151882
with freeze_time("2026-04-22 12:00:00"):
2016-
handler._clear_old_history(workflow)
1883+
handler.clear_old_history()
20171884

20181885
assert not Automation.objects.filter(id=clone_automation_id).exists()
20191886

@@ -2033,7 +1900,7 @@ def test_clear_old_history_keeps_live_published_automation_when_newer_test_clone
20331900
assert test_clone_workflow.automation_id > published_workflow.automation_id
20341901

20351902
with freeze_time("2026-04-27 12:01:00"):
2036-
handler._clear_old_history(workflow)
1903+
handler.clear_old_history()
20371904

20381905
assert Automation.objects.filter(id=published_workflow.automation_id).exists()
20391906
assert not Automation.objects.filter(id=test_clone_workflow.automation_id).exists()

0 commit comments

Comments
 (0)