Status

State: Draft
Discussion Thread: Github (https://github.com/apache/airflow/issues/13364)



Motivation

Once you reach the point where you want one cluster to run different types of Python tasks and you have multiple teams working on that cluster, you need to start splitting the business logic code into separate Python packages to allow better version control and unit testing outside of Airflow's scope.

The issue with having a single virtual env (or globally installed packages) is that you get a lot of version conflicts between packages, and every upgrade requires an Airflow restart.

Considerations

What change do you propose to make?

In order to solve this issue, I suggest introducing venv management as part of the Celery and Local executors' lifecycle.

Option A - make the venv config part of the DAG code:

Each task can include a venv configuration inside executor_config, containing the venv name and a list of packages.

Once the executor gets the task, it will check whether the venv exists on the machine, take a lock to make sure it is the only process currently updating the venv, install the packages, and then run the airflow run command from inside the venv.
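A minimal sketch of what a task definition might look like under Option A. The "venv" block inside executor_config is the proposed schema (it does not exist in Airflow today), and the venv name, package and version are purely illustrative:

```python
import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator


def transform():
    # Hypothetical package that would only be installed inside the task's venv.
    import my_team_package
    my_team_package.run()


with DAG(
    dag_id="option_a_sketch",
    start_date=pendulum.datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    PythonOperator(
        task_id="transform",
        python_callable=transform,
        executor_config={
            # Proposed schema: the executor would create or reuse this venv under a lock,
            # install the listed requirements, and then run the task from inside it.
            "venv": {
                "name": "team_a_venv",
                "requirements": ["my-team-package==1.3.0"],
            }
        },
    )
```

Since executor_config already accepts an arbitrary dict, this parses with current Airflow; the keys would only be honoured by the executors if this option were implemented.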

Option B - manage venv as an Airflow DB model

Using the UI, CLI and API, the user will be able to create a venv by providing a name and a list of requirements. Each task will then be able to point to a venv id, and the executor will create the venv if needed and run the task in it.
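A rough sketch of Option B usage; the "airflow venvs create" command and the "venv_id" key are hypothetical and would only exist if this option were implemented:

```python
# Hypothetical CLI step (not an existing command):
#   airflow venvs create --name team_a_venv --requirement my-team-package==1.3.0
#
# The task then only points at the venv id; the executor creates the venv if needed
# and runs the task inside it. (Shown standalone; in a real DAG file this would sit
# inside a `with DAG(...)` block as in the previous sketch.)
from airflow.operators.python import PythonOperator


def transform():
    import my_team_package  # hypothetical package installed in the referenced venv
    my_team_package.run()


transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
    executor_config={"venv_id": "team_a_venv"},  # proposed key, resolved against the venv table
)
```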

What problem does it solve?

  1. Allowing users to create a complex Airflow cluster with multiple packages easily.
  2. Creating a standard best practice for how util functions should be written and how to keep business logic outside of the Airflow DAG.

Why is it needed?

It's currently almost impossible to achieve this without complex deployment scripts that update all the workers and reload the Airflow process (and with the LocalExecutor this means running tasks need to be killed).

While it is possible to achieve this by using the CeleryExecutor with execute_tasks_new_python_interpreter=True and creating a Celery worker per venv, in a big cluster this can lead to a large number of Celery workers (which consume system resources), and it still forces the user to use their own deployment methods to update all the venvs in the cluster.

Are there any downsides to this change?

For both options:

  1. This solution can make task startup a bit slower.
  2. The venv must contain the Airflow packages (which can be solved by using system-site-packages or by copying an existing venv); see the sketch below.
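For example, the venv could be pre-built with access to the worker's Airflow installation via system-site-packages. A minimal sketch using the standard library; the path is illustrative:

```python
# Build a venv that can still import the Airflow packages installed on the worker.
import venv

builder = venv.EnvBuilder(system_site_packages=True, with_pip=True)
builder.create("/opt/venvs/team_a")  # tasks would then run /opt/venvs/team_a/bin/python
```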

For option A:

  1. There is no easy way to fix conflicts: multiple tasks can override each other's versions, which can create confusion and make debugging harder.

Which users are affected by the change?

No one, as long as they don't configure tasks with venvs.

How are users affected by the change? (e.g. DB upgrade required?)

A DB upgrade will be needed to create the venv table (for option B).

Other considerations?

N/A

What defines this AIP as "done"?

Users can create venvs and run tasks inside them using Airflow.


4 Comments

  1. This is a great idea.

    It's actually one of the downsides of the latest Airflow versions when compared with other platforms (e.g. Prefect).

    I have a use case for this. Having the ability to rely on docker images with custom python dependencies helps a lot and promotes isolation between DAGs.

    Currently, if one has a custom/internal Python/pip package that they want to use in multiple DAGs (with different versions), it's not possible. Why does this matter? Not every organization centralizes their DAGs in one repo, so each repo with DAGs might move at a different pace.

    The current (Airflow 2.2) solution is to use the DockerOperator (or the @task.docker decorator), but that's only applicable to tasks, not to generic Python code that generates a DAG dynamically (see the sketch below).
    For more context and the use case, see the Slack thread: https://apache-airflow.slack.com/archives/CSS36QQS1/p1633542812490400
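    For reference, a minimal sketch of that task-level Docker isolation (it requires the apache-airflow-providers-docker package; the image name and imported package are illustrative) - it isolates a single task, but not the code that parses or generates the DAG:

    ```python
    from airflow.decorators import task


    @task.docker(image="my-registry/etl:1.2.0")
    def run_in_container():
        # Runs inside the container, so it can only use dependencies baked into the image.
        import my_team_package  # hypothetical package installed in the image
        return my_team_package.run()
    ```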

  2. This is not what you are looking for. DAG parsing happens independently of task execution. If you want to use a different env at parsing time, the changes proposed in AIP-43 DAG Processor separation are your friend - not this one.

  3. I think the problem statement described here was (or soon will be) handled in an "incremental" way, by implementing a few other changes that make it possible to address it differently. I wonder if we should abandon this AIP?

    • Part of the problem stated here has been addressed and will be released in Airflow 2.4.0 by the ExternalPythonOperator (https://github.com/apache/airflow/pull/25780) - with this operator (and the TaskFlow API especially) you will be able to utilise multiple Python virtual environments pre-created in the environment (while you can keep PythonVirtualenvOperator for dynamic creation of venvs). While this does not address complete, dynamic environment re-creation, it does give you a way to avoid ballooning of Celery workers when you have many environments: you can use them via @task.external_python tasks without the overhead of dynamically creating virtual environments (see the sketch after this list).
    • The new TaskFlow APIs, combined with using Hooks rather than operators, and both PythonVirtualenvOperator and ExternalPythonOperator, allow most of the integrations to be used (via TaskFlow-decorated methods) by using the right Hooks. The basic "approach" has been described in https://medium.com/apache-airflow/generic-airflow-transfers-made-easy-5fe8e5e7d2c2 and while it describes Generic Transfers, it is equally applicable to running a custom integration/Hook in @task.virtualenv or @task.external_python decorated functions, which are now fully-fledged tasks.
    • Also, the (near future, I hope) AIP-46 Runtime isolation for airflow tasks and dag parsing addresses the problem of both parsing and executing DAGs using the same (different for each DAG) execution environment (based on Docker images). And AIP-48 Data Dependency Management and Data Driven Scheduling should help users break their DAGs into smaller, inter-dependent workflows, which will make it far more attractive to have the AIP-46 runtime isolation model working (a big workflow might contain multiple DAGs, and each DAG might run in a different environment/container).
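    For illustration, a minimal sketch of the two TaskFlow-based options mentioned above (Airflow 2.4+); the venv path and the package name are assumptions:

    ```python
    import pendulum

    from airflow.decorators import dag, task


    @dag(schedule=None, start_date=pendulum.datetime(2022, 9, 1), catchup=False)
    def isolated_envs_example():

        @task.external_python(python="/opt/venvs/team_a/bin/python")
        def run_in_prebuilt_venv():
            # Executed by the pre-created venv's interpreter, not the worker's global env.
            import my_team_package  # hypothetical package installed only in that venv
            return my_team_package.__version__

        @task.virtualenv(requirements=["my-team-package==2.0.0"], system_site_packages=False)
        def run_in_dynamic_venv():
            # This venv is created on the fly for the task run and discarded afterwards.
            import my_team_package
            return my_team_package.__version__

        run_in_prebuilt_venv() >> run_in_dynamic_venv()


    isolated_envs_example()
    ```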

    While none of the changes above addresses the problem fully, I feel that when you combine the possibilities they create, the need for dynamically created venvs described here (not in too much detail, but still) is greatly diminished.

    Shall we move this into "abandoned"? Does anyone have a good reason why not?

  4. Moving it to abandoned then. And we can always bring it back.