Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/rgovdphjrxtcqfmcfdrbfntwrdr2nj71
Created


Author: Mateusz Henc
In Release: 2.4.0

Motivation

This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted" ones, which allows putting security boundaries between them. "Untrusted" components could then be executed in DBIsolation mode, which disables direct database access and makes it possible only through the Airflow Database API (separate AIP).

The DagProcessor is currently part of the Scheduler. Because it works with user code, it is considered "untrusted", while the SchedulerJob is "trusted". With the changes below it will be possible to start the DagProcessor and the SchedulerJob as independent processes running on different hosts.

Proposal

Move find_zombies to Scheduler Job

Finding zombies is currently run in the DAG file processing loop; however, it has more in common with the Scheduler Job than with processing files.

I propose to move the code of the _find_zombies method to the SchedulerJob class and run it there on a configured interval (alongside adopt_or_reset_orphaned_tasks and emit_metrics).

The interval is controlled by a new configuration option, [scheduler]zombie_detection_interval, defaulting to 30 seconds. Currently the check runs every 10 seconds, which seems too frequent.
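As an illustration only, the zombie check could be registered on the scheduler's existing timer mechanism (EventScheduler), which SchedulerJob already uses for other periodic jobs; the helper function name below is hypothetical:

from airflow.configuration import conf
from airflow.utils.event_scheduler import EventScheduler


def register_zombie_detection(scheduler_job, timers: EventScheduler) -> None:
    """Schedule the moved _find_zombies check on its own interval."""
    interval = conf.getfloat("scheduler", "zombie_detection_interval", fallback=30.0)
    timers.call_regular_interval(interval, scheduler_job._find_zombies)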

Introduce new configuration option

I propose to add a new configuration option, [scheduler]standalone_dag_processor, defaulting to False.

When this option is True, the SchedulerJob does not start the DagFileProcessorAgent on startup; it is up to the user to run it as an independent process.
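A minimal sketch of the check the scheduler could perform at startup (the helper function here is illustrative; conf.getboolean is Airflow's standard config accessor):

from airflow.configuration import conf


def should_start_in_process_dag_processor() -> bool:
    """Return True when SchedulerJob should spawn its own DagFileProcessorAgent."""
    # In standalone mode the user runs `airflow dag-processor` separately,
    # so the scheduler must not start the agent itself.
    return not conf.getboolean("scheduler", "standalone_dag_processor", fallback=False)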

Introduce new Airflow CLI command 

The DAG processor can be executed in standalone mode with the airflow dag-processor command, which accepts the following options:

  • --subdir/-s - the directory from which to parse DAGs; if provided, it is a subfolder of $AIRFLOW_DAG_FOLDER
  • --num-runs/-n - number of runs to execute before exiting; 0 means run indefinitely
  • --parsing-processes/-P - number of processes to be used by the Dag Processor; overrides the [scheduler]parsing_processes option

On startup it executes the DagFileProcessorManager process.

Move callbacks queue to database

Callbacks allow you to execute additional functions on DAG/task success or failure. Currently they are triggered from both the Scheduler Job (e.g. DagRun timeout) and DAG processing (zombie detection).

Callbacks are executed by the Dag Processor, which makes sense, as the code is defined in the (untrusted) DAG file. However, to allow the Scheduler Job to trigger a callback when the DagProcessor is running as a standalone process, the information about the callback must be passed to the DagProcessor.

I propose to introduce a new table, CallbackRequest, for storing the callbacks, with the following fields:

  • id (int) - uniquely identifies the callback
  • created_at (datetime) - Time when the callback was added
  • priority_weight (int) -   higher values = higher priority
  • message (string) - additional message that can be used for logging
  • callback_type (enum) -  TaskCallback, DagCallback or SlaCallback
  • callback_data (JSON) - serialized metadata of the callback
  • processor_subdir (string) - contains the information about the DAG directory (required for "multiple dag directories", described below)

callback_data allows constructing the proper callback object: TaskCallbackRequest, DagCallbackRequest, or SlaCallbackRequest.
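A minimal sketch of how the table could be declared with SQLAlchemy (column types and lengths are illustrative; the real migration may differ):

from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class CallbackRequestModel(Base):
    """Illustrative model for the proposed callback_request table."""

    __tablename__ = "callback_request"

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(DateTime, nullable=False)
    priority_weight = Column(Integer, nullable=False, default=1)  # higher = fetched first
    message = Column(String(500))            # free-form text, useful for logging
    callback_type = Column(String(20))       # TaskCallback / DagCallback / SlaCallback
    callback_data = Column(Text)             # JSON-serialized callback metadata
    processor_subdir = Column(String(2000))  # DAG directory, for multiple-directory setups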

The Scheduler process (via the Executor), instead of sending the callbacks to the queue, adds them to the database.

During its execution loop, the Dag Processor fetches up to max_callback_per_loop callbacks (higher priority_weight first), deletes them from the database, builds the proper CallbackRequest objects, adds them to the _callback_to_execute list and processes them the same way as it does now. Each Dag Processor only executes callbacks from the directory it reads DAGs from (based on the processor_subdir column), as sketched below.
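A hedged sketch of that fetch step, assuming a SQLAlchemy session and the CallbackRequestModel sketch above; deserialize_callback and the default batch size are placeholders:

from sqlalchemy.orm import Session


def fetch_callbacks(session: Session, processor_subdir: str, max_callbacks_per_loop: int = 20):
    """Fetch the highest-priority callbacks for this processor's directory and remove them."""
    rows = (
        session.query(CallbackRequestModel)
        .filter(CallbackRequestModel.processor_subdir == processor_subdir)
        .order_by(CallbackRequestModel.priority_weight.desc())
        .limit(max_callbacks_per_loop)
        .all()
    )
    requests = []
    for row in rows:
        # deserialize_callback stands in for rebuilding TaskCallbackRequest,
        # DagCallbackRequest or SlaCallbackRequest from the JSON in callback_data.
        requests.append(deserialize_callback(row.callback_type, row.callback_data))
        session.delete(row)
    session.commit()
    return requests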

Airflow still uses the queue in [scheduler]standalone_dag_processor=False mode.

Code for handling the internal queue of callbacks (currently in DagFileProcessorManager._callback_to_execute) is moved to airflow/callbacks/ with BaseCallbackSink and its two implementations: PipeCallbackSink (sends callbacks to the DagProcessor using the currently existing pipe) and DatabaseCallbackSink (adds callbacks to the database).
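A rough sketch of that class layout (constructor arguments and the to_db_row helper are illustrative, not the final API):

from abc import ABC, abstractmethod


class BaseCallbackSink(ABC):
    """Common interface for handing callbacks over to the DAG processor."""

    @abstractmethod
    def send(self, callback) -> None:
        """Deliver a callback request for execution."""


class PipeCallbackSink(BaseCallbackSink):
    """Legacy mode: push the callback over the already existing pipe."""

    def __init__(self, get_sink_pipe):
        self._get_sink_pipe = get_sink_pipe  # callable returning the pipe connection

    def send(self, callback) -> None:
        self._get_sink_pipe().send(callback)


class DatabaseCallbackSink(BaseCallbackSink):
    """Standalone mode: persist the callback in the callback_request table."""

    def __init__(self, session_factory):
        self._session_factory = session_factory  # e.g. a SQLAlchemy sessionmaker

    def send(self, callback) -> None:
        session = self._session_factory()
        try:
            session.add(to_db_row(callback))  # to_db_row: hypothetical serializer
            session.commit()
        finally:
            session.close()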

The DagProcessor, in its loop, calls an additional method, fetch_callbacks (in [scheduler]standalone_dag_processor=True mode), which fetches callbacks from the database and adds them to the internal queue.

BaseExecutor extension

To make callback handling more extensible and future-proof, new callbacks are sent for execution (in both legacy and standalone dag processor mode) using a new method on BaseExecutor: send_callback_to_execute. This method is implemented in BaseExecutor by calling a BaseCallbackSink which, depending on the [scheduler]standalone_dag_processor setting, is either a PipeCallbackSink or a DatabaseCallbackSink.

This allows overriding the behavior in Executor subclasses.
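An illustrative subset of what this could look like on BaseExecutor (only the dispatch logic is shown; the attribute name is an assumption):

class BaseExecutor:
    def __init__(self):
        # Set by SchedulerJob at startup to PipeCallbackSink or DatabaseCallbackSink,
        # depending on [scheduler]standalone_dag_processor.
        self.callback_sink = None

    def send_callback_to_execute(self, request) -> None:
        """Route a callback request to the configured sink; subclasses may override this."""
        if self.callback_sink is None:
            raise ValueError("Callback sink was not configured for this executor")
        self.callback_sink.send(request)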

Alternative approach: Running callbacks in workers

Callbacks could also be executed by workers, as they may be considered part of "task execution" and it feels natural to execute task-related callbacks there. However, there are some problems with this, especially when using the KubernetesExecutor:

  • Performance - creating a new worker only to execute a callback (which is usually small and fast) may be too slow
  • Resources - there is no need to spin up a new Pod when the DagProcessor is able to handle the callback
  • Simplicity - the code for handling callbacks is already in the DagProcessor, so smaller changes are required
  • Also, DagFileProcessor is a new entity which has not been considered before, so it might feel unnatural to tie callback execution to it. However, the processor already executes user code, so from the security point of view the same security "perimeter" needs to be maintained for both anyway, and the performance implications are in favor of DagFileProcessor

Add support for multiple dag directories 

This change allows multiple DAG directories. Each Dag Processor component parses files from a different directory, passed with the --subdir/-s parameter. This allows, for example, multiple teams to share the same Airflow installation, putting DAGs in different directories with different sets of permissions.

To make this possible I propose to extend the SerializedDagModel and DagModel tables with a new column:

  • dag_directory (string)

It contains the information about the DAG directory. During DAG file parsing, the processor sets this field to the value of --subdir.

When removing deleted DAGs, only DAGs with dag_directory equal to the dag processor's --subdir value are removed/marked as inactive, as sketched below.
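For illustration, the deactivation of DAGs that disappeared from disk could be scoped to the processor's directory roughly like this (assumes the proposed dag_directory column; the function name is hypothetical):

from sqlalchemy.orm import Session

from airflow.models import DagModel


def deactivate_missing_dags(session: Session, alive_dag_ids, dag_directory: str) -> None:
    """Mark as inactive only this directory's DAGs whose files no longer exist."""
    (
        session.query(DagModel)
        .filter(DagModel.dag_directory == dag_directory)  # proposed new column
        .filter(DagModel.dag_id.notin_(alive_dag_ids))
        .update({DagModel.is_active: False}, synchronize_session="fetch")
    )
    session.commit()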

Cleanup

Additional cleanup of DagModel/SerializedDagModel rows is required, as a DagProcessor may be stopped or restarted with different settings, leaving abandoned DAGs in the database. They are cleaned up by the SchedulerJob on a configured interval (running every 60 seconds), which marks as inactive all DAGs that were not updated by a DagProcessor within the last hour.
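A sketch of that periodic cleanup, assuming DagModel's existing last_parsed_time column; the function name and the way the threshold is passed are illustrative:

from datetime import timedelta

from sqlalchemy.orm import Session

from airflow.models import DagModel
from airflow.utils import timezone


def deactivate_stale_dags(session: Session, stale_after: timedelta = timedelta(hours=1)) -> None:
    """Mark as inactive DAGs that no DagProcessor has refreshed recently."""
    limit = timezone.utcnow() - stale_after
    (
        session.query(DagModel)
        .filter(DagModel.is_active)
        .filter(DagModel.last_parsed_time < limit)
        .update({DagModel.is_active: False}, synchronize_session="fetch")
    )
    session.commit()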

DagPolicy

There is also a problem with two different DAG files having the same dag_id in different directories. As a solution I propose to use an Airflow DagPolicy for this purpose - users will be able to define a policy that checks whether the DAG ID meets the conditions, e.g.:

def dag_policy(dag: DAG):
    subdir = FileProcessorManager.get_current_subdir()
    if not dag.dag_id.startswith(f"{subdir}."):
        raise AirflowClusterPolicyViolation(
            f"DAG id {dag.dag_id} from file path {dag.fileloc} does not start with "
            f"'{subdir}.' even though it comes from the {subdir} subdirectory of the DAG folder."
        )

Where FileProcessorManager.get_current_subdir is a new method returning the current --subdir parameter of the Dag Processor.

Deployment options

Below are the possible deployment options for an Airflow installation after these changes are implemented.

  • [scheduler]standalone_dag_processor=False (default)

The Dag Processor still runs as part of Scheduler. No changes for the end-user.

  • [scheduler]standalone_dag_processor=True. Single Dag Processor

Dag Processor runs as a standalone component. Users need to start a scheduler job (airflow scheduler) and Dag Processor (airflow dag-processor) independently.

  • [scheduler]standalone_dag_processor=True. Multiple Dag Processors

Multiple Dag Processors - each parsing DAGs from a different directory. Users need to start a scheduler job (airflow scheduler) and each Dag Processor (airflow dag-processor --subdir /dag-X). This allows setting up different DAG subdirectories for different teams, which are parsed independently and in isolation.

What problem does it solve?

It allows running multiple Dag Processors as independent components on separate hosts, parsing DAGs from distinct locations.

Why is it needed?

This is needed to run the DagProcessor in "DBIsolation" mode (separate AIP), without direct access to the database, while the Scheduler Job retains this access.

Which users are affected by the change?

Only those that set [scheduler]standalone_dag_processor=True

How are users affected by the change? (e.g. DB upgrade required?)

A DB upgrade is required to use the new mode.

What defines this AIP as "done"?

DagProcessor and SchedulerJob are executed independently on different hosts. It is possible to run multiple DagProcessors parsing DAGs from different directories.





31 Comments

  1. Hi Mateusz Henc glad to see this AIP.


    Actually, at Airbnb, we implemented the Parsing Service and Callback Manager in 2021 Q3 and they have been running very stably in our production for a while.


    The Parsing Service is in charge of 1) parsing dag files, 2) serializing dags, and 3) persisting them to the DB. We have updated the scheduler so that it is able to read the serialized dags to schedule tasks. When parsing dag files, the Parsing Service launches a dag container to process each dag file.

    The CallbackManager collects all callback events from the scheduler (executor_events and zombies) and persists them to a db table, callbacks. In the Parsing Service, right after finishing the dag parsing, it checks if there are callbacks to run.


    We are planning to open source this next year. There is a lot of overlapping work. We would love to cooperate on this.

    1. cc: Jarek Potiuk 

      Hi,

      Thanks for the information.

      If you already have something that could be used for this AIP then it would be fantastic.

      I think we need to stick to the scope of this AIP, as ParsingService may require a separate AIP and discussions.  But we could make sure the DagProcessor could evolve towards it easily.

    2. We have updated the scheduler so that it is able to read the serialized dags to schedule tasks

      This is how the scheduler works in 2.0+ already – so depending on what version you made your changes against it may or may not be applicable to the main branch.

      1. Nice, our current version is 1.10.4.

  2. I think it would be great if you could share it with us, Ping Zhang. Maybe even privately first if you are not ready to open-source it publicly yet?

    We plan to start implementing it really soon (in fact Mateusz Henc has already done part of the job as a Proof of Concept), but seeing an implementation done (and used!) by others might be really helpful.

    We planned to start from the current main and keep it in sync while we are developing it - but if your version is also kept in sync and reuses most of the code, then maybe that can save us a lot of work. Also involving others is a good idea.

    How far are you from an open-sourcing decision? You wrote "next year" but how realistic is it that it will be Q1 next year, for example (smile)?

    Just thinking out loud: one thing that (I guess and anticipate) your separate Parsing Service might not have (and which we are really after) is making the new "command" of Airflow optional, and making sure that we use the existing DagProcessor code from the "main" version of Airflow in a way that avoids code duplication. I guess your goals could be pretty different here, so you might end up with slightly different design decisions than we might come up with.

    On one hand, our goal is that when "standalone_dag_processor" is set to False, the current behaviour of DagProcessor should not change pretty much at all.  On the other hand when it is set to "True" we want to use existing methods and functions in the code (similarly as in AIP-44 Airflow Internal API).

    So while we want to have a separate "command", under the hood it will use the same code as the DagProcessor which is part of the scheduler. This is a very important factor for Airflow from the maintainability point of view.

    I am not sure when (and from which version of Airflow) you branched off and how much of the "shared" code it uses. 

    If you cannot share it publicly - maybe you can reach out to us (on Slack) and we can have a call and you could show us what you have ?

    1. Hi Jarek Potiuk definitely a good idea that we chat first and we can also go over the design doc that we have internally. 


      For this:

      " On one hand, our goal is that when "standalone_dag_processor" is set to False, the current behaviour of DagProcessor should not change pretty much at all.  On the other hand when it is set to "True" we want to use existing methods and functions in the code (similarly as in AIP-44 Airflow Internal API)."

      Yep, we designed the parsing service very carefully so that it is totally controlled by a feature flag and can easily be turned on/off without changing the core of the airflow scheduler, so that we can open source it easily. The dag parsing and serialization code is reused in the parsing service.


      As for the timeline, we are fully committed to open source all our internal features in 2022 as we also want to upgrade our internal version to > airflow 2.




  3. I see. Since you are 1.10.4-based, it's more likely than not that we will use the Airflow 2 Scheduler/DagProcessor to start with, but nevertheless it's good to cross-check what you've done and see if maybe we can combine some things (smile)

  4. Hello Mateusz Henc, one thing which is not clear to me: how will this design address the possibility of providing one "environment" for a given DAG (mostly in the case of the Kubernetes Executor)? As of today the user can only specify the worker image at the Task or pod_mutation_hook level, but if a method from a 3rd-party package is part of the top-level imports/code then it fails (because dag processing, as well as callbacks, is part of the scheduler). Do we plan to address this with the new approach? With the above design, I do not see this addressed.

    1. Hello,

      What do you mean by "environment" ? Is it a set of (pypi-packages + configs + environment variables)? 

      Note that this design is about parsing dags, not execution - it allows running multiple dag processors to separate the dag storage for different teams, etc. Nonetheless, workers still need to have access to all these files to be able to execute tasks from the proper dag file.

      cc: @jarek

      1. Hello by environment I meant mainly the pypi-packages.

        In my opinion distinction parsing/execution does not matter too much here for the following reasons:

        • Part of the execution would be done in the dag processor as described in this AIP (callbacks for instance) so they cannot be fully dissociated
        • DAG parsing also requires access to all of the libraries used at the top level of a DAG file even though it does not execute any code. What I meant in my previous message is that the execution part looks more flexible, as we can define a custom image at the task or pod_mutation_hook level, but the proposed design makes the DAG processor a bit rigid as it assumes it should cover all DAGs. Some imports/library versions could conflict with the Airflow constraints, for instance.

        Do you agree?

        1. Ok, I see.

          So each dag processor could be built with a different set of pypi-packages (for different directories) and executed independently, e.g. on Kubernetes. The callbacks table contains a `dag_loc` field which ensures the dag processor handles only callbacks from its directory.

          Not sure about the workers - whether there is a way to run different tasks using different worker images - I focused on the "parsing" part there, so I really didn't think of that.

          Jarek Potiuk any thoughts?

          1. Philippe Lanoe  Mateusz Henc While I understand the need - I do not think we should address the separation of environments (i.e. set of installed packages) in this AIP.

            There is a similar idea (already working in their forked Airflow) from the Airbnb team (Xinbin Huang and Ping Zhang) which aims to address this. Maybe they would like to present it on Wednesday in addition to AIP-45 Remove double dag parsing in airflow run. This is actually a very likely candidate for the next AIP in the multi-tenancy "umbrella". But I think AIP-43 should focus exclusively on the security aspect of the processing, not on environment separation - and I would like to focus on that one first before we go into the details of next steps.

            Once we have DAG processors separated, adding a "per-team" environment is a pretty natural next step. Happy to discuss it on Wednesday in the "what's next" section. But adding it now to the scope of what we want to achieve in AIP-43 would complicate and prolong these first (baby) steps we want to take. We know there are many, many things we want to add for Multi-Tenancy overall (that's why we have the "umbrella" AIP-1 and sub-AIPs).

            For now we want to make some minimal changes that address basic things and are immediately useful and usable. And build on top of it later.

  5. A nit issue in the doc: seems the same contents are duplicated twice?

  6. A couple of questions about the new callbacks table:

    • Would it make sense to have the priority field be called "priority_weight", since then that matches the name used for TaskInstances? Is the notion of weighted callback execution something that already exists or new for this AIP?
    • Will "dag_loc" be relative to "dag_directory"? It's called "file_loc" in other tables like DagModel and SerializedDagModel.

    Are there any restrictions or assumptions on dag_directory? Can it be nested more than one level deep under the root dag directory for example?

    I'd also like to understand more about the security of the multiple-directory approach, as it would be good to ensure we are building a solution which allows for strong security from the start. It is mentioned that we would be "putting dags in different directories with different sets of permissions". Could you expand on this? Would it then be expected for each team's DagProcessor instance to be run as a distinct OS user? Is this something we need to allow configuration for at deployment time? Who is responsible for managing the file permissions so as to prevent untrusted code making arbitrary file system access outside of their subdir? How would the "dag_directory" approach work in a Kubernetes deployment, where you could imagine configuring a volume mount to only mount the relevant sub-directory of a shared filesystem but into a common location like "/usr/local/airflow/dags" (one option is that we could say you have to set the mount point to the same name as the subdir)?

    It feels like perhaps in the proposal "dag_directory" is being used as a sort of interim proxy for a tenant/team "namespace" within Airflow. Proper namespacing would obviously be a much larger change, but instead of adding "dag_directory" to the DagModel/SerializedDagModel would some namespace id string be equivalent? You could even imagine prefixing the dag ID with a unique namespace, which would also help with the ID clash issue. For example:

    /usr/local/airflow/dags/
      team1/
        dag1.py -- with a DAG ID of "dag1", but in the DB as "team1::dag1"
        dag2.py
      team7/
        dag1.py
        dag2.py    

    For backwards compatibility, any DAG without a "^.*::" prefix could be considered in the default namespace. Or it could be a new namespace column. Either way it would obviously require some additional thought as to how and where IDs are used in the code to ensure backwards compatibility.

    However, I am also aware that it would be good to make these type of changes incrementally and to keep things simple.

    1. AFAIK right now some callbacks are appended at the beginning of the queue and some at the end to control the priority, so the field is there to support this case. I don't have a strong opinion on the field name, and "priority_weight" sounds reasonable - updated.

      "dag_loc" - yes, this is relative to dag_directory. There is no check on the Airflow side, but the user could e.g. use nested directories. I don't see any easy way to check it (as dag processors are executed independently), so I just left it to the user.

      Regarding security - directories like /dags/team-1, /dags/team-2 for each team and permissions for them.
      Each dag processor could (or even should) be executed as a different user or (in Kubernetes) as a separate Pod. This allows mounting only a single directory for parsing dags (with no access to others), e.g. /home/airflow/dags/team-1 or /home/airflow/dags/team-2 - with AIRFLOW_DAG_DIRECTORY='/home/airflow/dags' and --subdir=team-1 or --subdir=team-2.

      We thought about appending some additional info to the dag_id. This change sounds too risky - dag_id is used in too many places, and it may be misleading to users - the dag_id would be different from what they see in the dag file. We ended up with the `--dag-id-regex` option that could enforce that dag authors name the dags accordingly.

      Jarek Potiuk 

      1. Ian Buss Mateusz Henc I think we should see the "multiple directories" part purely as isolation of DAG Processor code execution. We should not look further than a very basic feature of "code execution isolation" between DAG file processors. This is really a very "low-level" feature that can be used by various deployments as they see fit - without changing how Airflow works. And one that we might be able to build on later to add more high-level features.

        The point here is that we only want to separate the code being executed between various groups of DAGs (directories seem to be a natural fit for grouping). No more, no less. The proposal is purely about adding a capability that some deployments might utilise - to make sure that code coming from DAGs in one folder will never be executed in the same process as code from DAGs coming from another folder. That's it.

        Maybe better to explain what we do NOT (for now at least) want to introduce in this AIP:

        • We do not want to introduce a concept of team or tenant (not yet). The DAGs will not be assigned to teams, there will be no "management" of teams in Airflow. Not yet. This will come but it is not the focus of our proposal.
        • We completely abstract away and deliberately do not give any tools or recommendations on how to manage access to the folders
        • We do not want to define who is responsible for managing access to parts of the DAG folder, or how.
        • We do not want to enforce any way of mounting all or parts of DAG folders in workers.
        • We do not really want to change the way how Airflow Scheduler or workers process the parsed dags - specifically we do not want to introduce a mechanism (yet) where dedicated workers could only process dags coming from a selected sub-folder. This is NOT part of this AIP (maybe surprisingly).

        The way how this feature is utilized is left purely to actual deployment.

        • I can imagine that a deployment will be done in the way that the DAGFileProcessors for different folders will be run in separate processes, containers, Pods, using different users, different machines, different clusters or even different security zones
        • I can imagine that there will be users with write access to some or parts of the DAG folder.
        • I can imagine deployments that this feature is not used
        • I can imagine that some deployments might choose to have dedicated "per subfolder" group of workers which will be executing only DAGs coming from separate folders (this can be done via custom Executor to choose workers based on the location).
        • But I think decisions about that are outside the "Airflow application" domain and fall into the "Airflow deployment" domain.

        The only thing we want is to add the option of running multiple DAG file processors - in such a way that one DAG processor will never execute DAG-provided code (parsing and callbacks) belonging to a different "DAG processor".

        That's it. No more no less. 

        1. Jarek Potiuk Mateusz Henc Yes, it's of course a perfectly reasonable engineering approach to keep the proposed change limited in scope and I don't have a problem there and 100% agree that messing around with the DAG ID should not be part of this change. However, I guess my concern is that the ability to run multiple DAG processors is not actually strictly necessary to achieve code execution isolation, which is the specific aim of AIP-43 if I understand correctly? Don't get me wrong, it's a nice to have and something that is absolutely needed (for multitenancy and scaling etc), but probably if you're talking about keeping changes limited in scope you could argue that adding multiple DAG processors is something that could be added after as part of wider support for multitenancy? Just something to consider before adding columns to core tables which may be hard to remove in future. With all that said, I may have misunderstood and it is in fact a key part of the proposal, so apologies for the noise if that's the case.

          1. No apologies needed (smile). Those are valid concerns. 

            Let me explain why it is key to implement it now.

            The goal of those two is that they are an absolute minimal change (IMHO) that actually makes it possible to implement a "simple" and "secure" multi-tenant approach. Only from the security perspective - not fully multi-tenant with all the bells and whistles, but at least with secure isolation between "tenants".

            With those two AIPs implemented, you should be able to isolate code between two groups of users (tenants) without having to modify the Airflow core/database. If you properly configure the deployment and the access rights for directories, you can do it if you are determined (and write your own executor for that).

            Without adding the --subdir flag, it's simply impossible (without modifying Airflow's DAGProcessor code). All the code coming from the DAG folder is executed within the same process or group of processes - without any mechanism except the process boundary to isolate them.


            IMHO, not having this isolation is really a "no-go" for any multi-tenant approach, because those processes share the same user, disk space, temporary folders and potentially memory via multiprocessing, and there is no actual isolation. In that sense, just implementing AIP-44 without AIP-43 and subdir makes the multi-tenant feature "toothless".

              1. Another comment: basing it on 'subdir' rather than introducing another "Airflow" abstraction like "tenant id" is also a way to utilize as much as possible the existing security features of the "deployment".

              Most of the filesystems and "dag syncing" solutions used in Airflow already have some access mechanisms built in - and we can make the "deployment" aware of those and make use of them, rather than Airflow. This will allow getting "multi-tenancy" security with minimal changes to the Airflow code.

  7. Introduce an additional parameter for the DagProcessor airflow dag-processor command: --dag-id-regex

    This sounds like a duplication of DagPolicy functionality. Maybe instead of adding additional commandline args, we could allow a user to specify separate cluster policies for each Dag Processor instance? This would allow for more advanced control as well, such as limiting the execution timeout of all DAGs from a single directory.

    I suppose that this is already possible with existing cluster policies (if dag.fileloc == "subfolder":, etc.), but I think that this could be improved upon in a multi-tenant setting.

      1. That's an interesting idea Sam Wheating. I really like it. I think this could indeed be much more versatile this way, and it simplifies the command line/deployment of the processor.

        Example:

        def dag_parser_policy(subdir: Optional[str], dag: DAG):
            """Ensure that DAG comes from the right folder."""
            if not dag.dag_id.startswith(subdir + '.'):
                raise AirflowClusterPolicyViolation(
                    f"DAG's id {dag.dag_id} from file path: {dag.fileloc} does not start with '{subdir}.' even if comes from {subdir} subdirectory of DAG folder."
                )


        1. I suppose that this is already possible with existing cluster policies (if dag.fileloc == "subfolder":, etc), but I think that this could be improved upon in a multi-tenant setting. 

          A comment: I think the problem currently is that the DAG policy would not have access to "subdir" (def dag_policy(dag: DAG) - there is no parameter to pass the subdir to it). I think having a "dag_parser_policy" would be conceptually nicer than adding a "subdir" parameter to dag_policy.

        2. This sounds good to me. One small change I would suggest: the proposed {subdirectory}.{dag_id} naming convention collides with the {parent_dag_id}.{subdag_id} naming convention used for SubDAGs - maybe we can pick a different separator to indicate the subdirectory in a multi-tenant setting?

          1. Ah yeah. This is just an example, but yeah, it could be '-'. The natural candidate "/" is not good because of the treatment '/' gets with the API/flask (https://github.com/apache/airflow/issues/20063) and we don't have much choice: KEY_REGEX

            r'^[\w.-]+$'
  8. Mateusz Henc, in terms of the callbacks execution, the Scheduler should not schedule task instances if there are callbacks associated with them. Let me know your thoughts.

    1. Isn't it a breaking change? I believe right now task instances are executed.

      cc: Jarek Potiuk 

  9. One of the issues we have with a large airflow deployment is airflow requirements conflicting with other third party library or internal code requirements. Does this AIP intend to allow users to run the airflow components in an airflow-only venv and parse dags using another venv? The DAGs have to be written in an env with some airflow installation to use airflow models and components, but perhaps there could be a minimal airflow installation for writing/parsing dags and a "complete" one that contains the rest of the infrastructure? Having the ability to parse dags in a separate venv from the main airflow installation, similar to how we can run tasks in a different venv, would be important to making airflow truly multi-tenant.

    My interest is basically detaching sqlalchemy and attrs versions from the airflow pins.

    1. Nope. You're likely looking at AIP-46 Runtime isolation for airflow tasks and dag parsing - it's a related but different AIP.