
Galaxy loses track of Pulsar jobs #17787

@natefoo

Description


This happens often enough that I have a script I run against Pulsar endpoints that does the following:

  1. Check for non-terminal jobs in the DB
  2. Check whether Pulsar knows about those jobs (a job id file exists in its active-jobs or preprocessing-jobs directory)
  3. If a job exists in 1 but not in 2, remove the job dir on Pulsar and reset the job state to new for reexecution
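The steps above can be sketched roughly as follows. This is a hypothetical illustration, not the actual script: the staging layout (an `active-jobs`/`preprocessing-jobs` directory per endpoint containing one file per job id) mirrors Pulsar's defaults, but the function names and the DB-query step are stand-ins.

```python
# Hypothetical sketch of the reconciliation script described above.
# The helpers here are illustrative stand-ins, not Galaxy/Pulsar APIs.
from pathlib import Path


def pulsar_knows_job(staging_root: Path, job_id: str) -> bool:
    """Step 2: Pulsar 'knows' a job if its job id file exists in either
    the active-jobs or preprocessing-jobs directory."""
    return any(
        (staging_root / subdir / job_id).exists()
        for subdir in ("active-jobs", "preprocessing-jobs")
    )


def find_lost_jobs(staging_root: Path, non_terminal_job_ids):
    """Steps 1+3: given the non-terminal job ids from the DB (step 1),
    return the ids Pulsar has lost track of; these are the ones whose
    job dirs would be removed and whose state would be reset to new."""
    return [j for j in non_terminal_job_ids if not pulsar_knows_job(staging_root, j)]
```
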

This is an unfortunate resolution because in most cases the job finished properly on the Pulsar side and Galaxy just failed to update the state, so you end up rerunning the entire execution when really you only need to rerun (at most) the postprocessing, or possibly just the final status update.

There may be many causes, but I finally have at least one concrete example. In this case I've had multiple jobs fail, over different time periods, with a database timeout in the AMQP consumer thread. I am not sure what causes this timeout - there is nothing incriminating in the DB server logs/stats or Galaxy host logs/stats other than increased load and network consumption on the Galaxy server during this period, and nothing drastic at that (load at 3 instead of 1, network at 100 MB/s instead of 20 MB/s).

The message is received and ack'd:

pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,633 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Acknowledging UUID af159766-e58f-11ee-a906-fa163ed650e8 on queue status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,633 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Begin publishing to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,635 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Have producer for publishing to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.amqp_exchange DEBUG 2024-03-18 20:26:17,635 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] [publish:af2121ee-e58f-11ee-9d76-005056bc743e] Published to key pulsar_vgp_jetstream2__status_update_ack
pulsar.client.manager DEBUG 2024-03-18 20:26:17,636 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Handling asynchronous status update from remote Pulsar.

The AMQP heartbeat thread dies 6 minutes later:

pulsar.client.amqp_exchange ERROR 2024-03-18 20:36:18,112 [pN:main_vgp_handler0,p:3997641,tN:consume-heartbeat-pulsar_vgp_jetstream2__status_update] Problem with heartbeat, leaving heartbeat method in problematic state!
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/pulsar/client/amqp_exchange.py", line 208, in heartbeat
    connection.heartbeat_check()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/connection.py", line 328, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 222, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/amqp/connection.py", line 776, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed
Exception in thread consume-heartbeat-pulsar_vgp_jetstream2__status_update:
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/deps/_conda/envs/[email protected]/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 72, in run
    reraise(*_capture_exception())
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/_compat.py", line 115, in reraise
    raise value
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sentry_sdk/integrations/threading.py", line 70, in run
    return old_run_func(self, *a, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/deps/_conda/envs/[email protected]/lib/python3.11/threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/pulsar/client/amqp_exchange.py", line 208, in heartbeat
    connection.heartbeat_check()
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/connection.py", line 328, in heartbeat_check
    return self.transport.heartbeat_check(self.connection, rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 222, in heartbeat_check
    return connection.heartbeat_tick(rate=rate)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/amqp/connection.py", line 776, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed
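For reference, amqp's heartbeat check raises `ConnectionForced` when too long passes without traffic from the broker. A simplified model of that bookkeeping (my own illustration, not the real `amqp.Connection.heartbeat_tick` code) looks like:

```python
# Simplified model of AMQP heartbeat bookkeeping (illustration only):
# if no frame has been received for longer than a couple of heartbeat
# intervals, the connection is declared dead and an exception is raised,
# which in Pulsar's client kills the consume-heartbeat thread.
import time


class HeartbeatMonitor:
    def __init__(self, interval: float):
        self.interval = interval
        self.last_recv = time.monotonic()

    def frame_received(self):
        """Any inbound frame (heartbeat or data) counts as liveness."""
        self.last_recv = time.monotonic()

    def check(self):
        """Periodic check; raises once too many intervals pass silently."""
        if time.monotonic() - self.last_recv > 2 * self.interval:
            raise ConnectionError("Too many heartbeats missed")
```
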

This is followed another 6 minutes later by the DB timeout:

amqp.exceptions.ConnectionForced: Too many heartbeats missed
galaxy.jobs.runners.pulsar ERROR 2024-03-18 20:42:10,040 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Failed to update Pulsar job status for job_id (56400785/56400785)
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: could not receive data from server: Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/cvmfs/main.galaxyproject.org/galaxy/lib/galaxy/jobs/runners/pulsar.py", line 984, in __async_update
    job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/galaxy/lib/galaxy/jobs/handler.py", line 289, in job_pair_for_id
    job = self.sa_session.query(model.Job).get(id)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in get
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/util/deprecations.py", line 468, in warned
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 947, in get
    return self._get_impl(ident, loading.load_on_pk_identity)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/query.py", line 951, in _get_impl
    return self.session._get_impl(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2975, in _get_impl
    return db_load_fn(
           ^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1717, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
    self._handle_dbapi_exception(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
    util.raise_(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
    self.dialect.do_execute(
  File "/cvmfs/main.galaxyproject.org/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) could not receive data from server: Connection timed out
[SQL: SELECT job.id AS job_id, job.create_time AS job_create_time, job.update_time AS job_update_time, job.history_id AS job_history_id, job.library_folder_id AS job_library_folder_id, job.tool_id AS job_tool_id, job.tool_version AS job_tool_version, job.galaxy_version AS job_galaxy_version, job.dynamic_tool_id AS job_dynamic_tool_id, job.state AS job_s>
FROM history_dataset_collection_association, job_to_output_dataset_collection
WHERE job.id = job_to_output_dataset_collection.job_id AND history_dataset_collection_association.id = job_to_output_dataset_collection.dataset_collection_id AND history_dataset_collection_association.deleted = true) AS anon_1, EXISTS (SELECT history_dataset_association.id
FROM history_dataset_association, job_to_output_dataset
WHERE job.id = job_to_output_dataset.job_id AND history_dataset_association.id = job_to_output_dataset.dataset_id AND history_dataset_association.deleted = true) AS anon_2
FROM job
WHERE job.id = %(pk_1)s]
[parameters: {'pk_1': '56400785'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
pulsar.client.manager ERROR 2024-03-18 20:42:10,102 [pN:main_vgp_handler0,p:3997641,tN:pulsar_client_vgp_jetstream2_status_update_callback] Failure processing job status update message.

Obviously, ideally I'll fix whatever is causing the timeout, but it would also be great if we didn't lose track of the one-and-only terminal status update. Two thoughts:

  1. Originally we (I?) chose to ack the status update before running the callback, since the callback could block the consumer thread for an extended amount of time. But this is arguably against the spirit of the ack in the first place, since at that point the consumer has not actually "handled" the message beyond decoding it.
  2. [WIP] Allow occasionally polling of potentially lost Pulsar MQ jobs #9911 would be a solution to this.
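Thought 1 could be sketched as follows: defer the ack until the callback has actually handled the status update, so a failure (like the DB timeout above) leaves the message in the queue for redelivery. This is a hypothetical sketch; the `Message` class is a minimal stand-in for a broker message, not kombu or Pulsar client code.

```python
# Hypothetical sketch of thought 1 (illustration, not Pulsar client code):
# ack only after the callback succeeds; requeue on failure so the broker
# redelivers the terminal status update instead of it being lost.

class Message:
    """Minimal stand-in for an AMQP message with ack/requeue semantics."""
    def __init__(self, payload):
        self.payload = payload
        self.acked = False
        self.requeued = False

    def ack(self):
        self.acked = True

    def requeue(self):
        self.requeued = True


def handle_status_update(message, callback):
    """Run the callback first; ack on success, requeue on failure."""
    try:
        callback(message.payload)  # e.g. persist the job state change
    except Exception:
        message.requeue()          # broker redelivers; update is not lost
        raise
    else:
        message.ack()              # safe: the update has been handled
```

The trade-off is the one noted in thought 1: with the ack deferred, a slow callback holds the message unacked and can block the consumer, so something like a low prefetch count or a timeout around the callback would likely be needed alongside it.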
