Status

StateDraft
Discussion Thread
https://lists.apache.org/thread/9kw13cxcg06p37shv57hsomnx6zognoc
Created

2022-01-25

Motivation

Airflow worker host is a shared resource among all tasks running on it. Thus, it requires hosts to provision dependencies for all tasks, including system and python application level dependencies. It leads to a very fat runtime, thus long host provision time and low elasticity in the worker resource.

The lack of runtime isolation makes it challenging and risky to do operations, including adding/upgrading system and python dependencies, and it is almost impossible to remove any dependencies. It also incurs lots of additional operating costs for the team as users do not have permission to add/upgrade python dependencies, which requires us to coordinate with them. When there are package version conflicts, it prevents installing them directly on the host. Users have to use PythonVirtualenvOperator.

Considerations

To solve those problems, I propose introducing docker runtime for Airflow tasks and dag parsing (It won't be the default runtime). It leverages docker as the tasks runtime environment. There are several benefits:

  1. Provide runtime isolation on task level
  2. Customize runtime to parse dag files
  3. Lean runtime, which enables high worker resource elasticity
  4. Immutable and portable runtime
  5. Process isolation ensures that all subprocesses of a task are cleaned up after docker exits


Note: this AIP does NOT force users to adopt docker as the default runtime. It adds an option to parse dag files in docker  container and run tasks in docker container.

What change do you propose to make?

Airflow Worker


The current airflow worker runtime is shared with all tasks on a host. An airflow worker is responsible for running an airflow task.

Current process hierarchy is:

    airflow worker process

      → `airflow run --local` process

        → `airflow run --raw` process

            → potential more processes spawned by tasks

In the new design, the `airflow run local` and `airflow run raw` processes are running inside a docker container, which is launched by an airflow worker. In this way, the airflow worker runtime only needs minimum requirements to run airflow core and docker.

Airflow Scheduler

Instead of processing the DAG file directly, the DagFileProcessor process launches a docker container required by that DAG file to process it and persists the serializable DAGs (SimpleDags) to the db.

This ensures the DAG parsing runtime is exactly the same as DAG execution runtime.

This requires a DAG definition file to tell the DAG file processing loop to use which docker image to process it. We can easily achieve this by having a metadata file along with the DAG definition file to define the docker runtime . To ease the burden of users, the infra team provide a default docker image when a DAG definition file does not require customized runtime.

Dag metadata yaml file:

This yaml file describes the docker runtime for the dag file. for example, if there is a dag file called /tmp/dag.py , the metadata yaml file will be /tmp/dag.py.metadata.yaml 

docker_runtime:
  image: airflow-docker-image:0.0.1
  volumes:
    - a:b
  environments:
    key: value
  # and more

everything defined in this dag.py.metadata.yaml will override the global docker settings in the next section.

When no dag_file_name.py.metadata.yaml , the runtime config uses the global settings.

Docker Related Settings:

docker settings for the scheduler and worker can be configured separately in the airflow.cfg but with a common section.  for example.


[docker_runtime]
CONTAINER_REPOSITORY = 
CONTAINER_TAG = 

SOCKET_PATH = unix:///var/run/docker.sock
MOUNT_DOCKER_SOCKET = False
STOP_TIMEOUT = 20
volume_mount_yaml = <filepath_to_volume_mounts> # since this can be very long and complex, we want it to be a separated file so that it can be managed easily.

NETWORK_MODE = 
OOM_KILL_DISABLE = False
REMOVE_CONTAINER_AFTER_EXIT = True
PRIVILEGED = False
LIMIT_RESOURCE = True
ENVIRONMENT_VARIABLES_FROM_HOST =
CGROUP_MOUNT_PATH = /sys/fs/cgroup
DOCKER_CGROUP_SUBPATH = docker

[docker_runtime_environment_variables]
AIRFLOW_HOME = 
PYTHONPATH = 

[scheduler_dag_parsing_docker_runtime] # this section gets all settings from the docker_runtime and overrides them
RUN_AS_USER = airflow
LIMIT_RESOURCE = False
MOUNT_DOCKER_SOCKET = False

[task_execution_docker_runtime] # this section gets all settings from the docker_runtime and overrides them
RUN_AS_USER = airflow
LIMIT_RESOURCE = True
MOUNT_DOCKER_SOCKET = True

[task_execution_docker_runtime_labels]
app = airflow_task

[task_execution_docker_runtime_environment_variables]
# The static key-value pairs environment variables passed to a container
# all keys will be upper-cased

task_execution_docker_runtime  gets all settings from the docker_runtime and overrides them.

scheduler_dag_parsing_docker_runtime  gets all settings from the docker_runtime and overrides them.

Feature Flag and Hook:

task_execution_runtime = 

  when the value is docker , tasks are run in docker container

scheduler_dag_parsing_runtime =

  when the value is docker , dag files are parsed in docker container


To easily control and override the docker image used in task execution and dag file parsing, there is a hook provided by the airflow_local_settings.py 

def get_docker_runtime_image(image):
    pass


CLI changes:

Introduce a new flag --docker  , when this is specified , the airflow run --local --docker  will create a docker container to run airflow run --local  process.

Caveat

docker container isn't free. It takes some time and resources to launch a docker container.

Which users are affected by the change?

No user impact. This feature is controlled by a feature flag.

How are users affected by the change? (e.g. DB upgrade required?)

NA

What defines this AIP as "done"?

dag file is able to be parsed in docker container and airflow task is able to run inside docker container

7 Comments

  1. I moved it  under AIP-1 (we can move it out if you think this is not really appropriate to make it part of AIP-1 Ping Zhang 

  2. Brutally honest question: Isn't this just an Executor?

    Oh I guess not with the dag parsing too.

    1. That said, with the split out of dag_processing in AIP-43 that could handle the parsing side, so the execution side can just be an executor.

      1. i think it is a middle layer between the executor and the actual task runner since both CeleryExecutor, LocalExecutor can use docker runtime.

  3. Hey Ping Zhang, following yesterday's discusion I renamed it to more generic "Runtime Isolation for DAG Parsing and Execution" (smile)

    1. Jarek Potiuk should we move it out of the security? it can address some security issues but the main goals of it is not for security. let me know your thoughts. thanks

      1. With this change yeah. We should move it out.