Skip to content

task-aware celery worker autoscaling (+ pod-deletion-cost) #339

@thesuperzapper

Description

@thesuperzapper

The chart currently supports primitive autoscaling for celery workers, using HorizontalPodAutoscalers with memory metrics. But this is very flawed, as there is not necessarily a link between RAM usage, and the number of pending tasks, meaning you could have a situation where your workers don't scale up despite having pending tasks.

We can make a task-aware autoscaler that will scale up the number of celery workers when there are not enough task slots, and scale down when there are too many.

In past, scale down was dangerous to use with airflow workers, as Kubernetes had no way to influence which Pods were removed, meaning Kubernetes often removes a busy worker where there are workers that are doing nothing.

As of Kubernetes 1.22, there is a beta annotation for Pods managed by ReplicaSets called controller.kubernetes.io/pod-deletion-cost, which tells Kubernetes how "expensive" killing a particular Pod is when decreasing the replicas count.

NOTE: Previously we considered using KEDA (#103) to manage autoscaling, but this will not work with controller.kubernetes.io/pod-deletion-cost, as the HorizontalPodAutoscaler created by KEDA can not patch the required annotations BEFORE scaling down.


Our Celery Worker Autoscaler can perform the following loop:

  1. Cleanup from any past loops:
    1. Remove any controller.kubernetes.io/pod-deletion-cost annotations
      • NOTE: there will only be dangling annotations if Kubernetes did not remove our "chosen" Pods, or if the autoscaler crashed halfway through a loop
      • NOTE: we need to attempt to prevent multiple instances of our autoscaler running at a time
    2. Send each worker Pod that we removed an annotation from an app.control.add_consumer() command, so it resumes picking up new airflow tasks
  2. Calculate the ideal number of worker replicas for the current task load:
    • if the load factor of workers is above A for B time --> increase replicas to meet the target load factor
    • if the load factor of workers is below X for Y time --> decrease replicas to meet the target load factor
      • NOTE: the load factor is the number of available task slots which are consumed
      • NOTE: we should put some limit on the number of scaling decisions per A seconds (to prevent a yo-yo effect), (perhaps have separate limits for down and up to allow faster upscaling)
      • NOTE: we should have a "scaling algorithm" config, even if we only start with 1
      • NOTE: we should have minium and maximum replicas configs
      • NOTE: if using CeleryKubernetesExecutor, we must exclude tasks that are in the AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE
  3. If replicas are going to be decreased by N:
    1. Sort the worker pods by their pod-deletion-cost in ascending order
      • NOTE: the pod-deletion-cost is the number of running tasks, weighted by the total running time of each task (so long-running tasks are not needlessly evicted), specifically we want smaller numbers of long-running tasks to be weighted higher than larger numbers of short-running tasks
      • NOTE: add a DAG/Task label which will prevent any worker running it from being killed (or allow a "weighting" per Task)
    2. Annotate the N worker Pods with the lowest cost Pods with the controller.kubernetes.io/pod-deletion-cost annotation
      • NOTE: if there are pods in a Pending/Unready state, we can reduce N by this number, as Kubernetes will remove these pods first
    3. Send each worker Pod that was annotated an app.control.cancel_consumer(...) command, so does not pick up new airflow tasks after being "marked" for deletion
    4. Patch the replicas down by N

Important changes to make this work:

  • We will need to use a Deployment for the workers(rather than a StatefulSet), as controller.kubernetes.io/pod-deletion-cost is only for Pods in ReplicaSets
  • Because controller.kubernetes.io/pod-deletion-cost is alpha in 1.21 and beta in 1.22, for older Kubernetes versions we can let users use the CloneSet from the CNCF project called OpenKruise (instead of Deployment), as they have back-ported the controller.kubernetes.io/pod-deletion-cost annotation.

Metadata

Metadata

Labels

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions