Commit 424b5ca

feat: add liveness probe for celery workers (#766)
* feat: Add liveness probes for celery workers

  Currently pods can enter a 'zombie' state where they become disconnected from celery and will no longer pick up new jobs. We create a liveness probe to detect such pods so they can be killed by k8s and recreated

  Signed-off-by: Nick Wood <[email protected]>

* feat: rewrite celery probe with python

  Signed-off-by: Mathew Wicks <[email protected]>

* feat: add new worker liveness probe values to docs

  Signed-off-by: Mathew Wicks <[email protected]>

---------

Signed-off-by: Nick Wood <[email protected]>
Signed-off-by: Mathew Wicks <[email protected]>
Co-authored-by: Mathew Wicks <[email protected]>
1 parent 773dbf8 commit 424b5ca

3 files changed, +49 -1 lines changed

charts/airflow/README.md

Lines changed: 2 additions & 1 deletion
@@ -308,6 +308,7 @@ Parameter | Description | Default
 `workers.celery.*` | configs for the celery worker Pods | `<see values.yaml>`
 `workers.terminationPeriod` | how many seconds to wait after SIGTERM before SIGKILL of the celery worker | `60`
 `workers.logCleanup.*` | configs for the log-cleanup sidecar of the worker Pods | `<see values.yaml>`
+`workers.livenessProbe.*` | configs for the worker Pods' liveness probe | `<see values.yaml>`
 `workers.extraPipPackages` | extra pip packages to install in the worker Pods | `[]`
 `workers.extraVolumeMounts` | extra VolumeMounts for the worker Pods | `[]`
 `workers.extraVolumes` | extra Volumes for the worker Pods | `[]`
@@ -333,7 +334,7 @@ Parameter | Description | Default
 `triggerer.safeToEvict` | if we add the annotation: "cluster-autoscaler.kubernetes.io/safe-to-evict" = "true" | `true`
 `triggerer.podDisruptionBudget.*` | configs for the PodDisruptionBudget of the triggerer Deployment | `<see values.yaml>`
 `triggerer.capacity` | maximum number of triggers each triggerer will run at once (sets `AIRFLOW__TRIGGERER__DEFAULT_CAPACITY`) | `1000`
-`triggerer.livenessProbe.*` | liveness probe for the triggerer Pods | `<see values.yaml>`
+`triggerer.livenessProbe.*` | configs for the triggerer Pods' liveness probe | `<see values.yaml>`
 `triggerer.extraPipPackages` | extra pip packages to install in the triggerer Pods | `[]`
 `triggerer.extraVolumeMounts` | extra VolumeMounts for the triggerer Pods | `[]`
 `triggerer.extraVolumes` | extra Volumes for the triggerer Pods | `[]`

charts/airflow/templates/worker/worker-statefulset.yaml

Lines changed: 38 additions & 0 deletions
@@ -150,6 +150,44 @@ spec:
                         time.sleep(10)
                         active_tasks = i.active()[local_celery_host]
           {{- end }}
+          {{- if .Values.workers.livenessProbe.enabled }}
+          livenessProbe:
+            initialDelaySeconds: {{ .Values.workers.livenessProbe.initialDelaySeconds }}
+            timeoutSeconds: {{ .Values.workers.livenessProbe.timeoutSeconds }}
+            failureThreshold: {{ .Values.workers.livenessProbe.failureThreshold }}
+            periodSeconds: {{ .Values.workers.livenessProbe.periodSeconds }}
+            exec:
+              command:
+                {{- include "airflow.command" . | indent 16 }}
+                - "python"
+                - "-Wignore"
+                - "-c"
+                - |
+                  import os
+                  import sys
+                  import subprocess
+                  from celery import Celery
+                  from celery.app.control import Inspect
+                  from typing import List
+
+                  def run_command(cmd: List[str]) -> str:
+                      process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+                      output, error = process.communicate()
+                      if error is not None:
+                          raise Exception(error)
+                      else:
+                          return output.decode(encoding="utf-8")
+
+                  broker_url = run_command(["bash", "-c", "eval $AIRFLOW__CELERY__BROKER_URL_CMD"])
+                  local_celery_host = f"celery@{os.environ['HOSTNAME']}"
+                  app = Celery(broker=broker_url)
+
+                  # ping the local celery worker to see if it's ok
+                  i = Inspect(app=app, destination=[local_celery_host], timeout=5.0)
+                  ping_responses = i.ping()
+                  if local_celery_host not in ping_responses:
+                      sys.exit(f"celery worker '{local_celery_host}' did not respond to ping")
+          {{- end }}
           ports:
             - name: wlog
               containerPort: 8793
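
Review note on the probe script above. Because `run_command` pipes only stdout, `process.communicate()` always returns `None` for the error stream, so the `raise Exception(error)` branch cannot fire, and the command's exit status is never checked. `Inspect.ping()` may also return `None` rather than an empty dict when no worker replies within the timeout; in that case the `not in` test raises a `TypeError`, so the probe still fails but with a traceback instead of the intended message. The following is a minimal sketch of a more defensive variant, not the code in this commit. The helper names `worker_responded` and `exit_unless_alive` are hypothetical, and the Celery call itself is left out so the snippet runs without a broker.

```python
import subprocess
import sys
from typing import Dict, List, Optional


def run_command(cmd: List[str]) -> str:
    """Run `cmd` and return its stdout with surrounding whitespace removed.

    check=True raises CalledProcessError on a non-zero exit status, so a
    failing broker-URL command is reported instead of silently yielding
    an empty or partial URL.
    """
    result = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
    return result.stdout.decode("utf-8").strip()


def worker_responded(ping_responses: Optional[Dict[str, dict]], host: str) -> bool:
    """Return True only if `host` appears among the ping replies.

    None is treated as "no replies" (assumption: Inspect.ping() may
    return None when nothing answers before the timeout).
    """
    return bool(ping_responses) and host in ping_responses


def exit_unless_alive(ping_responses: Optional[Dict[str, dict]], host: str) -> None:
    """Exit with status 1 and a message on stderr if the worker is silent."""
    if not worker_responded(ping_responses, host):
        sys.exit(f"celery worker '{host}' did not respond to ping")
```

In the probe, `exit_unless_alive(i.ping(), local_celery_host)` would take the place of the bare membership test. Calling `sys.exit` with a string writes it to stderr and exits with status 1, which an exec probe treats as a failure.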

charts/airflow/values.yaml

Lines changed: 9 additions & 0 deletions
@@ -994,6 +994,15 @@ workers:
     ##
     intervalSeconds: 900
 
+  ## configs for the worker Pods' liveness probe
+  ##
+  livenessProbe:
+    enabled: true
+    initialDelaySeconds: 10
+    periodSeconds: 30
+    timeoutSeconds: 60
+    failureThreshold: 5
+
   ## extra pip packages to install in the worker Pod
   ##
   ## ____ EXAMPLE _______________
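
To get a feel for what these defaults mean in practice, here is a rough back-of-the-envelope sketch of how long an unresponsive worker might run before the failure threshold is reached. It assumes each failing probe takes about as long as the 5-second ping timeout in the probe script, and it ignores interpreter startup, kubelet scheduling jitter, and the termination period. The function name `restart_window_seconds` is hypothetical and the result is an estimate, not a guarantee.

```python
from typing import Tuple


def restart_window_seconds(
    period_seconds: int, failure_threshold: int, probe_duration: float
) -> Tuple[float, float]:
    """Estimate (earliest, latest) seconds from worker failure to threshold.

    earliest: the worker fails just before a probe starts, so only
              (failure_threshold - 1) further periods elapse.
    latest:   the worker fails just after a successful probe, so a full
              extra period passes before the first failing probe.
    """
    earliest = (failure_threshold - 1) * period_seconds + probe_duration
    latest = failure_threshold * period_seconds + probe_duration
    return earliest, latest


# chart defaults from this commit; 5.0 is the Inspect ping timeout (assumed probe duration)
print(restart_window_seconds(period_seconds=30, failure_threshold=5, probe_duration=5.0))
# prints (125.0, 155.0)
```

Under these assumptions a zombie worker would be restarted roughly two to three minutes after it stops responding; lowering `periodSeconds` or `failureThreshold` shortens that window at the cost of more frequent broker pings.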
