Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions services/data/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(
tags=None,
system_tags=None,
last_heartbeat_ts=None,
status=None,
):
self.flow_id = flow_id
self.user_name = user_name
Expand All @@ -55,10 +56,12 @@ def __init__(

self.ts_epoch = ts_epoch
self.last_heartbeat_ts = last_heartbeat_ts
# derived, only present when the row was fetched with the status join
self.status = status

def serialize(self, expanded: bool = False):
if expanded:
return {
body = {
"flow_id": self.flow_id,
"run_number": self.run_number,
"run_id": self.run_id,
Expand All @@ -69,7 +72,7 @@ def serialize(self, expanded: bool = False):
"last_heartbeat_ts": self.last_heartbeat_ts
}
else:
return {
body = {
"flow_id": self.flow_id,
"run_number": get_exposed_run_id(self.run_number, self.run_id),
"user_name": self.user_name,
Expand All @@ -78,6 +81,9 @@ def serialize(self, expanded: bool = False):
"system_tags": self.system_tags,
"last_heartbeat_ts": self.last_heartbeat_ts
}
if self.status is not None:
body["status"] = self.status
return body


class StepRow(object):
Expand Down
92 changes: 88 additions & 4 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
ARTIFACT_TABLE_NAME = os.environ.get("DB_TABLE_NAME_ARTIFACT", "artifact_v3")
DB_SCHEMA_NAME = os.environ.get("DB_SCHEMA_NAME", "public")

# Time before a run with a heartbeat is considered inactive (and thus failed).
# Default 6 minutes (in seconds). Kept in sync with the ui_backend definition so
# that status filtering here agrees with the status the UI reports.
RUN_INACTIVE_CUTOFF_TIME = int(os.environ.get("RUN_INACTIVE_CUTOFF_TIME", 60 * 6))

operator_match = re.compile('([^:]*):([=><]+)$')

# use a ddmmyyy timestamp as the version for triggers
Expand Down Expand Up @@ -583,9 +588,74 @@ class AsyncRunTablePostgres(AsyncPostgresTable):
"user_name", "ts_epoch", "last_heartbeat_ts", "tags", "system_tags"]
primary_keys = ["flow_id", "run_number"]
trigger_keys = primary_keys + ["last_heartbeat_ts"]
select_columns = keys
flow_table_name = AsyncFlowTablePostgres.table_name

# Derived run status (running/completed/failed). This mirrors the definition in
# ui_backend_service so filtering agrees with what the UI shows. The two lateral
# joins below pull the 'end' step's attempt metadata that the status depends on.
# Only used when joins are enabled (i.e. when filtering by status), so the plain
# get_all_runs path stays a simple, join-free query.
joins = [

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

battle-tested query logic from the ui_backend side, and this is gated behind the status query param, so from a performance perspective it should be fine to add to the metadata-service as well.

The only concern is with status=failed&status=running which is in the case of relying on heartbeats, not entirely deterministic.

"""
LEFT JOIN LATERAL (
SELECT
ts_epoch,
(CASE
WHEN pg_typeof(value)='jsonb'::regtype
THEN value::jsonb->>0
ELSE value::text
END)::boolean as value
FROM {metadata_table} as attempt_ok
WHERE
{table_name}.flow_id = attempt_ok.flow_id AND
{table_name}.run_number = attempt_ok.run_number AND
attempt_ok.step_name = 'end' AND
attempt_ok.field_name = 'attempt_ok'
ORDER BY ts_epoch DESC
LIMIT 1
) as end_attempt_ok ON true
""".format(table_name=RUN_TABLE_NAME, metadata_table=METADATA_TABLE_NAME),
"""
LEFT JOIN LATERAL (
SELECT ts_epoch
FROM {metadata_table} as attempt
WHERE
{table_name}.flow_id = attempt.flow_id AND
{table_name}.run_number = attempt.run_number AND
attempt.step_name = 'end' AND
attempt.field_name = 'attempt' AND
end_attempt_ok.value IS FALSE
ORDER BY ts_epoch DESC
LIMIT 1
) as end_attempt ON true
""".format(table_name=RUN_TABLE_NAME, metadata_table=METADATA_TABLE_NAME),
]

join_columns = [
"""
(CASE
WHEN end_attempt IS NOT NULL
AND end_attempt_ok.ts_epoch < end_attempt.ts_epoch
THEN 'running'
WHEN end_attempt_ok IS NOT NULL AND end_attempt_ok.value IS TRUE
THEN 'completed'
WHEN end_attempt_ok IS NOT NULL AND end_attempt_ok.value IS FALSE
THEN 'failed'
WHEN {table_name}.last_heartbeat_ts IS NOT NULL
AND @(extract(epoch from now())-{table_name}.last_heartbeat_ts)<={cutoff}
THEN 'running'
ELSE 'failed'
END) AS status
""".format(table_name=RUN_TABLE_NAME, cutoff=RUN_INACTIVE_CUTOFF_TIME),
]

@property
def select_columns(self):
# qualify with the table name so the run columns stay unambiguous once the
# status joins are in play (the joined relations also expose ts_epoch).
return ["{table_name}.{col} AS {col}".format(table_name=self.table_name, col=k)
for k in self.keys]

async def add_run(self, run: RunRow, fill_heartbeat: bool = False):
dict = {
"flow_id": run.flow_id,
Expand All @@ -603,9 +673,23 @@ async def get_run(self, flow_id: str, run_id: str, expanded: bool = False, cur:
return await self.get_records(filter_dict=filter_dict,
fetch_single=True, expanded=expanded, cur=cur)

async def get_all_runs(self, flow_id: str):
filter_dict = {"flow_id": flow_id}
return await self.get_records(filter_dict=filter_dict)
async def get_all_runs(self, flow_id: str, statuses: List[str] = None,
limit: int = 0, offset: int = 0, order: List[str] = None):
if not statuses:
# plain, join-free path — unchanged behaviour for the common case
return await self.get_records(filter_dict={"flow_id": flow_id},
ordering=order, limit=limit)

conditions = ["flow_id = %s",
"status IN ({})".format(", ".join(["%s"] * len(statuses)))]
values = [flow_id, *statuses]
# status lives in the WHERE, so the limit/offset are applied to the matching
# rows — the filter composes with pagination rather than competing with it.
response, _ = await self.find_records(
conditions=conditions, values=values,
limit=limit, offset=offset, order=order, enable_joins=True
)
return response

async def update_heartbeat(self, flow_id: str, run_id: str):
run_key, run_value = translate_run_key(run_id)
Expand Down
15 changes: 14 additions & 1 deletion services/metadata_service/api/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
handle_exceptions
from services.data.postgres_async_db import AsyncPostgresDB

# Statuses a run can be filtered by, matching the values the status query derives.
SUPPORTED_RUN_STATUSES = {"running", "completed", "failed"}


class RunApi(object):
_run_table = None
Expand Down Expand Up @@ -73,6 +76,11 @@ async def get_all_runs(self, request):
description: "flow_id"
required: true
type: "string"
- name: "status"
in: "query"
description: "filter runs by status (running/completed/failed). repeat for OR."
required: false
type: "string"
produces:
- text/plain
responses:
Expand All @@ -82,7 +90,12 @@ async def get_all_runs(self, request):
description: invalid HTTP Method
"""
flow_name = request.match_info.get("flow_id")
return await self._async_table.get_all_runs(flow_name)
statuses = request.query.getall("status", [])
unsupported = [s for s in statuses if s not in SUPPORTED_RUN_STATUSES]
if unsupported:
return DBResponse(response_code=400,
body="unsupported status filter: %s" % ", ".join(unsupported))
return await self._async_table.get_all_runs(flow_name, statuses=statuses or None)

@format_response
@handle_exceptions
Expand Down
123 changes: 122 additions & 1 deletion services/metadata_service/tests/integration_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@
from .utils import (
cli, db,
assert_api_get_response, assert_api_post_response, compare_partial,
add_flow, add_run, assert_api_patch_response
add_flow, add_run, add_metadata, assert_api_patch_response
)
from services.data.postgres_async_db import RUN_INACTIVE_CUTOFF_TIME
import pytest

pytestmark = [pytest.mark.integration_tests]


async def _run_numbers(cli, flow_id, query=""):
# the metadata service serves json as text/plain, so parse the body ourselves
resp = await cli.get("/flows/{flow_id}/runs{q}".format(flow_id=flow_id, q=query))
assert resp.status == 200
return {r["run_number"] for r in json.loads(await resp.text())}


async def test_run_post(cli, db):
# create flow to add runs for.
Expand Down Expand Up @@ -142,6 +149,120 @@ async def test_runs_get(cli, db):
await assert_api_get_response(cli, "/flows/NonExistentFlow/runs", status=200, data=[])


async def test_runs_get_status_filter(cli, db):
# a run with a fresh heartbeat reads as "running"; one without ever reads as "failed".
# that's enough to exercise filtering without seeding attempt metadata.
_flow = (await add_flow(db, "StatusFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body

_running = (await add_run(db, flow_id=_flow["flow_id"],
last_heartbeat_ts=int(time.time()))).body
_failed = (await add_run(db, flow_id=_flow["flow_id"])).body

flow_id = _flow["flow_id"]
assert await _run_numbers(cli, flow_id, "?status=running") == {_running["run_number"]}
assert await _run_numbers(cli, flow_id, "?status=failed") == {_failed["run_number"]}
# repeated param is an OR over statuses
assert await _run_numbers(cli, flow_id, "?status=running&status=failed") == \
{_running["run_number"], _failed["run_number"]}
# no filter still returns everything
assert await _run_numbers(cli, flow_id) == {_running["run_number"], _failed["run_number"]}


async def test_runs_get_status_filter_completed(cli, db):
# a run whose 'end' step recorded a successful attempt reads as "completed".
# this is the path that actually exercises the attempt-metadata join.
_flow = (await add_flow(db, "DoneFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body

_done = (await add_run(db, flow_id=_flow["flow_id"])).body
await add_metadata(db, flow_id=_done["flow_id"], run_number=_done["run_number"],
step_name="end", task_id=1,
metadata={"field_name": "attempt_ok", "value": "true"})
_bare = (await add_run(db, flow_id=_flow["flow_id"])).body

flow_id = _flow["flow_id"]
assert await _run_numbers(cli, flow_id, "?status=completed") == {_done["run_number"]}
assert await _run_numbers(cli, flow_id, "?status=failed") == {_bare["run_number"]}


async def test_runs_get_status_filter_rejects_unknown(cli, db):
_flow = (await add_flow(db, "BadFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body
await add_run(db, flow_id=_flow["flow_id"])

resp = await cli.get("/flows/{flow_id}/runs?status=bogus".format(**_flow))
assert resp.status == 400


async def test_runs_get_status_filter_classification(cli, db):
# one run per branch of the status expression, asserted as three disjoint buckets.
_flow = (await add_flow(db, "MatrixFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body
now = int(time.time())

async def end_attempt_ok(run, ok):
await add_metadata(db, flow_id=run["flow_id"], run_number=run["run_number"],
step_name="end", task_id=1,
metadata={"field_name": "attempt_ok", "value": str(ok)})

# a successful end attempt wins even over a fresh heartbeat
_completed = (await add_run(db, flow_id=_flow["flow_id"], last_heartbeat_ts=now)).body
await end_attempt_ok(_completed, True)
# an explicitly unsuccessful end attempt
_failed_attempt = (await add_run(db, flow_id=_flow["flow_id"])).body
await end_attempt_ok(_failed_attempt, False)
# a heartbeat too old to still count as running
_stale = (await add_run(db, flow_id=_flow["flow_id"],
last_heartbeat_ts=now - RUN_INACTIVE_CUTOFF_TIME - 60)).body
# a recent heartbeat with no end attempt
_running = (await add_run(db, flow_id=_flow["flow_id"], last_heartbeat_ts=now)).body

flow_id = _flow["flow_id"]
assert await _run_numbers(cli, flow_id, "?status=completed") == {_completed["run_number"]}
assert await _run_numbers(cli, flow_id, "?status=running") == {_running["run_number"]}
assert await _run_numbers(cli, flow_id, "?status=failed") == \
{_failed_attempt["run_number"], _stale["run_number"]}


async def test_runs_get_status_filter_retrying(cli, db):
# the first CASE branch: a run whose 'end' step failed (attempt_ok=false) but has a
# *newer* 'attempt' record is being retried, so it reads as running, not failed.
# The branch turns on a strict ts_epoch comparison between the two metadata rows, so
# stamp their timestamps explicitly rather than trusting insert order.
_flow = (await add_flow(db, "RetryFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body
flow_id = _flow["flow_id"]
_retrying = (await add_run(db, flow_id=flow_id)).body

async def stamp_end_metadata(field_name, value, ts):
md = (await add_metadata(db, flow_id=flow_id, run_number=_retrying["run_number"],
step_name="end", task_id=1,
metadata={"field_name": field_name, "value": value})).body
await db.metadata_table_postgres.update_row(
filter_dict={"id": md["id"]}, update_dict={"ts_epoch": ts})

# end step failed at T=1000, then a new attempt started at T=2000 (the retry)
await stamp_end_metadata("attempt_ok", "false", 1000)
await stamp_end_metadata("attempt", "1", 2000)

assert await _run_numbers(cli, flow_id, "?status=running") == {_retrying["run_number"]}
assert await _run_numbers(cli, flow_id, "?status=failed") == set()


async def test_runs_get_status_filter_composes_with_pagination(cli, db):
# a filter has to ride on top of pagination: a limited page of "failed" runs must
# be the matching rows taken *after* filtering, not a limited slice that then gets
# filtered down.
_flow = (await add_flow(db, "PageFlow", "test_user-1", ["a_tag"], ["runtime:test"])).body

failed = [(await add_run(db, flow_id=_flow["flow_id"])).body for _ in range(3)]
await add_run(db, flow_id=_flow["flow_id"], last_heartbeat_ts=int(time.time())) # running

page = (await db.run_table_postgres.get_all_runs(
_flow["flow_id"], statuses=["failed"], limit=2, order=["run_number DESC"])).body

assert len(page) == 2
assert all(r["status"] == "failed" for r in page)
newest_two = sorted((r["run_number"] for r in failed), reverse=True)[:2]
assert [r["run_number"] for r in page] == newest_two


async def test_run_get(cli, db):
# create flow for test
_flow = (await add_flow(db, "TestFlow", "test_user-1", ["a_tag", "b_tag"], ["runtime:test"])).body
Expand Down
Loading