Kubernetes is a fast-growing open-source platform that provides container-centric infrastructure. Conceived by Google in 2014, and leveraging over a decade of experience running containers at scale internally, it is one of the fastest-moving projects on GitHub, with 1000+ contributors and 40,000+ commits. Kubernetes has first-class support on Google Cloud Platform, Amazon Web Services, and Microsoft Azure. It is designed to automate deploying, scaling, and operating application containers, and is widely used by organizations across the world for a variety of large-scale solutions, including serving, stateful applications, and, increasingly, data science and ETL workloads.
While traditional environments like YARN-based Hadoop clusters have used Oozie, newer data and ML pipelines built on Kubernetes are increasingly using Airflow for orchestrating and scheduling DAGs. Adding native Kubernetes support to Airflow would increase the viable use cases for Airflow, add a mature and well-understood workflow scheduler to the Kubernetes ecosystem, and create possibilities for improved security and robustness within Airflow in the future.
We will communicate with Kubernetes using the Kubernetes Python client. This client will allow us to create, monitor, and kill jobs. Users will be required to either run their Airflow instances within the Kubernetes cluster or provide an address to link the API to the cluster.
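The create/monitor/kill cycle could look roughly like the sketch below. It assumes workers are launched as bare pods rather than Job objects and wraps three `CoreV1Api` calls from the Kubernetes Python client. The `PodLauncher` and `FakeCoreApi` names and the `airflow` namespace default are hypothetical and exist only for illustration.

```python
from types import SimpleNamespace


class PodLauncher:
    """Thin wrapper over a CoreV1Api-like object (hypothetical helper)."""

    def __init__(self, core_api, namespace="airflow"):
        # core_api is expected to be kubernetes.client.CoreV1Api(), or any
        # object exposing the same three methods (useful for dry runs).
        self.core_api = core_api
        self.namespace = namespace

    def create(self, pod_manifest):
        """Submit a pod manifest (a plain dict) and return the pod name."""
        self.core_api.create_namespaced_pod(
            namespace=self.namespace, body=pod_manifest
        )
        return pod_manifest["metadata"]["name"]

    def phase(self, name):
        """Return the pod phase, e.g. Pending, Running, Succeeded, Failed."""
        pod = self.core_api.read_namespaced_pod_status(
            name=name, namespace=self.namespace
        )
        return pod.status.phase

    def kill(self, name):
        """Delete the pod, which terminates its containers."""
        # Some older client releases also required an explicit
        # delete-options body argument here.
        self.core_api.delete_namespaced_pod(name=name, namespace=self.namespace)


def build_core_api(in_cluster=True):
    """Create a real client; requires the `kubernetes` package."""
    # Imported lazily so the rest of the sketch loads without the package.
    from kubernetes import client, config

    if in_cluster:
        config.load_incluster_config()  # Airflow runs inside the cluster
    else:
        config.load_kube_config()  # Airflow runs outside; reads a kubeconfig
    return client.CoreV1Api()


class FakeCoreApi:
    """In-memory stand-in so the sketch can be tried without a cluster."""

    def __init__(self):
        self.pods = {}

    def create_namespaced_pod(self, namespace, body):
        self.pods[body["metadata"]["name"]] = "Pending"

    def read_namespaced_pod_status(self, name, namespace):
        return SimpleNamespace(status=SimpleNamespace(phase=self.pods[name]))

    def delete_namespaced_pod(self, name, namespace):
        del self.pods[name]
```

Passing the API object in, rather than constructing it inside the launcher, keeps the in-cluster versus external-address decision in one place (`build_core_api`) and lets the launcher be exercised against `FakeCoreApi`.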
Unlike the current MesosExecutor, which uses pickle to serialize DAGs and send them to pre-built slaves, the KubernetesExecutor will launch a new temporary worker job for each task. Each job will contain a full Airflow deployment and will run an airflow run <dag_id> <task_id> command. This design has two major benefits over the previous system. The first benefit is that dynamically creating Airflow workers simplifies the cluster setup. Users will not need to pre-build Airflow workers or consider how the nodes will communicate with each other. The second benefit is that dynamically creating pods allows for a highly elastic system that can easily scale to large workloads while not wasting resources during periods of low usage.
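To make the one-worker-per-task idea concrete, here is a minimal sketch of a per-task pod manifest built as a plain dict. The function name, the `airflow-worker` label, the default namespace, and the naming scheme are assumptions for illustration. The command mirrors the `airflow run <dag_id> <task_id>` form given above, and a real invocation may need further arguments.

```python
import re


def make_worker_pod(dag_id, task_id, image, namespace="airflow"):
    """Build a pod manifest (plain dict) that runs a single Airflow task."""
    # Pod names must be lowercase alphanumerics or '-', so sanitize the ids.
    # A real executor would also need a unique suffix per task attempt.
    raw_name = "airflow-{}-{}".format(dag_id, task_id).lower()
    name = re.sub(r"[^a-z0-9-]+", "-", raw_name)[:63].strip("-")
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": name,
            "namespace": namespace,
            "labels": {"app": "airflow-worker"},
        },
        "spec": {
            # The worker is temporary: run the task once and never restart.
            "restartPolicy": "Never",
            "containers": [
                {
                    "name": "worker",
                    "image": image,
                    "command": ["airflow", "run", dag_id, task_id],
                }
            ],
        },
    }
```

Because the manifest is an ordinary dict, it can be inspected or logged before being submitted to the cluster.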
We will watch jobs using the Kubernetes Watch API. This API will allow us to passively watch all events on a namespace, filtered by label. We can run the watchers on separate threads, which can use event handling to handle failures from Airflow pods.
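A watcher thread with a failure callback might be sketched as follows. The event shape (a dict with `type` and `object` keys) matches what the Python client's `watch.Watch().stream(...)` yields. The function names, the in-cluster configuration, and the `fake_event` helper are assumptions made for this example.

```python
import threading
from types import SimpleNamespace


def handle_pod_event(event, on_failure):
    """Call on_failure(pod_name) if the event reports a failed pod.

    Returns True when the callback fired, False otherwise.
    """
    pod = event["object"]
    # Pods that already failed before the watch began arrive as ADDED.
    if event["type"] in ("ADDED", "MODIFIED") and pod.status.phase == "Failed":
        on_failure(pod.metadata.name)
        return True
    return False


def pod_events(namespace, label_selector):
    """Return a live event stream; requires the `kubernetes` package."""
    from kubernetes import client, config, watch  # lazy import

    config.load_incluster_config()  # assumes Airflow runs inside the cluster
    core_api = client.CoreV1Api()
    return watch.Watch().stream(
        core_api.list_namespaced_pod,
        namespace=namespace,
        label_selector=label_selector,
    )


def start_watcher(events, on_failure):
    """Consume an iterable of events on a daemon thread; return the thread."""

    def run():
        for event in events:
            handle_pod_event(event, on_failure)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def fake_event(event_type, name, phase):
    """Hand-built event standing in for one item of a live stream."""
    return {
        "type": event_type,
        "object": SimpleNamespace(
            metadata=SimpleNamespace(name=name),
            status=SimpleNamespace(phase=phase),
        ),
    }
```

In a live deployment, `start_watcher(pod_events("airflow", "app=airflow-worker"), callback)` would be the entry point; the namespace and label values there are placeholders.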
To encourage a wide array of potential storage options for Airflow users, we will take advantage of Kubernetes Persistent Volumes. The PV/PVC abstractions allow Kubernetes to encapsulate a wide variety of distributed storage options. We will offer two modes for DAG storage: git mode and persistent volume mode. Git mode is the least scalable, yet easiest to set up, DAG storage system. This system will simply pull your DAGs from GitHub in an init container for use by the Airflow pod. This mode is primarily recommended for development/testing, yet will still work for small production cases. The persistent volume mode, on the other hand, takes advantage of an existing Kubernetes structure called a “persistent volume.” This API will allow users to treat external systems like S3, NFS, and Cinder as if they were directories in the local file system. This system is recommended for larger DAG folders in production settings.
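The two storage modes differ only in how the DAG volume is populated, which the following sketch expresses as pod-spec fragments. The function names, the `/dags` mount path, the `dags` volume name, and the `alpine/git` image (assumed to use `git` as its entrypoint) are illustrative assumptions, not part of the proposal.

```python
def git_mode_spec(repo_url, dags_path="/dags"):
    """Git mode: an init container clones the repo into an emptyDir volume."""
    mount = {"name": "dags", "mountPath": dags_path}
    return {
        "volumes": [{"name": "dags", "emptyDir": {}}],
        "initContainers": [
            {
                "name": "git-clone",
                "image": "alpine/git",  # assumption: entrypoint is `git`
                "args": ["clone", "--depth", "1", repo_url, dags_path],
                "volumeMounts": [dict(mount)],
            }
        ],
        "volumeMounts": [dict(mount)],
    }


def pv_mode_spec(claim_name, dags_path="/dags"):
    """Persistent volume mode: mount an existing PersistentVolumeClaim."""
    return {
        "volumes": [
            {"name": "dags", "persistentVolumeClaim": {"claimName": claim_name}}
        ],
        "initContainers": [],
        "volumeMounts": [
            {"name": "dags", "mountPath": dags_path, "readOnly": True}
        ],
    }


def apply_dag_storage(pod, storage):
    """Merge a storage spec into a pod manifest's first container."""
    spec = pod["spec"]
    spec.setdefault("volumes", []).extend(storage["volumes"])
    spec.setdefault("initContainers", []).extend(storage["initContainers"])
    container = spec["containers"][0]
    container.setdefault("volumeMounts", []).extend(storage["volumeMounts"])
    return pod
```

Either fragment can be applied to the same worker pod, so switching from git mode in development to a persistent volume in production would not change the rest of the manifest.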
Kubernetes offers multiple inherent security benefits that would allow Airflow users to safely run their jobs with minimal risk. By running Airflow instances in non-default namespaces, administrators can populate those namespaces with only the secrets required to access data that is allowed for a user or role account. We could also further restrict access using Airflow's multi-tenancy abilities and Kerberos integration.
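Since a pod can only reference Secrets that live in its own namespace, the namespace doubles as the access boundary. A minimal sketch of wiring namespaced secrets into a worker pod follows; the helper names and the `(env_name, secret_name, key)` tuple format are hypothetical.

```python
def secret_env(env_name, secret_name, key):
    """Env var entry whose value is read from a Secret in the pod's namespace."""
    return {
        "name": env_name,
        "valueFrom": {"secretKeyRef": {"name": secret_name, "key": key}},
    }


def add_namespace_secrets(pod, namespace, secrets):
    """Place the pod in `namespace` and expose the listed secrets as env vars.

    `secrets` is an iterable of (env_name, secret_name, key) tuples.
    """
    pod["metadata"]["namespace"] = namespace
    env = pod["spec"]["containers"][0].setdefault("env", [])
    env.extend(secret_env(*entry) for entry in secrets)
    return pod
```

For example, `add_namespace_secrets(pod, "team-a", [("DB_PASSWORD", "team-a-db", "password")])` would only resolve if an administrator had created the `team-a-db` Secret in the `team-a` namespace; both names are placeholders.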
Generating Kubernetes pods requires a fair amount of unavoidable configuration. To minimize this complexity for the user while still allowing a high degree of flexibility, we have created a KubernetesPodOperatorFactory class. This factory class will prevent anti-patterns like forcing the user to create classes with more than 5 starting parameters or to depend on keyword arguments.
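The factory's interface is not spelled out here, so the following is only one hypothetical shape such a class could take: a fluent builder that starts from a single required parameter and adds optional settings through named methods. The class name `PodFactorySketch` and every method on it are invented for illustration and are not the actual KubernetesPodOperatorFactory API.

```python
class PodFactorySketch:
    """Hypothetical fluent builder for pod manifests (illustration only)."""

    def __init__(self, image):
        # Only the image is required up front; everything else has a default.
        self._image = image
        self._name = "airflow-pod"
        self._namespace = "default"
        self._command = []
        self._env = {}

    def named(self, name):
        self._name = name
        return self

    def in_namespace(self, namespace):
        self._namespace = namespace
        return self

    def running(self, *command):
        self._command = list(command)
        return self

    def with_env(self, key, value):
        self._env[key] = str(value)
        return self

    def build(self):
        """Return the accumulated settings as a pod manifest dict."""
        container = {"name": "base", "image": self._image}
        if self._command:
            container["command"] = list(self._command)
        if self._env:
            container["env"] = [
                {"name": k, "value": v} for k, v in sorted(self._env.items())
            ]
        return {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": self._name, "namespace": self._namespace},
            "spec": {"restartPolicy": "Never", "containers": [container]},
        }


# Usage: each optional setting is an explicit, named step.
pod = (
    PodFactorySketch("python:3")
    .named("hello")
    .running("python", "-c", "print('hi')")
    .with_env("STAGE", "dev")
    .build()
)
```

The point of this style is that callers never face a long positional constructor or an opaque keyword-argument dict; unknown options fail loudly as missing methods.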
Questions posed by the Airflow team:
What tools would we provide to allow users to launch their own Docker images/clusters?
- We intend to build two new operators for dealing with internal Docker images and Kubernetes pods: a KubernetesOperator and a HelmOperator. Using the KubernetesOperator, the user will point to a YAML file in the $AIRFLOW_HOME/yaml folder. While we believe this capability should be placed into a separate, parallel PR (since it is a different feature), merging the KubernetesExecutor will be a powerful first step towards making Kubernetes a first-class citizen in Airflow.
How will the scheduler parse DAGs that have external dependencies (i.e., DAGs that require third-party imports)? Currently our plan is to restrict dependencies.
- We agree that complexity should be kept away from the actual DAG creation/parsing. For further complexity, users will be able to use the KubernetesOperator to customize within their own sandboxes. Using unknown add-ons could also have an adverse effect on Airflow stability and security.
If we want to use Kubernetes properly, then there won't be special resources on the hosts that are shared (e.g. Airflow can run alongside other types of Kubernetes pods). The problem with this is that the whole DAG folder needs to be fetched on every worker, which could cause a lot of load and increase task latency.
- With this setup, each worker only retrieves/parses the parent DAG of its assigned task. This would minimize the network overhead needed to maintain the system.
Docker image deployment/rollbacks (e.g. when upgrading your Airflow Docker image, how should long-running tasks be handled: wait for them to finish or time them out, and then restart them using the new Docker image? Airflow would need to support retries that don't count as failures in this case)
- Users could handle new roll-outs by implementing a separate Airflow pod, setting all not-currently-running jobs to only run on the replacement pod, and destroying the old deployment when all jobs are finished running.
Task logging: right now logs are stored locally or in S3, but local logs can't be fetched from Kubernetes (our intern is working on making this better)
- Airbnb currently has an Airflow team member working on ELK integration for airflow-kubernetes.
If an Airflow worker fails, it might be useful to keep the Kubernetes worker reserved and preserved in its same state for debugging purposes
- Kubernetes pods can retain state and logs, and we can use etcd to preserve some additional state as well (through a CRD in the future).
Other interesting points:
The Airflow Kubernetes executor should try to respect the resources that are set in tasks for scheduling when hitting the Kubernetes API.
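One way to respect task resources would be to translate them into the `resources` block of the worker container. The sketch below assumes a task declares CPU as a (possibly fractional) core count and memory in mebibytes; the function name and parameter names are hypothetical, and setting limits equal to requests is a design choice rather than a requirement.

```python
def to_k8s_resources(cpus, ram_mib):
    """Translate task resource settings into a container `resources` block."""
    requests = {
        # Kubernetes expresses fractional CPUs in millicores, e.g. "500m".
        "cpu": "{}m".format(int(round(cpus * 1000))),
        # "Mi" is the Kubernetes suffix for mebibytes.
        "memory": "{}Mi".format(int(ram_mib)),
    }
    # Equal requests and limits give the pod predictable, reserved capacity.
    return {"requests": requests, "limits": dict(requests)}
```

The returned dict would be attached to the container entry of the pod manifest under the `resources` key, so the Kubernetes scheduler only places the worker on a node with enough free capacity.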
Teams at Google, Palantir, and many others are currently nearing release of a beta for Spark that would run natively on Kubernetes. This application would allow users to submit spark-submit commands to a resource manager that can dynamically spawn Spark clusters for data processing. A separate spark-on-k8s hook can be developed to sit within the SparkSubmitOperator depending on user configurations.