diff --git a/docs/design/job_architecture.md b/docs/design/job_architecture.md index ae41d08abc..9e4901d441 100644 --- a/docs/design/job_architecture.md +++ b/docs/design/job_architecture.md @@ -315,10 +315,16 @@ Bundled job status information for one or more jobs, keyed by job ID, with the f * `job_id` * `jobState` - see [Job output state](#job-output-state) below for the detailed structure * `outputWidgetInfo` - the parameters to send to output widgets, generated from the app specifications and job output. This is only available for completed jobs and is set to null otherwise. + * `last_checked` - timestamp in ns added at comm sending time -In case of error, the response has instead the keys: +In case of per-input-job-ID error, the response has instead the keys: * `job_id` * `error` - brief message explaining the issue + * `last_checked` - timestamp in ns added at comm sending time + +In case that all valid jobs were filtered out by the `ts` input from the frontend, there will be a job-ID-to-job-state pattern breaking key-value pair keyed by "error" with a state keyed by: + * `error` - "No updated jobs" + * `last_checked` - timestamp in ns added at comm sending time Sample response JSON: ```js @@ -330,11 +336,13 @@ Sample response JSON: "status": "running", "created": 123456789, }, - "outputWidgetInfo": null, // only available for completed jobs + "outputWidgetInfo": null, // only available for completed jobs, + "last_checked": 1652992287210343298, }, "job_id_2": { "job_id": "job_id_2", - "error": "Cannot find job with ID job_id_2" + "error": "Cannot find job with ID job_id_2", + "last_checked": 1652992287210343298, }, } ``` @@ -358,7 +366,6 @@ As sent to browser, includes cell info and run info. The structure below indicat "created": epoch ms, "queued": optional - epoch ms, "finished": optional - epoch ms, - "updated": epoch ms, "terminated_code": optional - int, "error": { // optional "code": int, @@ -371,7 +378,8 @@ As sent to browser, includes cell info and run info. The structure below indicat "tag": string (release, beta, dev), "error_code": optional - int, "errormsg": optional - string, - } + }, + "last_checked": int - ns } ``` diff --git a/docs/testing/HeadlessTesting.md b/docs/testing/HeadlessTesting.md index 08b862ff0f..0e4e9f12d3 100644 --- a/docs/testing/HeadlessTesting.md +++ b/docs/testing/HeadlessTesting.md @@ -161,9 +161,9 @@ job = AppManager().run_app( import time import pprint import json -while job.state()['job_state'] not in ['completed', 'suspend']: +while job.refresh_state()['job_state'] not in ['completed', 'suspend']: time.sleep(5) -job_result = job.state() +job_result = job.refresh_state() if job_result['job_state'] != 'completed': print "Failed - job did not complete: " + ",".join(job_result['status'][0:3]) else: diff --git a/src/biokbase/narrative/jobs/job.py b/src/biokbase/narrative/jobs/job.py index a33bde4680..2dd1a308d7 100644 --- a/src/biokbase/narrative/jobs/job.py +++ b/src/biokbase/narrative/jobs/job.py @@ -9,6 +9,7 @@ import biokbase.narrative.clients as clients from biokbase.narrative.app_util import map_inputs_from_job, map_outputs_from_state from biokbase.narrative.exception_util import transform_job_exception +from biokbase.narrative.jobs.util import time_ns from .specmanager import SpecManager @@ -103,7 +104,7 @@ def __init__(self, ee2_state, extra_data=None, children=None): if ee2_state.get("job_id") is None: raise ValueError("Cannot create a job without a job ID!") - self._update_state(ee2_state) + self.update_state(ee2_state) self.extra_data = extra_data # verify parent-children relationship @@ -129,9 +130,9 @@ def from_job_ids(cls, job_ids, return_list=True): return jobs @staticmethod - def _trim_ee2_state(state: dict, exclude: list) -> None: - if exclude: - for field in exclude: + def _trim_ee2_state(state: dict, exclude_fields: list) -> None: + if exclude_fields: + for field in exclude_fields: if field in state: del state[field] @@ -199,7 +200,7 @@ def __getattr__(self, name): # and need the state refresh. # But KBParallel/KB Batch App jobs may not have the # batch_job field - self.state(force_refresh=True).get( + self.refresh_state(force_refresh=True).get( "child_jobs", JOB_ATTR_DEFAULTS["child_jobs"] ) if self.batch_job @@ -216,7 +217,7 @@ def __getattr__(self, name): # retry_ids field so skip the state refresh self._acc_state.get("retry_ids", JOB_ATTR_DEFAULTS["retry_ids"]) if self.batch_job or self.retry_parent - else self.state(force_refresh=True).get( + else self.refresh_state(force_refresh=True).get( "retry_ids", JOB_ATTR_DEFAULTS["retry_ids"] ) ), @@ -239,17 +240,8 @@ def __getattr__(self, name): return attr[name]() def __setattr__(self, name, value): - if name in STATE_ATTRS: - self._acc_state[name] = value - elif name in JOB_INPUT_ATTRS: - self._acc_state["job_input"] = self._acc_state.get("job_input", {}) - self._acc_state["job_input"][name] = value - elif name in NARR_CELL_INFO_ATTRS: - self._acc_state["job_input"] = self._acc_state.get("job_input", {}) - self._acc_state["job_input"]["narrative_cell_info"] = self._acc_state[ - "job_input" - ].get("narrative_cell_info", {}) - self._acc_state["job_input"]["narrative_cell_info"][name] = value + if name in STATE_ATTRS: # TODO are/should these assignments be used? + self.update_state({name: value}) else: object.__setattr__(self, name, value) @@ -273,14 +265,6 @@ def was_terminal(self): else: return self._acc_state.get("status") in TERMINAL_STATUSES - def is_terminal(self): - self.state() - if self._acc_state.get("batch_job"): - for child_job in self.children: - if child_job._acc_state.get("status") != COMPLETED_STATUS: - child_job.state(force_refresh=True) - return self.was_terminal() - def in_cells(self, cell_ids: List[str]) -> bool: """ For job initialization. @@ -324,9 +308,10 @@ def parameters(self): f"Unable to fetch parameters for job {self.job_id} - {e}" ) - def _update_state(self, state: dict) -> None: + def update_state(self, state: dict, ts: int = None) -> None: """ - given a state data structure (as emitted by ee2), update the stored state in the job object + Given a state data structure (as emitted by ee2), update the stored state in the job object + All updates to the job state should go through here to keep the last_updated field accurate """ if not isinstance(state, dict): raise TypeError("state must be a dict") @@ -335,7 +320,7 @@ def _update_state(self, state: dict) -> None: if self._acc_state: if "job_id" in state and state["job_id"] != self.job_id: raise ValueError( - f"Job ID mismatch in _update_state: job ID: {self.job_id}; state ID: {state['job_id']}" + f"Job ID mismatch in update_state: job ID: {self.job_id}; state ID: {state['job_id']}" ) # Check if there would be no change in updating @@ -348,30 +333,32 @@ def _update_state(self, state: dict) -> None: if self._acc_state is None: self._acc_state = state else: - self._acc_state.update(state) + self._acc_state = {**self._acc_state, **state} + + self.last_updated = time_ns() if ts is None else ts - def state(self, force_refresh=False, exclude=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): + def refresh_state(self, force_refresh=False, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ Queries the job service to see the state of the current job. """ if force_refresh or not self.was_terminal(): state = self.query_ee2_state(self.job_id, init=False) - self._update_state(state) + self.update_state(state) - return self._internal_state(exclude) + return self.cached_state(exclude_fields) - def _internal_state(self, exclude=None): + def cached_state(self, exclude_fields=None): """Wrapper for self._acc_state""" state = copy.deepcopy(self._acc_state) - self._trim_ee2_state(state, exclude) + self._trim_ee2_state(state, exclude_fields) return state def output_state(self, state=None, no_refresh=False) -> dict: """ :param state: Supplied when the state is queried beforehand from EE2 in bulk, or when it is retrieved from a cache. If not supplied, must be - queried with self.state() or self._internal_state() + queried with self.refresh_state() or self.cached_state() :return: dict, with structure { @@ -424,10 +411,10 @@ def output_state(self, state=None, no_refresh=False) -> dict: :rtype: dict """ if not state: - state = self._internal_state() if no_refresh else self.state() + state = self.cached_state() if no_refresh else self.refresh_state() else: - self._update_state(state) - state = self._internal_state() + self.update_state(state) + state = self.cached_state() if state is None: return self._create_error_state( @@ -475,10 +462,10 @@ def show_output_widget(self, state=None): from biokbase.narrative.widgetmanager import WidgetManager if not state: - state = self.state() + state = self.refresh_state() else: - self._update_state(state) - state = self._internal_state() + self.update_state(state) + state = self.cached_state() if state["status"] == COMPLETED_STATUS and "job_output" in state: (output_widget, widget_params) = self._get_output_info(state) @@ -541,7 +528,7 @@ def log(self, first_line=0, num_lines=None): return (num_available_lines, []) return ( num_available_lines, - self._job_logs[first_line : first_line + num_lines], + self._job_logs[first_line: first_line + num_lines], ) def _update_log(self): @@ -611,7 +598,7 @@ def info(self): print(f"Version: {spec['info']['ver']}") try: - state = self.state() + state = self.refresh_state() print(f"Status: {state['status']}") print("Inputs:\n------") pprint(self.params) @@ -631,7 +618,7 @@ def _repr_javascript_(self): """ output_widget_info = None try: - state = self.state() + state = self.refresh_state() spec = self.app_spec() if state.get("status", "") == COMPLETED_STATUS: (output_widget, widget_params) = self._get_output_info(state) diff --git a/src/biokbase/narrative/jobs/jobcomm.py b/src/biokbase/narrative/jobs/jobcomm.py index 9fdfc76627..937493556a 100644 --- a/src/biokbase/narrative/jobs/jobcomm.py +++ b/src/biokbase/narrative/jobs/jobcomm.py @@ -7,7 +7,7 @@ from biokbase.narrative.common import kblogging from biokbase.narrative.exception_util import JobRequestException, NarrativeException from biokbase.narrative.jobs.jobmanager import JobManager -from biokbase.narrative.jobs.util import load_job_constants +from biokbase.narrative.jobs.util import load_job_constants, time_ns (PARAM, MESSAGE_TYPE) = load_job_constants() @@ -116,7 +116,11 @@ def cell_id_list(self): @property def ts(self): - """This param is completely optional""" + """ + Optional field sent with STATUS requests indicating to filter out + job states in the STATUS response that have not been updated since + this epoch time (in ns) + """ return self.rq_data.get(PARAM["TS"]) @@ -199,10 +203,7 @@ def _get_job_ids(self, req: JobRequest) -> List[str]: if req.has_batch_id(): return self._jm.update_batch_job(req.batch_id) - try: - return req.job_id_list - except Exception as ex: - raise JobRequestException(ONE_INPUT_TYPE_ONLY_ERR) from ex + return req.job_id_list def start_job_status_loop( self, @@ -514,6 +515,14 @@ def send_comm_message(self, msg_type: str, content: dict) -> None: Sends a ipykernel.Comm message to the KBaseJobs channel with the given msg_type and content. These just get encoded into the message itself. """ + # For STATUS responses, add a last_checked field + # to each output_state. Note: error states will have + # the last_checked field too + if msg_type == MESSAGE_TYPE["STATUS"]: + now = time_ns() + for output_state in content.values(): + output_state["last_checked"] = now + msg = {"msg_type": msg_type, "content": content} self._comm.send(msg) diff --git a/src/biokbase/narrative/jobs/jobmanager.py b/src/biokbase/narrative/jobs/jobmanager.py index 0c80738f11..6a1bffba93 100644 --- a/src/biokbase/narrative/jobs/jobmanager.py +++ b/src/biokbase/narrative/jobs/jobmanager.py @@ -14,6 +14,7 @@ ) from .job import JOB_INIT_EXCLUDED_JOB_STATE_FIELDS, Job +from .util import time_ns """ KBase Job Manager @@ -29,13 +30,15 @@ __version__ = "0.0.1" JOB_NOT_REG_ERR = "Job ID is not registered" +JOB_NOT_REG_2_ERR = "Cannot find job with ID %s" # TODO unify these JOB_NOT_BATCH_ERR = "Job ID is not for a batch job" JOBS_TYPE_ERR = "List expected for job_id_list" JOBS_MISSING_ERR = "No valid job IDs provided" CELLS_NOT_PROVIDED_ERR = "cell_id_list not provided" -DOES_NOT_EXIST = "does_not_exist" + +NO_UPDATED_JOBS_ERR = "No updated jobs" class JobManager: @@ -307,10 +310,16 @@ def _construct_job_output_state_set( # fill in the output states for the missing jobs # if the job fetch failed, add an error message to the output # and return the cached job state + now = time_ns() for job_id in jobs_to_lookup: job = self.get_job(job_id) if job_id in fetched_states: - output_states[job_id] = job.output_state(fetched_states[job_id]) + fetched_state = fetched_states[job_id] + # pre-emptively try a job state update + # so can mark the set of fetched (but also changed) states + # with a simultaneous timestamp + job.update_state(fetched_state, now) + output_states[job_id] = job.output_state(fetched_state) else: # fetch the current state without updating it output_states[job_id] = job.output_state({}) @@ -322,7 +331,7 @@ def _construct_job_output_state_set( def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ Retrieves the job states for the supplied job_ids, with the option to - replace any jobs that have not been updated since ts with a short stub + remove any jobs that have not been updated since ts Jobs that cannot be found in the `_running_jobs` index will return { @@ -340,7 +349,22 @@ def get_job_states(self, job_ids: List[str], ts: int = None) -> dict: """ job_ids, error_ids = self._check_job_list(job_ids) output_states = self._construct_job_output_state_set(job_ids) - return self.add_errors_to_results(output_states, error_ids) + + if ts is not None: + for job_id in job_ids: + if self.get_job(job_id).last_updated < ts: + del output_states[job_id] + no_updated_jobs = ts is not None and job_ids and not output_states + + # add error_ids first in the unlikely case one of the error_ids + # is "error" which is a reserved key which is prioritized + # for indicating an actual error event + self.add_errors_to_results(output_states, error_ids) + + if no_updated_jobs: + output_states["error"] = {"error": NO_UPDATED_JOBS_ERR} + + return output_states def get_all_job_states(self, ignore_refresh_flag=False) -> dict: """ @@ -719,7 +743,7 @@ def add_errors_to_results(self, results: dict, error_ids: List[str]) -> dict: for error_id in error_ids: results[error_id] = { "job_id": error_id, - "error": f"Cannot find job with ID {error_id}", + "error": JOB_NOT_REG_2_ERR % error_id, } return results diff --git a/src/biokbase/narrative/jobs/util.py b/src/biokbase/narrative/jobs/util.py index 8b9c07e77a..af8674b91e 100644 --- a/src/biokbase/narrative/jobs/util.py +++ b/src/biokbase/narrative/jobs/util.py @@ -1,5 +1,7 @@ import json import os +import time + JOB_CONFIG_FILE_PATH_PARTS = [ "kbase-extension", @@ -58,3 +60,8 @@ def load_job_constants(relative_path_to_file=JOB_CONFIG_FILE_PATH_PARTS): ) return (config["params"], config["message_types"]) + + +def time_ns(): + """Simulate time.time_ns() which is only available in python 3.7+""" + return int(time.time() * 1e9) diff --git a/src/biokbase/narrative/tests/job_test_constants.py b/src/biokbase/narrative/tests/job_test_constants.py index 597c43eec7..cfd6ab270f 100644 --- a/src/biokbase/narrative/tests/job_test_constants.py +++ b/src/biokbase/narrative/tests/job_test_constants.py @@ -32,6 +32,13 @@ def generate_error(job_id, err_type): return error_strings[err_type] +def trim_ee2_state(ee2_state, exclude_fields): + if exclude_fields: + for field in exclude_fields: + if field in ee2_state: + del ee2_state[field] + + def get_test_job(job_id): return copy.deepcopy(TEST_JOBS[job_id]) @@ -41,7 +48,7 @@ def get_test_jobs(job_ids): CLIENTS = "biokbase.narrative.clients.get" -TIME_NS = "biokbase.narrative.jobs.jobcomm.time.time_ns" +JC_TIME_NS = "biokbase.narrative.jobs.jobcomm.time_ns" TEST_EPOCH_NS = 42 # arbitrary epoch ns MAX_LOG_LINES = 10 diff --git a/src/biokbase/narrative/tests/narrative_mock/mockclients.py b/src/biokbase/narrative/tests/narrative_mock/mockclients.py index 019d79308b..c81311f76a 100644 --- a/src/biokbase/narrative/tests/narrative_mock/mockclients.py +++ b/src/biokbase/narrative/tests/narrative_mock/mockclients.py @@ -17,6 +17,7 @@ READS_OBJ_1, READS_OBJ_2, generate_error, + trim_ee2_state, ) from biokbase.narrative.tests.generate_test_results import RETRIED_JOBS @@ -244,7 +245,13 @@ def list_objects(self, params): # ----- Execution Engine (EE2) functions ----- def check_workspace_jobs(self, params): - return self.job_state_data + ee2_states = self.job_state_data + if params.get("exclude_fields"): + for ee2_state in ee2_states.values(): + trim_ee2_state(ee2_state, params["exclude_fields"]) + if params.get("return_list"): + ee2_states = list(ee2_states.values()) + return ee2_states def run_job(self, params): return self.test_job_id diff --git a/src/biokbase/narrative/tests/test_job.py b/src/biokbase/narrative/tests/test_job.py index 7f8bf5dc3f..966148f7a1 100644 --- a/src/biokbase/narrative/tests/test_job.py +++ b/src/biokbase/narrative/tests/test_job.py @@ -11,6 +11,7 @@ from biokbase.narrative.jobs.job import ( COMPLETED_STATUS, EXCLUDED_JOB_STATE_FIELDS, + OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS, JOB_ATTR_DEFAULTS, JOB_ATTRS, Job, @@ -33,6 +34,7 @@ MAX_LOG_LINES, TERMINAL_JOBS, get_test_job, + trim_ee2_state, ) from .narrative_mock.mockclients import ( @@ -74,7 +76,7 @@ def create_job_from_ee2(job_id, extra_data=None, children=None): def create_state_from_ee2(job_id, exclude_fields=JOB_INIT_EXCLUDED_JOB_STATE_FIELDS): """ - create the output of job.state() from raw job data + create the output of job.refresh_state() from raw job data """ state = get_test_job(job_id) @@ -178,7 +180,7 @@ def check_jobs_equal(self, jobl, jobr): self.assertEqual(jobl._acc_state, jobr._acc_state) with mock.patch(CLIENTS, get_mock_client): - self.assertEqual(jobl.state(), jobr.state()) + self.assertEqual(jobl.refresh_state(), jobr.refresh_state()) for attr in JOB_ATTRS: self.assertEqual(getattr(jobl, attr), getattr(jobr, attr)) @@ -201,7 +203,7 @@ def check_job_attrs(self, job, job_id, exp_attrs=None, skip_state=False): if not exp_attrs and not skip_state: state = create_state_from_ee2(job_id) with mock.patch(CLIENTS, get_mock_client): - self.assertEqual(state, job.state()) + self.assertEqual(state, job.refresh_state()) attrs = create_attrs_from_ee2(job_id) attrs.update(exp_attrs) @@ -322,7 +324,7 @@ def test_state__non_terminal(self): # ee2_state is fully populated (includes job_input, no job_output) job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) - state = job.state() + state = job.refresh_state() self.assertFalse(job.was_terminal()) self.assertEqual(state["status"], "created") @@ -338,7 +340,7 @@ def test_state__terminal(self): expected = create_state_from_ee2(JOB_COMPLETED) with assert_obj_method_called(MockClients, "check_job", call_status=False): - state = job.state() + state = job.refresh_state() self.assertEqual(state["status"], "completed") self.assertEqual(state, expected) @@ -350,7 +352,7 @@ def test_state__raise_exception(self): job = create_job_from_ee2(JOB_CREATED) self.assertFalse(job.was_terminal()) with self.assertRaisesRegex(ServerError, "check_job failed"): - job.state() + job.refresh_state() def test_state__returns_none(self): def mock_state(self, state=None): @@ -373,7 +375,7 @@ def mock_state(self, state=None): "created": 0, } - with mock.patch.object(Job, "state", mock_state): + with mock.patch.object(Job, "refresh_state", mock_state): state = job.output_state() self.assertEqual(expected, state) @@ -387,10 +389,10 @@ def test_job_update__no_state(self): # should fail with error 'state must be a dict' with self.assertRaisesRegex(TypeError, "state must be a dict"): - job._update_state(None) + job.update_state(None) self.assertFalse(job.was_terminal()) - job._update_state({}) + job.update_state({}) self.assertFalse(job.was_terminal()) @mock.patch(CLIENTS, get_mock_client) @@ -400,11 +402,73 @@ def test_job_update__invalid_job_id(self): """ job = create_job_from_ee2(JOB_RUNNING) expected = create_state_from_ee2(JOB_RUNNING) - self.assertEqual(job.state(), expected) + self.assertEqual(job.refresh_state(), expected) # try to update it with the job state from a different job - with self.assertRaisesRegex(ValueError, "Job ID mismatch in _update_state"): - job._update_state(get_test_job(JOB_COMPLETED)) + with self.assertRaisesRegex(ValueError, "Job ID mismatch in update_state"): + job.update_state(get_test_job(JOB_COMPLETED)) + + @mock.patch(CLIENTS, get_mock_client) + def test_job_update__last_updated__no_change(self): + for job_id, job in get_all_jobs().items(): + + last_updated = job.last_updated + + # job has full ee2 state + ee2_state = get_test_job(job_id) + job._acc_state = get_test_job(job_id) + + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + job.update_state({}) + self.assertEqual(last_updated, job.last_updated) + + # job has init ee2 state + ee2_state = get_test_job(job_id) + job._acc_state = get_test_job(job_id) + trim_ee2_state(ee2_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, EXCLUDED_JOB_STATE_FIELDS) + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + trim_ee2_state(ee2_state, OUTPUT_STATE_EXCLUDED_JOB_STATE_FIELDS) + job.update_state(ee2_state) + self.assertEqual(last_updated, job.last_updated) + + job.update_state({}) + self.assertEqual(last_updated, job.last_updated) + + @mock.patch(CLIENTS, get_mock_client) + def test_job_update__last_updated__change(self): + for job_id, job in get_all_jobs().items(): + + last_updated = job.last_updated + + # job has init ee2 state + job._acc_state = get_test_job(job_id) + trim_ee2_state(job._acc_state, JOB_INIT_EXCLUDED_JOB_STATE_FIELDS) + + ee2_state = get_test_job(job_id) + job.update_state(ee2_state) + self.assertTrue(last_updated < job.last_updated) @mock.patch(CLIENTS, get_mock_client) def test_job_info(self): @@ -559,7 +623,7 @@ def test_parent_children__ok(self): mock.Mock(return_value={"status": COMPLETED_STATUS}), ): for child_job in child_jobs: - child_job.state(force_refresh=True) + child_job.refresh_state(force_refresh=True) self.assertTrue(parent_job.was_terminal()) @@ -726,7 +790,7 @@ def mock_check_job(self_, params): with mock.patch.object(MockClients, "check_job", mock_check_job): for job in child_jobs: - job.state(force_refresh=True) + job.refresh_state(force_refresh=True) self.assertTrue(batch_job.was_terminal()) @@ -757,7 +821,7 @@ def test_in_cells__batch__same_cell(self): batch_job, child_jobs = batch_fam[0], batch_fam[1:] for job in child_jobs: - job.cell_id = "hello" + job._acc_state["job_input"]["narrative_cell_info"]["cell_id"] = "hello" self.assertTrue(batch_job.in_cells(["hi", "hello"])) @@ -769,7 +833,7 @@ def test_in_cells__batch__diff_cells(self): children_cell_ids = ["hi", "hello", "greetings"] for job, cell_id in zip(child_jobs, itertools.cycle(children_cell_ids)): - job.cell_id = cell_id + job._acc_state["job_input"]["narrative_cell_info"]["cell_id"] = cell_id for cell_id in children_cell_ids: self.assertTrue(batch_job.in_cells([cell_id])) diff --git a/src/biokbase/narrative/tests/test_jobcomm.py b/src/biokbase/narrative/tests/test_jobcomm.py index c526e1bebf..5c00181935 100644 --- a/src/biokbase/narrative/tests/test_jobcomm.py +++ b/src/biokbase/narrative/tests/test_jobcomm.py @@ -2,6 +2,7 @@ import itertools import os import re +import sys import unittest from unittest import mock @@ -24,9 +25,12 @@ from biokbase.narrative.jobs.jobmanager import ( JOB_NOT_BATCH_ERR, JOB_NOT_REG_ERR, + JOB_NOT_REG_2_ERR, JOBS_MISSING_ERR, + NO_UPDATED_JOBS_ERR, JobManager, ) +from biokbase.narrative.jobs.util import time_ns from biokbase.narrative.tests.generate_test_results import ( ALL_RESPONSE_DATA, JOBS_BY_CELL_ID, @@ -56,7 +60,12 @@ JOB_TERMINATED, MAX_LOG_LINES, REFRESH_STATE, + TEST_EPOCH_NS, + JC_TIME_NS, generate_error, + get_test_job, + get_test_jobs, + trim_ee2_state, ) from .narrative_mock.mockclients import ( @@ -98,6 +107,14 @@ LOG_LINES = [{"is_error": 0, "line": f"This is line {i}"} for i in range(MAX_LOG_LINES)] +def ts_are_close(t0: int, t1: int) -> bool: + """ + t0 and t1 are epochs in nanoseconds. + Check that they are within 1s of each other + """ + return abs(t1 - t0) * 1e-9 <= 1 + + def make_comm_msg( msg_type: str, job_id_like, as_job_request: bool, content: dict = None ): @@ -485,7 +502,22 @@ def test_start_job_status_loop__no_jobs_stop_loop(self): # Lookup all job states # --------------------- + def _check_pop_last_checked(self, output_states, last_checked=TEST_EPOCH_NS): + """ + For STATUS responses, each output_state will have an extra field `last_checked` + that is variable and is not in the test data. Check that here and delete before + other checks + """ + for output_state in output_states.values(): + self.assertIn("last_checked", output_state) + self.assertTrue( + last_checked == output_state["last_checked"] + or ts_are_close(last_checked, output_state["last_checked"]) + ) + del output_state["last_checked"] + @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def check_job_output_states( self, output_states=None, @@ -494,6 +526,7 @@ def check_job_output_states( response_type=STATUS, ok_states=None, error_states=None, + last_checked=TEST_EPOCH_NS, ): """ Handle any request that returns a dictionary of job state objects; this @@ -506,6 +539,7 @@ def check_job_output_states( :param params: params for the comm message (opt) :param ok_states: list of job IDs expected to be in the output :param error_states: list of job IDs expected to return a not found error + :param last_checked: ts in ns """ if not params: params = {} @@ -527,6 +561,11 @@ def check_job_output_states( msg, ) + # for STATUS responses, there will be a field `last_checked` + # that is variable and not in the test data. check that here and remove + if response_type == MESSAGE_TYPE["STATUS"]: + self._check_pop_last_checked(output_states, last_checked) + for job_id, state in output_states.items(): self.assertEqual(ALL_RESPONSE_DATA[STATUS][job_id], state) if job_id in ok_states: @@ -544,12 +583,19 @@ def test_get_all_job_states__ok(self): # ----------------------- # Lookup single job state # ----------------------- + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_state__1_ok(self): output_states = self.jc.get_job_state(JOB_COMPLETED) self.check_job_output_states( output_states=output_states, ok_states=[JOB_COMPLETED] ) + def test_get_job_state__live_ts(self): + output_states = self.jc.get_job_state(JOB_COMPLETED) + self.check_job_output_states( + output_states=output_states, ok_states=[JOB_COMPLETED], last_checked=time_ns() + ) + def test_get_job_state__no_job(self): with self.assertRaisesRegex( JobRequestException, re.escape(f"{JOBS_MISSING_ERR}: {[None]}") @@ -612,6 +658,7 @@ def test_get_job_states__batch_id__not_batch(self): self.check_batch_id__not_batch_test(STATUS) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_get_job_states__job_id_list__ee2_error(self): exc = Exception("Test exception") exc_message = str(exc) @@ -626,6 +673,8 @@ def mock_check_jobs(params): self.jc._handle_comm_message(req_dict) msg = self.jc._comm.last_message + self._check_pop_last_checked(msg["content"], TEST_EPOCH_NS) + expected = {job_id: copy.deepcopy(ALL_RESPONSE_DATA[STATUS][job_id]) for job_id in ALL_JOBS} for job_id in ACTIVE_JOBS: # add in the ee2_error message @@ -639,6 +688,190 @@ def mock_check_jobs(params): msg, ) + def _reset_last_updated(self): + """Set each last_updated back 3min""" + for job_id in self.jm._running_jobs: + job = self.jm.get_job(job_id) + job.last_updated -= 180 * 1e9 + self.assertTrue(job.last_updated > 0) # sanity check + + def _check_last_updated(self, exp_updated_ids): + """Make sure the right jobs had `last_updated` bumped""" + exp_not_updated_ids = list(set(ALL_JOBS) - set(exp_updated_ids)) # exclusion + now = time_ns() + + exp_updated = [ + self.jm.get_job(job_id).last_updated for job_id in exp_updated_ids + ] + for ts in exp_updated: + self.assertTrue(ts_are_close(ts, now)) + # should all be the same + if exp_updated: + self.assertEqual( + len(set(exp_updated)), + 1, + list(zip(exp_updated_ids, exp_updated)) + ) + + exp_not_updated = [ + self.jm.get_job(job_id).last_updated for job_id in exp_not_updated_ids + ] + for ts in exp_not_updated: + # was at least 3min ago + # (i.e., from self._reset_last_updated) + self.assertTrue(ts < now - 180 * 1e9) + + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__by_last_updated(self): + self._reset_last_updated() + + # what FE will say was the last time the jobs were checked + ts = time_ns() + + # mix of terminal and not terminal + not_updated_ids = [JOB_COMPLETED, JOB_ERROR, JOB_TERMINATED, JOB_CREATED, JOB_RUNNING] + # not terminal + updated_ids = [BATCH_PARENT, BATCH_RETRY_RUNNING] + + # error ids + not_found_ids = [JOB_NOT_FOUND] + + job_ids = not_updated_ids + updated_ids + active_ids = list(set(job_ids) & set(ACTIVE_JOBS)) + + # all job IDs partitioned as + not_found_ids + terminal_ids = list(set(job_ids) - set(ACTIVE_JOBS)) # noqa: F841 + not_updated_active_ids = list(set(not_updated_ids) & set(active_ids)) # noqa: F841 + updated_active_ids = list(set(updated_ids) & set(active_ids)) + + def mock_check_job(self_, params): + """Mutate only chosen job states""" + lookup_id = params["job_id"] + + job_state = get_test_job(lookup_id) + trim_ee2_state(job_state, params.get("exclude_fields")) + if lookup_id in updated_active_ids: + job_state["created"] += 1 + + return job_state + + def mock_check_jobs(self_, params): + """Mutate only chosen job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(active_ids, lookup_ids) # sanity check + + job_states = get_test_jobs(lookup_ids) + for job_id, job_state in job_states.items(): + trim_ee2_state(job_state, params.get("exclude_fields")) + # if job is chosen to be updated, mutate it + if job_id in updated_active_ids: + job_state["created"] += 1 + return job_states + + rq = make_comm_msg(STATUS, job_ids + not_found_ids, False, {"ts": ts}) + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + with mock.patch.object(MockClients, "check_job", mock_check_job): + output_states = self.jc._handle_comm_message(rq) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in updated_active_ids + } + for job_state in expected.values(): + job_state["jobState"]["created"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND + } + + self._check_pop_last_checked(output_states, time_ns()) + self.assertEqual( + expected, + output_states + ) + self._check_last_updated(updated_active_ids) + + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__all_updated_jobs(self): + """ + If theoretically all the jobs were last checked at the beginning of time, + all job states would be returned + """ + self._reset_last_updated() + + def mock_check_job(self_, params): + """Mutate all given job states""" + lookup_id = params["job_id"] + + job_state = get_test_job(lookup_id) + trim_ee2_state(job_state, params.get("exclude_fields")) + job_state["created"] += 1 + + return job_state + + def mock_check_jobs(self_, params): + """Mutate all given job states""" + lookup_ids = params["job_ids"] + self.assertCountEqual(ACTIVE_JOBS, lookup_ids) # sanity check + + job_states = get_test_jobs(lookup_ids) + for _, job_state in job_states.items(): + trim_ee2_state(job_state, params.get("exclude_fields")) + job_state["created"] += 1 + return job_states + + rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": 0}) + with mock.patch.object(MockClients, "check_jobs", mock_check_jobs): + with mock.patch.object(MockClients, "check_job", mock_check_job): + output_states = self.jc._handle_comm_message(rq) + + expected = { + job_id: copy.deepcopy(ALL_RESPONSE_DATA[MESSAGE_TYPE["STATUS"]][job_id]) + for job_id in ALL_JOBS + } + for job_id, job_state in expected.items(): + if job_id in ACTIVE_JOBS: + job_state["jobState"]["created"] += 1 + expected[JOB_NOT_FOUND] = { + "job_id": JOB_NOT_FOUND, + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND + } + + self._check_pop_last_checked(output_states, time_ns()) + self.assertEqual( + expected, + output_states + ) + self._check_last_updated(ACTIVE_JOBS) + + @mock.patch(CLIENTS, get_mock_client) + def test_get_job_states__no_updated_jobs(self): + """ + If theoretically all the jobs were last checked at the end of time, + no job states would be returned, and there would be an error state + to indicate that + """ + self._reset_last_updated() + + rq = make_comm_msg(STATUS, ALL_JOBS + [JOB_NOT_FOUND], False, {"ts": sys.maxsize}) + output_states = self.jc._handle_comm_message(rq) + + self._check_pop_last_checked(output_states, time_ns()) + self.assertEqual( + { + JOB_NOT_FOUND: { + "job_id": JOB_NOT_FOUND, + "error": JOB_NOT_REG_2_ERR % JOB_NOT_FOUND + }, + "error": { + "error": NO_UPDATED_JOBS_ERR + } + }, + output_states + ) + self._check_last_updated([]) + # ----------------------- # get cell job states # ----------------------- @@ -840,12 +1073,15 @@ def test_cancel_jobs__job_id_list__all_bad_jobs(self): ) @mock.patch(CLIENTS, get_mock_client) + @mock.patch(JC_TIME_NS, lambda: TEST_EPOCH_NS) def test_cancel_jobs__job_id_list__failure(self): # the mock client will throw an error with BATCH_RETRY_RUNNING job_id_list = [JOB_RUNNING, BATCH_RETRY_RUNNING] req_dict = make_comm_msg(CANCEL, job_id_list, False) output = self.jc._handle_comm_message(req_dict) + self._check_pop_last_checked(output) + expected = { JOB_RUNNING: ALL_RESPONSE_DATA[STATUS][JOB_RUNNING], BATCH_RETRY_RUNNING: { diff --git a/src/biokbase/narrative/tests/test_jobmanager.py b/src/biokbase/narrative/tests/test_jobmanager.py index dc2c36bb05..82550c603a 100644 --- a/src/biokbase/narrative/tests/test_jobmanager.py +++ b/src/biokbase/narrative/tests/test_jobmanager.py @@ -736,7 +736,7 @@ def test_update_batch_job__change(self): new_child_ids = BATCH_CHILDREN[1:] + [JOB_CREATED, JOB_NOT_FOUND] def mock_check_job(params): - """Called from job.state()""" + """Called from job.refresh_state()""" job_id = params["job_id"] if job_id == BATCH_PARENT: return {"child_jobs": new_child_ids}