
Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/rgovdphjrxtcqfmcfdrbfntwrdr2nj71
Created:
Author: Mateusz Henc



Motivation

This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted" ones, which allows putting security boundaries between them. "Untrusted" components could then be executed in DBIsolation mode, which disables direct database access and makes it possible only through the Airflow Database API (separate AIP).

DagProcessor is currently part of the Scheduler. Because it works with user code, it is considered "untrusted", while SchedulerJob is "trusted". With the changes below, it will be possible to start DagProcessor and SchedulerJob as independent processes running on different hosts.

Proposal

Move find_zombies to Scheduler Job

Finding zombies currently runs in the DAG file processing loop; however, it has more in common with the Scheduler Job than with processing files.

I propose to move the code of the _find_zombies method to the SchedulerJob class and run it there on a configured interval (alongside adopt_or_reset_orphaned_tasks and emit_metrics).

The interval is controlled by a new configuration option, [scheduler]zombie_detection_interval, defaulting to 30 seconds - it is currently 10 seconds, which seems too frequent.
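
For illustration, a minimal sketch of how such an interval-driven check could look (the run_zombie_detection_loop helper is hypothetical; only the configuration option comes from this proposal):

Code Block
import time

from airflow.configuration import conf


def run_zombie_detection_loop(find_zombies, should_stop=lambda: False):
    """Illustrative sketch: invoke find_zombies() every zombie_detection_interval seconds."""
    interval = conf.getint("scheduler", "zombie_detection_interval", fallback=30)
    last_run = 0.0
    while not should_stop():
        now = time.monotonic()
        if now - last_run >= interval:
            find_zombies()  # in this proposal: SchedulerJob._find_zombies
            last_run = now
        time.sleep(1)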

Introduce new configuration option

I propose to add a new configuration option, [scheduler]standalone_dag_processor, defaulting to False.

When this option is True, the SchedulerJob doesn't start the DagFileProcessorAgent on startup. It is up to the user to run it as an independent process.
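
A minimal sketch of the intended startup behaviour, assuming the option is read with Airflow's standard configuration API (the helper name is illustrative):

Code Block
from airflow.configuration import conf


def should_start_dag_processor_agent() -> bool:
    """Sketch: the scheduler runs its in-process DagFileProcessorAgent
    only when standalone mode is disabled."""
    return not conf.getboolean("scheduler", "standalone_dag_processor", fallback=False)


if should_start_dag_processor_agent():
    print("Legacy mode: DAG parsing runs inside the scheduler process.")
else:
    print("Standalone mode: start `airflow dag-processor` as a separate process.")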

Introduce new Airflow CLI command 

The DAG processor can be executed in standalone mode with the airflow dag-processor command, accepting the following options:

  • --subdir/-S - directory from which to parse DAGs; if present, it is a subfolder of $AIRFLOW_DAG_FOLDER
  • --num-runs/-n - number of runs to execute before exiting; 0 means run forever
  • --parsing-processes/-P - number of processes to be used by the Dag Processor; overrides the [scheduler]parsing_processes option

On startup, it runs the DagFileProcessorManager process.
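
For example, a single standalone processor for the whole DAG folder could be started like this (the path and option values are placeholders):

Code Block
airflow dag-processor --subdir $AIRFLOW_HOME/dags --num-runs 0 --parsing-processes 2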

Move callbacks queue to database

Callbacks allow you to execute additional functions on DAG/task success or failure. Currently, they are triggered from both the Scheduler Job (e.g. DagRun timeout) and Dag Processing (zombie detection).

Callbacks are executed by the Dag Processor, which makes sense as the code is defined in the DAG file (untrusted). However, to allow the Scheduler Job to trigger a callback when the DagProcessor is running as a standalone process, the information about the callback must be passed to the DagProcessor.

I propose to introduce a new table, CallbackRequest, for storing the callbacks, with the following fields:

  • id (int) - uniquely identifies the callback
  • created_at (datetime) - time when the callback was added
  • priority_weight (int) - higher values = higher priority
  • message (string) - additional message that can be used for logging
  • callback_type (enum) - TaskCallback, DagCallback or SlaCallback
  • callback_data (JSON) - serialized metadata of the callback
  • processor_subdir (string) - contains the information about the DAG directory (required for "multiple dag directories" described below)

callback_data allows constructing the callback object: TaskCallbackRequest, DagCallbackRequest or SlaCallbackRequest.
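
A rough SQLAlchemy sketch of such a table (column types, lengths and defaults are illustrative, not the final schema):

Code Block
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class CallbackRequestModel(Base):
    """Sketch of the proposed callback table; names and types are not final."""

    __tablename__ = "callback_request"

    id = Column(Integer, primary_key=True, autoincrement=True)
    created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
    priority_weight = Column(Integer, default=1, nullable=False)  # higher = sooner
    message = Column(String(2000))            # optional message used for logging
    callback_type = Column(String(20))        # TaskCallback / DagCallback / SlaCallback
    callback_data = Column(Text, nullable=False)   # serialized callback request (JSON)
    processor_subdir = Column(String(2000))   # DAG directory that owns this callback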

The Scheduler process (via the Executor), instead of sending the callbacks to the queue, adds them to the database.

During its execution loop, the Dag Processor fetches up to max_callback_per_loop callbacks (highest priority_weight first), deletes them from the database, builds the proper CallbackRequest objects, adds them to the _callback_to_execute list and processes them the same way as it does now. Each Dag Processor only executes callbacks from the directory it reads DAGs from (based on the processor_subdir column).
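
A simplified sketch of that fetch step, reusing the CallbackRequestModel sketch above (the session handling and the max_callback_per_loop default are illustrative):

Code Block
from sqlalchemy.orm import Session


def fetch_callbacks(session: Session, subdir: str, max_callback_per_loop: int = 20):
    """Sketch: pop the highest-priority callbacks belonging to this processor's subdir."""
    rows = (
        session.query(CallbackRequestModel)
        .filter(CallbackRequestModel.processor_subdir == subdir)
        .order_by(CallbackRequestModel.priority_weight.desc())
        .limit(max_callback_per_loop)
        .all()
    )
    for row in rows:
        session.delete(row)  # fetched callbacks are removed from the table
    session.commit()
    return rows  # later deserialized into Task/Dag/SlaCallbackRequest objects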

Airflow still uses the in-memory queue in [scheduler]standalone_dag_processor=False mode.

The code for handling the internal queue of callbacks (currently in DagFileProcessorManager._callback_to_execute) is moved to airflow/callbacks/ with a BaseCallbackSink base class and its two implementations: PipeCallbackSink (sends callbacks to the DagProcessor using the currently existing pipe) and DatabaseCallbackSink (adds callbacks to the database).

In [scheduler]standalone_dag_processor=True mode, the DagProcessor calls an additional method, fetch_callbacks, in its loop, which fetches callbacks from the database and adds them to the internal queue.

BaseExecutor extension

To make callback handling more extensible and future-proof, new callbacks are sent for execution (in both legacy and standalone dag processor modes) using a new method on BaseExecutor: send_callback_to_execute. This method is implemented in BaseExecutor by calling a BaseCallbackSink which, depending on the [scheduler]standalone_dag_processor mode, is either a PipeCallbackSink or a DatabaseCallbackSink.

This makes it possible to override this behavior in Executor subclasses.
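
A minimal sketch of that shape (constructor arguments and method bodies are simplified; the create_callback_sink helper is illustrative):

Code Block
from abc import ABC, abstractmethod

from airflow.configuration import conf


class BaseCallbackSink(ABC):
    """Sketch of the common interface for delivering callbacks to the DagProcessor."""

    @abstractmethod
    def send(self, callback) -> None:
        ...


class PipeCallbackSink(BaseCallbackSink):
    """Legacy mode: push the callback through the existing scheduler-processor pipe."""

    def __init__(self, get_sink_pipe):
        self._get_sink_pipe = get_sink_pipe

    def send(self, callback) -> None:
        self._get_sink_pipe().send(callback)


class DatabaseCallbackSink(BaseCallbackSink):
    """Standalone mode: persist the callback in the callback table."""

    def send(self, callback) -> None:
        ...  # insert a row into the callback table (see the model sketch above)


def create_callback_sink(get_sink_pipe) -> BaseCallbackSink:
    """Pick the sink according to [scheduler]standalone_dag_processor;
    BaseExecutor.send_callback_to_execute(callback) then calls sink.send(callback)."""
    if conf.getboolean("scheduler", "standalone_dag_processor", fallback=False):
        return DatabaseCallbackSink()
    return PipeCallbackSink(get_sink_pipe)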

Alternative approach: Running callbacks in workers

Callbacks could also be executed by workers, as they may be considered part of "task execution" and it feels natural to execute task-related callbacks there. However, there are some problems with this, especially when using the KubernetesExecutor:

  • Performance - creating a new Worker only to execute a callback (which is usually small and fast) may be too slow
  • Resources - there is no need to spin up a new Pod while the DagProcessor is able to handle the callback
  • Simplicity - the code for handling callbacks is already in the DagProcessor, so smaller changes are required
  • Also, the DagFileProcessor is a new entity which has not been considered before, so it might feel unnatural to tie callback execution to it. However, the processor already executes user code, so from a security point of view the same security "perimeter" needs to be maintained for both anyway, and the performance implications are in favor of the DagFileProcessor.

Add support for multiple dag directories 

This change allows multiple DAG directories. Each Dag Processor component parses files from a different directory, passed with the --subdir/-S parameter. This allows, for example, multiple teams to share the same Airflow installation, putting DAGs in different directories with different sets of permissions.

To make this possible, I propose to extend the SerializedDagModel and DagModel tables with a new column:

  • dag_directory (string)

It contains the information about the DAG directory. During DAG file parsing, the processor sets this field to the value of --subdir.

When removing deleted DAGs, only DAGs with dag_directory equal to the dag processor's --subdir value are removed/marked as inactive.

Cleanup

Additional cleanup of DagModel/SerializedDagModel rows is required, as a DagProcessor may be stopped or restarted with different settings, leaving abandoned DAGs in the database. They are cleaned up by the SchedulerJob on a configured interval (running every 60 seconds), which marks as inactive all DAGs that were not updated by a DagProcessor within the last hour.
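
A sketch of such a cleanup step against DagModel (the last_parsed_time attribute and the exact query are assumptions for illustration):

Code Block
from datetime import timedelta

from sqlalchemy.orm import Session

from airflow.models.dag import DagModel
from airflow.utils import timezone


def deactivate_stale_dags(session: Session, stale_after: timedelta = timedelta(hours=1)) -> int:
    """Sketch: mark DAGs inactive if no DagProcessor has refreshed them recently."""
    threshold = timezone.utcnow() - stale_after
    num_deactivated = (
        session.query(DagModel)
        .filter(DagModel.is_active, DagModel.last_parsed_time < threshold)
        .update({DagModel.is_active: False}, synchronize_session="fetch")
    )
    session.commit()
    return num_deactivated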

DagPolicy

There is also a problem with two different DAG files having the same dag_id in different directories. As a solution, I propose to use the Airflow DagPolicy for this purpose - users will be able to define a policy that checks whether the DAG ID meets the conditions, e.g.

Code Block
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.dag import DAG

def dag_policy(dag: DAG):
    # FileProcessorManager.get_current_subdir() is the new method described below.
    subdir = FileProcessorManager.get_current_subdir()
    if not dag.dag_id.startswith(subdir):
        raise AirflowClusterPolicyViolation(
            f"DAG id {dag.dag_id} from file {dag.fileloc} does not start with "
            f"'{subdir}.' even though it comes from the {subdir} subdirectory "
            f"of the DAG folder."
        )

Here, FileProcessorManager.get_current_subdir is a new method returning the current subdir parameter of the Dag Processor.

Deployment options

Below you can see the possible deployment options for an Airflow installation after these changes are implemented.

  • [scheduler]standalone_dag_processor=False (default)

The Dag Processor still runs as part of Scheduler. No changes for the end-user.

(draw.io diagram: default-1)

  • [scheduler]standalone_dag_processor=True. Single Dag Processor

Dag Processor runs as a standalone component. Users need to start a scheduler job (airflow scheduler) and Dag Processor (airflow dag-processor) independently.

(draw.io diagram: default setup)

  • [scheduler]standalone_dag_processor=True. Multiple Dag Processors

Multiple Dag Processors - each parsing DAGs from a different directory. Users need to start a scheduler (airflow scheduler) and each Dag Processor separately (airflow dag-processor --subdir /dag-X). This makes it possible to set up different DAG subdirectories for different teams, which are parsed independently and in isolation.

(draw.io diagram: deployment-3)

What problem does it solve?

It allows running multiple Dag Processors as independent components on separate hosts, parsing DAGs from distinct locations.

Why is it needed?

This is needed to run the DagProcessor in "DBIsolation" mode (separate AIP), without direct access to the database, while the Scheduler Job still has this access.

Which users are affected by the change?

Only those that set [scheduler]standalone_dag_processor=True

How are users affected by the change? (e.g. DB upgrade required?)

A DB upgrade is required to use the new mode.

What defines this AIP as "done"?

DagProcessor and SchedulerJob can be executed independently on different hosts, and it is possible to run multiple DagProcessors parsing DAGs from different directories.



