Apache Airflow: KubernetesExecutor

In an earlier post on Airflow, I shared my experience moving from the MesosExecutor to the DaskExecutor. This post focuses on deploying the Airflow KubernetesExecutor to dynamically launch worker pods in Kubernetes.

Prior to the release of Airflow 1.10.13, documentation for standing up the Airflow KubernetesExecutor was fairly difficult to track down on GitHub. The current architecture diagrams in the Airflow docs make the integration architecture much easier to follow.

An ideal microservices architecture splits out the airflow-webserver and airflow-scheduler services. In the Kubernetes world, the airflow-webserver and airflow-scheduler containers can live in separate pods that request different memory and CPU from the Kubernetes worker nodes.
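As an illustrative sketch, each deployment's pod spec can carry its own resource requests; the values below are placeholders, not recommendations:

# Fragments of two separate pod specs (placeholder values)
# airflow-scheduler container
resources:
  requests:
    cpu: "1"
    memory: 2Gi
---
# airflow-webserver container
resources:
  requests:
    cpu: 500m
    memory: 1Gi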

— KubernetesExecutor

In recent releases there were sweeping changes to the KubernetesExecutor implementation, and a large chunk of [kubernetes] configs were removed from airflow.cfg. Under this new direction, airflow.kubernetes.pod_template_file is the single most important Kubernetes config for getting Airflow's integration with Kubernetes up and running. Getting the airflow-worker pod spec right is critical, and the pod_template_file configuration means you can manage the manifest in your favorite YAML templating language. The rendered pod_template_file needs to be mounted onto the airflow-scheduler pod at the path specified by airflow.kubernetes.pod_template_file in airflow.cfg.
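For reference, a minimal sketch of the relevant airflow.cfg settings, assuming an Airflow 2.x release where the option lives under the [kubernetes] section (renamed to [kubernetes_executor] in later 2.x releases) and can equivalently be set with the AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE environment variable; the path is illustrative and must match wherever the template is mounted in the scheduler pod:

[core]
executor = KubernetesExecutor

[kubernetes]
# Path inside the airflow-scheduler pod where the rendered template is mounted (illustrative)
pod_template_file = /opt/airflow/pod_templates/pod_template.yaml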

— airflow-worker pod_template_file

The pod_template_file specification accepts a path to a fully defined Kubernetes v1Pod spec and sends it to the Kubernetes API to generate airflow-worker pods for every Airflow task. The Airflow docs provide guidance on how to craft this specification so that Airflow can override essential fields in it at runtime (after all, k8s pods require unique pod names). Beyond the skeleton Airflow provides, this configuration accepts any v1Pod spec, so a high degree of customization can be achieved.

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "k8s.gcr.io/git-sync/git-sync:v3.3.0"
      env:
        - name: GIT_SYNC_BRANCH
          value: "master"
        - name: GIT_SYNC_REPO
          value: "https://github.com/bpleines/airflow-dags"
        - name: GIT_SYNC_DEST
          value: "dags"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - args: []
      command: []
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      envFrom: []
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      ports: []
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: "/opt/airflow"
          name: airflow-dags
          readOnly: false
  hostNetwork: false
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
  nodeSelector: {}
  affinity: {}
  tolerations: []
  serviceAccountName: 'RELEASE-NAME-worker-serviceaccount'
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - name: airflow-config
      configMap:
        name: RELEASE-NAME-airflow-config
    - name: airflow-local-settings
      configMap:
        name: RELEASE-NAME-airflow-config

— git-sync

Because the airflow-webserver, airflow-scheduler, and airflow-worker are separate pods, each needs a mechanism to continuously update DAG definitions. Traditionally I’ve used continuous integration (e.g. Jenkins) webhooks to trigger a git clone job for the latest DAG definitions from a git repository, but the Airflow docs suggest the Kubernetes git-sync container project. git-sync is a great alternative to Jenkins because it assumes the role of the external CI system for DAGs as a deployable set of containers in Kubernetes alongside Airflow.

— git-sync as an initContainer

Airflow worker pods are ephemeral, so a persistent sidecar container doesn’t work, but the git-sync container can also operate as an initContainer, as shown in the pod_template_file above. initContainers must complete before the main container starts, and setting the GIT_SYNC_ONE_TIME=true environment variable ensures that the git-sync container exits once the repository synchronization is complete. Once the git-sync initContainer exits, the airflow-worker pod is ready to execute the DAG with a freshly cloned DAG definition.

— git-sync as sidecar container(s)

Equipping the airflow-webserver and airflow-scheduler pods with a git-sync sidecar container allows a shared volume to populate the dags directory on the Airflow container. These sidecar containers can periodically update the DAG definitions in the airflow-webserver and airflow-scheduler containers by re-fetching DAG code in GIT_SYNC_BRANCH from GIT_SYNC_REPO. GIT_SYNC_WAIT defines the interval at which the git-sync sidecar container fetches updated DAG definitions. A crucial edge case to consider is the value used for GIT_SYNC_WAIT with respect to DAG execution schedules. As a contrived example, if a DAG is scheduled to run every 60 seconds but GIT_SYNC_WAIT=540, then there is a chance that the airflow-webserver and airflow-scheduler pods contain outdated DAG definitions compared to what the airflow-worker pod fetched from its git-sync initContainer. Separately, as a word of caution, the git-sync sidecar container can cause an Airflow pod to restart if it cannot clone GIT_SYNC_REPO for whatever reason, so baking in some forgiveness with GIT_SYNC_MAX_SYNC_FAILURES can reduce unwarranted pod restarts.
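A minimal sketch of what such a sidecar could look like in the airflow-webserver or airflow-scheduler pod spec, assuming the DAG volume is an emptyDir named airflow-dags shared with the Airflow container; the repository, branch, interval, and failure threshold values are illustrative:

containers:
  # ... the airflow-webserver or airflow-scheduler container goes here ...
  - name: git-sync
    image: "k8s.gcr.io/git-sync/git-sync:v3.3.0"
    env:
      - name: GIT_SYNC_REPO
        value: "https://github.com/bpleines/airflow-dags"
      - name: GIT_SYNC_BRANCH
        value: "master"
      - name: GIT_SYNC_DEST
        value: "dags"
      # Re-fetch DAG definitions every 60 seconds
      - name: GIT_SYNC_WAIT
        value: "60"
      # Tolerate a few consecutive sync failures before the container exits
      - name: GIT_SYNC_MAX_SYNC_FAILURES
        value: "3"
    volumeMounts:
      - name: airflow-dags
        mountPath: /git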

— minio

Minio can serve as a substitute for Amazon S3, deployed in-cluster alongside the Airflow pods. S3 is one of Airflow’s supported mechanisms for remote logging, which is what allows the airflow-webserver to fetch airflow-worker logs even after an airflow-worker pod completes and is deleted. The use of minio follows the same reasoning as git-sync: it eliminates an external dependency on S3. Minio can also allow DAGs to share files and object data between executions. Airflow’s XComs have size limits enforced by the Airflow SQL metadata database, so minio can add a lot of value here.
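As a rough sketch, remote logging to an S3-compatible endpoint can be switched on through Airflow’s [logging] options (these lived under [core] in Airflow 1.10), shown here as environment variables that would be added to the webserver, scheduler, and worker containers. The bucket name and the minio_s3 connection ID are assumptions, and that connection must be configured separately to point at the in-cluster minio service:

# Remote logging to a minio bucket via Airflow's S3 remote logging support
- name: AIRFLOW__LOGGING__REMOTE_LOGGING
  value: "True"
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
  value: "s3://airflow-logs"    # assumed bucket name
- name: AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID
  value: "minio_s3"             # assumed Airflow connection pointing at minio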

After closely operating a few executors, I’ve found the KubernetesExecutor to be the best way to dynamically scale Airflow. Because worker pods are generated on demand, Airflow jobs get resource isolation, and four persistent Airflow pods are all you need.

NAME                                   READY   STATUS
airflow-minio-123456789-12345          1/1     Running
airflow-postgres-123456789-12345       1/1     Running
airflow-scheduler-123456789-12345      2/2     Running
airflow-webserver-123456789-12345      2/2     Running
