Motivation
An Airflow worker host is a shared resource for all tasks running on it, so the host must provision the dependencies of every task, including system-level and Python application-level dependencies. This leads to a very fat runtime, and consequently long host provisioning times and low elasticity of the worker resource.
The lack of runtime isolation makes operations such as adding or upgrading system and Python dependencies challenging and risky, and removing any dependency is nearly impossible. It also creates significant operational overhead for the team: users do not have permission to add or upgrade Python dependencies themselves, so every change requires coordination with us. When package versions conflict, the packages cannot be installed directly on the host at all, and users have to fall back to PythonVirtualenvOperator.
Considerations
To solve these problems, I propose introducing a docker runtime for Airflow tasks and DAG parsing (it will not be the default runtime). It uses docker as the task runtime environment. There are several benefits:
- Provide runtime isolation at the task level
- Customizable runtime for parsing DAG files
- Lean runtime, which enables high worker resource elasticity
- Immutable and portable runtime
- Process isolation ensures that all subprocesses of a task are cleaned up after docker exits
Note: this AIP does NOT force users to adopt docker as the default runtime. It adds an option to parse DAG files and run tasks in docker containers.
What change do you propose to make?
Airflow Worker
The current Airflow worker runtime is shared by all tasks on a host. An Airflow worker is responsible for running an Airflow task.
The current process hierarchy is:
airflow worker process
→ `airflow run --local` process
→ `airflow run --raw` process
→ potentially more processes spawned by the task
In the new design, the `airflow run --local` and `airflow run --raw` processes run inside a docker container launched by the Airflow worker. This way, the worker runtime only needs the minimum requirements to run Airflow core and docker.
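The sketch below illustrates this flow under the assumption of the docker Python SDK (docker-py); the function name `launch_task_container` and the example mounts/environment are illustrative only, not part of this proposal.

```python
# A minimal sketch (not part of this proposal) of how a worker could launch the
# local task run in a container, assuming the docker Python SDK (docker-py).
import docker


def launch_task_container(image, dag_id, task_id, execution_date,
                          volumes=None, environment=None):
    """Run the `airflow run --local` process for one task inside a container."""
    client = docker.from_env()
    command = ["airflow", "run", "--local", dag_id, task_id, execution_date]
    container = client.containers.run(
        image=image,
        command=command,
        volumes=volumes or {},          # e.g. {"/airflow/dags": {"bind": "/airflow/dags", "mode": "ro"}}
        environment=environment or {},  # e.g. {"AIRFLOW_HOME": "/airflow"}
        detach=True,
    )
    result = container.wait()           # block until `airflow run --local` exits
    logs = container.logs()             # forward these to the task log
    container.remove()                  # mirrors REMOVE_CONTAINER_AFTER_EXIT = True
    return result, logs
```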
Airflow Scheduler
Instead of processing the DAG file directly, the DagFileProcessor launches the docker container required by that DAG file, processes the file inside it, and persists the serializable DAGs (SimpleDags) to the database.
This ensures the DAG parsing runtime is exactly the same as DAG execution runtime.
This requires a DAG definition file to tell the DAG file processing loop which docker image to use to process it. We can achieve this with a metadata file alongside the DAG definition file that defines the docker runtime. To ease the burden on users, the infra team provides a default docker image for DAG definition files that do not require a customized runtime.
Dag metadata yaml file:
This yaml file describes the docker runtime for the DAG file. For example, if there is a DAG file called /tmp/dag.py, the metadata yaml file will be /tmp/dag.py.metadata.yaml.
```yaml
docker_runtime:
  image: airflow-docker-image:0.0.1
  volumes:
    - a:b
  environments:
    key: value
  # and more
```
Everything defined in dag.py.metadata.yaml overrides the global docker settings described in the next section. When there is no dag_file_name.py.metadata.yaml, the runtime config uses the global settings.
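A minimal sketch of how the DAG file processing loop could resolve the per-file runtime, assuming PyYAML for parsing; the helper name `resolve_docker_runtime` and its arguments are illustrative.

```python
# A minimal sketch of resolving the docker runtime for one DAG file, falling
# back to the global settings when no <dag_file>.metadata.yaml exists.
import os
import yaml


def resolve_docker_runtime(dag_file_path, global_runtime):
    """Return the docker runtime config for a DAG file.

    `global_runtime` is a dict built from the global docker settings; the
    per-file metadata overrides it key by key.
    """
    metadata_path = dag_file_path + ".metadata.yaml"
    runtime = dict(global_runtime)
    if not os.path.exists(metadata_path):
        return runtime                      # no metadata file: use global settings

    with open(metadata_path) as f:
        metadata = yaml.safe_load(f) or {}
    runtime.update(metadata.get("docker_runtime", {}))
    return runtime


# e.g. resolve_docker_runtime("/tmp/dag.py", {"image": "default-airflow-image"})
# picks up /tmp/dag.py.metadata.yaml when it exists.
```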
Docker Related Settings:
Docker settings for the scheduler and worker can be configured separately in airflow.cfg, but share a common section. For example:
```ini
[docker_runtime]
CONTAINER_REPOSITORY =
CONTAINER_TAG =
SOCKET_PATH = unix:///var/run/docker.sock
MOUNT_DOCKER_SOCKET = False
STOP_TIMEOUT = 20
# since this can be very long and complex, we want it to be a separate file so that it can be managed easily
volume_mount_yaml = <filepath_to_volume_mounts>
NETWORK_MODE =
OOM_KILL_DISABLE = False
REMOVE_CONTAINER_AFTER_EXIT = True
PRIVILEGED = False
LIMIT_RESOURCE = True
ENVIRONMENT_VARIABLES_FROM_HOST =
CGROUP_MOUNT_PATH = /sys/fs/cgroup
DOCKER_CGROUP_SUBPATH = docker

[docker_runtime_environment_variables]
AIRFLOW_HOME =
PYTHONPATH =

[scheduler_dag_parsing_docker_runtime]
# this section gets all settings from docker_runtime and overrides them
RUN_AS_USER = airflow
LIMIT_RESOURCE = False
MOUNT_DOCKER_SOCKET = False

[task_execution_docker_runtime]
# this section gets all settings from docker_runtime and overrides them
RUN_AS_USER = airflow
LIMIT_RESOURCE = True
MOUNT_DOCKER_SOCKET = True

[task_execution_docker_runtime_labels]
app = airflow_task

[task_execution_docker_runtime_environment_variables]
# the static key-value pair environment variables passed to a container
# all keys will be upper-cased
```
Both task_execution_docker_runtime and scheduler_dag_parsing_docker_runtime inherit all settings from docker_runtime and override them.
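A minimal sketch of this inheritance, using the standard library ConfigParser for illustration (Airflow's own configuration object could be used the same way); the helper name `load_runtime_settings` is illustrative.

```python
# A minimal sketch of the proposed section inheritance: a runtime-specific
# section starts from everything in [docker_runtime] and overrides matching keys.
from configparser import ConfigParser


def load_runtime_settings(cfg_path, section):
    """Merge [docker_runtime] with a more specific section such as
    [task_execution_docker_runtime] or [scheduler_dag_parsing_docker_runtime]."""
    parser = ConfigParser()
    parser.read(cfg_path)

    settings = dict(parser.items("docker_runtime"))   # common defaults
    if parser.has_section(section):
        settings.update(parser.items(section))        # per-runtime overrides win
    return settings


# e.g. load_runtime_settings("airflow.cfg", "task_execution_docker_runtime")
# keeps the common keys and overrides limit_resource, mount_docker_socket, etc.
# (ConfigParser lower-cases option names by default.)
```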
Feature Flag and Hook:
When task_execution_runtime = docker, tasks are run in docker containers. When scheduler_dag_parsing_runtime = docker, DAG files are parsed in docker containers.
To easily control and override the docker image used in task execution and DAG file parsing, a hook is provided via airflow_local_settings.py:
```python
def get_docker_runtime_image(image):
    pass
```
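An illustrative example of what a deployment might put in airflow_local_settings.py; the registry-rewrite rule shown here is purely an assumption, not part of this proposal.

```python
# Example hook implementation: rewrite every resolved image to pull from an
# internal registry mirror. The registry host is a made-up example.
def get_docker_runtime_image(image):
    """Given the image resolved from config/metadata, return the image to use."""
    if image and not image.startswith("registry.example.com/"):
        return "registry.example.com/" + image
    return image
```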
CLI changes:
Introduce a new flag `--docker`. When it is specified, `airflow run --local --docker` will create a docker container to run the `airflow run --local` process.
Caveat
Docker containers are not free: launching a container takes some time and resources.
Which users are affected by the change?
No user impact. This feature is controlled by a feature flag.
How are users affected by the change? (e.g. DB upgrade required?)
NA
What defines this AIP as "done"?
DAG files can be parsed in a docker container and Airflow tasks can run inside a docker container.