Background:

Kubernetes is a fast-growing open-source platform that provides container-centric infrastructure. Conceived by Google in 2014 and built on more than a decade of experience running containers at scale internally, it is one of the fastest-moving projects on GitHub, with 1,000+ contributors and 40,000+ commits. Kubernetes has first-class support on Google Cloud Platform, Amazon Web Services, and Microsoft Azure. It is designed to automate deploying, scaling, and operating application containers, and is widely used by organizations around the world for a variety of large-scale solutions including serving, stateful applications, and, increasingly, data science and ETL workloads.

While traditional environments such as YARN-based Hadoop clusters have used Oozie, newer data and ML pipelines built on Kubernetes increasingly use Airflow for orchestrating and scheduling DAGs. Adding native Kubernetes support to Airflow would increase the viable use cases for Airflow, add a mature and well-understood workflow scheduler to the Kubernetes ecosystem, and create possibilities for improved security and robustness within Airflow in the future.

Kubernetes Executor:

Kubernetes API:

We will communicate with Kubernetes using the Kubernetes Python client. This client will allow us to create, monitor, and kill jobs. Users will be required either to run their Airflow instances within the Kubernetes cluster or to provide an address that links the API to the cluster.
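
As a rough sketch of what this could look like with the kubernetes-client/python library (the in_cluster flag and cluster_context argument are illustrative names, not settled configuration options):

from kubernetes import client, config

def get_kube_client(in_cluster=True, cluster_context=None):
    if in_cluster:
        # Uses the service-account token and CA certificate that Kubernetes
        # mounts into every pod running inside the cluster.
        config.load_incluster_config()
    else:
        # Otherwise fall back to a kubeconfig file, optionally selecting a
        # named context that points at the target cluster's API address.
        config.load_kube_config(context=cluster_context)
    return client.CoreV1Api()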

Launching Jobs:

Unlike the current MesosExecutor, which uses pickle to serialize DAGs and send them to pre-built slaves, the KubernetesExecutor will launch a new temporary worker job for each task. Each job will contain a full Airflow deployment and will run an airflow run <dag_id> <task_id> command. This design has two major benefits over the previous system. First, dynamically creating Airflow workers simplifies cluster set-up: users will not need to pre-build Airflow workers or consider how the nodes will communicate with each other. Second, dynamically creating pods allows for a highly elastic system that can easily scale to large workloads while not wasting resources during periods of low usage.
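
For illustration, launching such a per-task worker pod might look roughly like the following sketch; the image name, namespace, and label keys are assumptions for the example, not decided values:

from kubernetes import client

def launch_worker_pod(api, dag_id, task_id, execution_date):
    # `api` is a kubernetes.client.CoreV1Api instance (see the sketch above).
    # Real code would also sanitize the pod name to be DNS-1123 compliant.
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(
            name="airflow-worker-{}-{}".format(dag_id, task_id).lower(),
            labels={"airflow-worker": "true", "dag_id": dag_id, "task_id": task_id},
        ),
        spec=client.V1PodSpec(
            restart_policy="Never",
            containers=[client.V1Container(
                name="base",
                image="airflow-worker:latest",  # assumed worker image name
                # execution_date passed as an ISO-formatted string
                command=["airflow", "run", dag_id, task_id, execution_date],
            )],
        ),
    )
    return api.create_namespaced_pod(namespace="airflow", body=pod)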

Monitoring jobs:

We will watch jobs using the Kubernetes watch API, which lets us passively observe all events in a namespace, filtered by label. The watchers can run on separate threads and use event handling to react to failures in Airflow pods.
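
A minimal sketch of such a watcher, assuming the worker pods carry an airflow-worker label and live in an "airflow" namespace (both assumptions of this example):

from kubernetes import client, config, watch

def watch_worker_pods(namespace="airflow"):
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    # Blocks and yields ADDED/MODIFIED/DELETED events for matching pods;
    # intended to run on its own thread inside the executor.
    for event in w.stream(v1.list_namespaced_pod,
                          namespace=namespace,
                          label_selector="airflow-worker"):
        pod = event["object"]
        if pod.status.phase == "Failed":
            print("worker pod failed:", pod.metadata.labels)    # would mark the task failed
        elif pod.status.phase == "Succeeded":
            print("worker pod finished:", pod.metadata.labels)  # would mark the task done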


Sharing DAGs:

To encourage a wide array of potential storage options for Airflow users, we will take advantage of Kubernetes Persistent Volumes. The PV/PVC abstractions allow Kubernetes to encapsulate a wide variety of distributed storage options. We will offer two modes for DAG storage: git mode and persistent volume mode. Git mode is the least scalable but easiest-to-set-up option: it simply pulls your DAGs from GitHub in an init container for use by the Airflow pod. It is recommended primarily for development and testing, though it will still work for small production cases. Persistent volume mode, on the other hand, takes advantage of the existing Kubernetes "persistent volume" abstraction, which lets users treat external systems such as S3, NFS, and Cinder as if they were directories in the local file system. This mode is recommended for larger DAG folders in production settings.
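
In persistent volume mode the worker pods would simply mount a pre-created claim; a sketch with the Python client, where the claim name and mount path are assumptions for illustration:

from kubernetes import client

dags_volume = client.V1Volume(
    name="airflow-dags",
    persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
        claim_name="airflow-dags",          # assumed, pre-created PVC backed by NFS/EFS/etc.
        read_only=True,
    ),
)
dags_volume_mount = client.V1VolumeMount(
    name="airflow-dags",
    mount_path="/usr/local/airflow/dags",   # assumed DAG folder path inside the worker image
    read_only=True,
)
# The volume is attached to the worker's V1PodSpec and the mount to its V1Container.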


Security:

Kubernetes offers multiple inherent security benefits that would allow Airflow users to run their jobs safely with minimal risk. By running Airflow instances in non-default namespaces, administrators can populate those namespaces with only the secrets required to access data that is permitted for a given user or role account. Access could be restricted further using Airflow's multi-tenancy abilities and Kerberos integration.
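
As an illustration, a worker container can only receive a credential if the secret exists in its namespace; the secret and key names below are made up for the example:

from kubernetes import client

db_password = client.V1EnvVar(
    name="POSTGRES_PASSWORD",
    value_from=client.V1EnvVarSource(
        secret_key_ref=client.V1SecretKeySelector(
            name="team-a-db-credentials",  # secret only populated in team A's namespace
            key="password",
        ),
    ),
)
# Added to the worker V1Container(env=[...]); a pod scheduled in another
# namespace simply cannot resolve this secret reference.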

Kubernetes Operator


Generating Kubernetes pods requires a fair amount of unavoidable configuration. To minimize this complexity for the user while still allowing a high degree of flexibility, we have created a KubernetesPodOperatorFactory class. This factory class prevents anti-patterns such as forcing the user to instantiate classes with more than five positional parameters or to depend on keyword arguments.


class KubernetesPodOperatorFactory:
    """Accumulates pod configuration; the required arguments are positional,
    everything else is supplied through the add_*/set_* methods below."""

    def __init__(self, trigger_dag_id, image, cmds):
        self.trigger_dag_id = trigger_dag_id
        self.image = image
        self.cmds = cmds

    def add_env_variables(self, env): pass    # environment variables for the pod
    def add_secrets(self, secrets): pass      # Kubernetes secrets to expose to the pod
    def add_labels(self, labels): pass        # labels used by the watcher
    def add_name(self, name): pass            # pod name
    def set_namespace(self, namespace): pass  # target namespace
    def set_upstream(self, operator): pass    # upstream Airflow dependency
    def generate(self): pass                  # build the configured pod/operator
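
A purely illustrative sketch of how the proposed factory might be called; the argument values are made up, and the methods above are only stubs in this document:

pod_task = KubernetesPodOperatorFactory(
    trigger_dag_id="example_dag",
    image="python:3.6",
    cmds=["python", "-c", "print('hello from kubernetes')"],
)
pod_task.add_labels({"team": "data-eng"})
pod_task.add_name("hello-kubernetes")
pod_task.set_namespace("airflow")
operator = pod_task.generate()   # returns the fully configured operator/pod request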

Questions posed by the airflow team:

What tools would we provide to allow users to launch their own docker images/clusters?:

  • We intend to build two new operators for dealing with internal Docker images and Kubernetes pods: a KubernetesOperator and a HelmOperator. Using the KubernetesOperator, the user will point to a YAML file in the $AIRFLOW_HOME/yaml folder. While we believe this capability should be placed in a separate, parallel PR (since it is a different feature), merging the KubernetesExecutor will be a powerful first step towards making Kubernetes a first-class citizen in Airflow.

How will the scheduler parse DAGs that have external dependencies (i.e. DAGs that require third-party imports)? Currently our plan is to restrict dependencies.

  • We agree that complexity should be kept away from the actual DAG creation/parsing. For further complexity, users will be able to use the KubernetesOperator to customize behavior within their own sandboxes. Using unknown add-ons could also have an adverse effect on Airflow stability and security.

If we want to use Kubernetes properly, then there won't be special resources on the shared hosts (e.g. Airflow can run alongside other types of Kubernetes pods). The problem with this is that the whole DAG folder needs to be fetched on every worker, which could cause a lot of load and increase task latency.

  • With this set-up, each worker only retrieves and parses the parent DAG of its assigned task, which minimizes the network overhead needed to maintain the system.

Docker image deployment/rollbacks (e.g. when upgrading your Airflow Docker image, how do you handle long-running tasks: wait for them to finish or time them out, then restart them using the new Docker image? Airflow would need to support retries that don't count as failures in this case.)

  • Users could handle new roll-outs by deploying a separate Airflow pod, directing all not-currently-running jobs to run only on the replacement pod, and destroying the old deployment once all of its jobs have finished running.

Task logging: right now logs are stored locally or in S3, but local logs can't be fetched from Kubernetes (our intern is working on making this better).

  • Airbnb currently has an Airflow team member working on ELK integration for Airflow on Kubernetes.

If an Airflow worker fails, it might be useful to keep the Kubernetes worker reserved and preserved in its current state for debugging purposes.

  • Kubernetes pods can retain state and logs, and we can use etcd to preserve some additional state as well (through a CRD in the future).

Other interesting points:

  • The Airflow KubernetesExecutor should try to respect the resources that are set on tasks when hitting the Kubernetes API for scheduling (a possible mapping is sketched below).
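
A possible mapping, assuming the task exposes Airflow's Resources object with cpus/ram fields carrying a .qty attribute (that attribute layout is an assumption of this sketch):

from kubernetes import client

def resources_for_task(task):
    resources = getattr(task, "resources", None)
    if resources is None:
        return None
    return client.V1ResourceRequirements(
        requests={
            "cpu": str(resources.cpus.qty),
            "memory": "{}Mi".format(resources.ram.qty),  # Airflow expresses ram in MB
        },
    )
# The result would be passed as V1Container(resources=...) when building the worker pod.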

Future work

Spark-On-K8s integration:

Teams at Google, Palantir, and many other companies are nearing release of a beta for Spark that runs natively on Kubernetes. This would allow users to submit spark-submit commands to a resource manager that can dynamically spawn Spark clusters for data processing. A separate spark-on-k8s hook can be developed to sit within the SparkSubmitOperator, depending on user configuration.


Comments:

  1. Anirudh Ramanathan: With regard to recovery from failure modes, one thought would be storing checkpoints for the watcher in etcd such that, if the schedulers need to restart, they can pull that data from etcd and not restart from the beginning of the namespace. Do you think there would be possible issues with that, or do you have any suggestions for other architectures?

  2. I was looking into this, and it seems to me that the database component should take care of maintaining state in some failure cases even if the scheduler goes down. Specifically, the `jobs` table should contain the states and recent heartbeats of the various tasks. The scheduler should also be capable of reading that back, so the failure mode of a scheduler gone awry might be handled for us by the framework itself in some cases.

    It is Kubernetes framework failures (a pod lost due to a node outage, for example) that need some special handling. Before attempting to solve that using Kubernetes constructs, I would like to spend some time understanding what currently happens with the CeleryExecutor when a worker node goes down while a task is running on that worker.

     

  3. Daniel - might be good to have an update section to this doc as well, given much progress has happened since. (smile)

  4. Anirudh Ramanathan: I am fairly new to Airflow on K8s. I am trying to deploy it and am now able to do so. Here is my question: I am using AWS EFS as the persistent volume since it supports ReadWriteMany (for the logs and dags folders). Once the cluster is up and running, how should a developer add new DAGs to the persistent volume? Do you follow any standard practice? One solution I can find is mounting EFS on an EC2 instance and then using scp, but how would you do this in production? Is there another best practice people follow? Please let me know. Thanks a lot for your contribution.