Status

State: Completed
Discussion Thread:
Github Issue:
PR: https://github.com/apache/airflow/pull/10153
Created: 2020-04-02
In Release: 2.0.0

Motivation

Currently, the SubDagOperator launches a completely separate DAG and then monitors it as a separate entity. This has led to all sorts of edge cases (e.g. when workers have different executors than the scheduler). The codebase also contains a lot of special handling logic for subdags, which increases the maintenance burden.

This AIP proposes to introduce the TaskGroup concept as an alternative to SubDagOperator. TaskGroup is a simple UI grouping concept for tasks. Tasks in the same TaskGroup are grouped together in the UI. All the tasks stay on the same original DAG. This allows us to collapse the tasks that would otherwise live in a subdag into a single node at the UI level, while all tasks remain in the same DAG and are handled by the same scheduler.

Considerations

What change do you propose to make?

The goal of this proposal is to introduce a new concept called TaskGroup. It can be used as an alternative to SubDagOperator. This AIP does not aim to remove SubDagOperator, although we may consider deprecating it in the future if TaskGroup proves more useful and easier to maintain.

This proposal introduces TaskGroup as a utility class. All tasks created in the context of a TaskGroup are added to the group. The task_id of each task within the group is prefixed with the group_id of the TaskGroup. The TaskGroup is passed to the UI as a dictionary. The UI then renders the graph based on the grouping information. Internally, all tasks remain on the same DAG. There is thus no change to how the scheduler works.

The TaskGroup class

TaskGroup is modelled as a tree. Every TaskGroup has zero or more children. The children can be either a TaskGroup or a BaseOperator. 

Every DAG has a root TaskGroup. When a DAG is first created, its task_group is initialized to a TaskGroup with group_id and parent_group set to None. This is the root TaskGroup. All subsequently created TaskGroups and tasks are added to the root TaskGroup unless they are explicitly given a TaskGroup.
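The tree described above can be sketched with plain Python classes. This is a toy model, not Airflow's implementation: Task and Group are hypothetical stand-ins for BaseOperator and TaskGroup, and the prefixing of ids is left out to keep the tree structure in focus.

```python
from typing import Dict, Iterator, Optional, Union


class Task:
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class Group:
    """Toy TaskGroup: a tree node whose children are Groups or Tasks."""

    def __init__(self, group_id: Optional[str], parent_group: Optional["Group"] = None) -> None:
        self.group_id = group_id
        self.parent_group = parent_group
        self.children: Dict[str, Union["Group", Task]] = {}
        if parent_group is not None:
            parent_group.add(self)

    @property
    def is_root(self) -> bool:
        # The root group is the one created with group_id=None.
        return self.group_id is None

    def add(self, child: Union["Group", Task]) -> None:
        key = child.group_id if isinstance(child, Group) else child.task_id
        if key in self.children:
            raise ValueError(f"Duplicate child id: {key!r}")
        self.children[key] = child

    def iter_tasks(self) -> Iterator[Task]:
        """Yield every task in this subtree, depth first."""
        for child in self.children.values():
            if isinstance(child, Group):
                yield from child.iter_tasks()
            else:
                yield child


# The root group has neither an id nor a parent.
root = Group(group_id=None)
root.add(Task("start"))
section_1 = Group("section_1", parent_group=root)
section_1.add(Task("task-1"))
inner = Group("inside_section_1", parent_group=section_1)
inner.add(Task("task-2"))

all_ids = [t.task_id for t in root.iter_tasks()]
```

Walking the tree from the root reaches every task regardless of nesting depth, which is the property the UI relies on when it renders nested groups.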

Putting a task into a TaskGroup

When a BaseOperator is constructed with a task_group specified, or if it is constructed within the context manager of a TaskGroup, it is added to that TaskGroup. The only way to add a task to a TaskGroup is during the task's construction. Once it is added, the task_id of this task is prefixed with the group_id and cannot be changed. If no TaskGroup is explicitly given and no TaskGroup context is active, the task is added to the root TaskGroup of the DAG.
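A minimal sketch of this resolution order, assuming a class-level stack of active groups. The names Group, Task, ROOT and current() are hypothetical and chosen for illustration. A single module-level root stands in for the per-DAG root TaskGroup, and id prefixing is omitted here.

```python
from typing import List, Optional


class Group:
    """Toy TaskGroup that tracks which group is active in a `with` block."""

    _active: List["Group"] = []  # stack of groups entered via `with`

    def __init__(self, group_id: Optional[str]) -> None:
        self.group_id = group_id
        self.tasks: List["Task"] = []

    def __enter__(self) -> "Group":
        Group._active.append(self)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        Group._active.pop()

    @classmethod
    def current(cls) -> Optional["Group"]:
        """Return the innermost active group, or None outside any `with`."""
        return cls._active[-1] if cls._active else None


ROOT = Group(group_id=None)  # stands in for the DAG's root TaskGroup


class Task:
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, task_id: str, task_group: Optional[Group] = None) -> None:
        self.task_id = task_id
        # Resolution order: explicit argument, then the innermost active
        # `with` block, then the root group.
        if task_group is None:
            task_group = Group.current() or ROOT
        self.task_group = task_group
        # Membership is fixed here, at construction time.
        task_group.tasks.append(self)


start = Task("start")                          # no group given, none active
with Group("section_1") as section_1:
    task_1 = Task("task-1")                    # picked up from the `with` block
task_2 = Task("task-2", task_group=section_1)  # explicit argument
```

Because membership is decided inside the constructor, there is no later call that could move a task to another group, which matches the rule that a task can only join a TaskGroup while it is being constructed.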

task_id of a task within a TaskGroup

The task_id of BaseOperator is changed to a property. It is prefixed with its TaskGroup's group_id, which is itself prefixed with the group_id of its parent TaskGroup.
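The recursive prefixing can be illustrated as follows. This is a sketch under assumptions: Group and Task are hypothetical stand-ins, and the "." separator is an assumption of the example, since the text above does not specify one.

```python
from typing import Optional

SEP = "."  # assumed separator between group_id and task_id


class Group:
    """Toy TaskGroup whose group_id includes the ids of its ancestors."""

    def __init__(self, group_id: Optional[str], parent: Optional["Group"] = None) -> None:
        self._group_id = group_id
        self.parent = parent

    @property
    def group_id(self) -> Optional[str]:
        # The root group has no id and contributes no prefix.
        if self._group_id is None:
            return None
        if self.parent is not None and self.parent.group_id is not None:
            return f"{self.parent.group_id}{SEP}{self._group_id}"
        return self._group_id


class Task:
    """Hypothetical stand-in for BaseOperator with task_id as a property."""

    def __init__(self, task_id: str, group: Group) -> None:
        self._task_id = task_id
        self.group = group

    @property
    def task_id(self) -> str:
        prefix = self.group.group_id
        return f"{prefix}{SEP}{self._task_id}" if prefix else self._task_id


root = Group(None)
section_1 = Group("section_1", parent=root)
inner = Group("inside_section_1", parent=section_1)

top_level = Task("start", root)
nested = Task("task-1", inner)
```

With this scheme, two tasks that are both declared as task-1 in different groups end up with distinct full ids, which is why the example DAG in this document can reuse short names inside each section.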

Setting dependencies on TaskGroup

Dependency relationships can be defined between a TaskGroup and a BaseOperator as well as between a TaskGroup and another TaskGroup. TaskGroup acts as a collection of BaseOperator. The >> and << operators apply the operation over all tasks within the TaskGroup.
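One way to picture this behaviour is a shared base class that implements >> and << for both tasks and groups. The sketch below follows the description above literally and connects every task in a group; Node, Task and Group are hypothetical names, and an actual implementation could narrow the operation to the group's entry and exit tasks.

```python
from typing import List, Set


class Node:
    """Hypothetical base class giving tasks and groups the same >> / << behaviour."""

    def tasks(self) -> List["Task"]:
        raise NotImplementedError

    def __rshift__(self, other: "Node") -> "Node":
        # self >> other: every task in self runs before every task in other.
        for up in self.tasks():
            for down in other.tasks():
                up.downstream_ids.add(down.task_id)
                down.upstream_ids.add(up.task_id)
        return other  # returning the right operand allows a >> b >> c

    def __lshift__(self, other: "Node") -> "Node":
        # self << other is the mirror image: other runs before self.
        other >> self
        return other


class Task(Node):
    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_ids: Set[str] = set()
        self.downstream_ids: Set[str] = set()

    def tasks(self) -> List["Task"]:
        return [self]


class Group(Node):
    """Toy TaskGroup acting as a collection of tasks."""

    def __init__(self, group_id: str, members: List[Task]) -> None:
        self.group_id = group_id
        self.members = members

    def tasks(self) -> List[Task]:
        return list(self.members)


start, end = Task("start"), Task("end")
section_1 = Group("section_1", [Task("section_1.task-1"), Task("section_1.task-2")])

start >> section_1 >> end
```

After the last line, start is upstream of both tasks in section_1, and both of those tasks are upstream of end, without the DAG author naming the individual tasks.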

Webserver changes to handle TaskGroup

The root TaskGroup of the DAG is converted to a dict called "nodes" and passed to the UI. The dict represents the tree structure of the nested TaskGroups. It also carries some metadata such as the tooltip and ui_color of each TaskGroup.
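As an illustration, the conversion could look like the recursive function below. The key names ("id", "tooltip", "ui_color", "children"), the default colours, and the Task and Group classes are assumptions made for this sketch; the text above only states that the dict mirrors the tree and carries metadata such as tooltip and ui_color.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional, Union


@dataclass
class Task:
    """Hypothetical stand-in for BaseOperator."""
    task_id: str
    ui_color: str = "#ffffff"  # assumed default


@dataclass
class Group:
    """Toy TaskGroup with the metadata mentioned in the text."""
    group_id: Optional[str]
    tooltip: str = ""
    ui_color: str = "#6495ed"  # assumed default
    children: List[Union["Group", Task]] = field(default_factory=list)


def to_node(item: Union[Group, Task]) -> dict:
    """Convert a group (recursively) or a task into a JSON-friendly dict."""
    if isinstance(item, Task):
        return {"id": item.task_id, "ui_color": item.ui_color}
    return {
        "id": item.group_id,
        "tooltip": item.tooltip,
        "ui_color": item.ui_color,
        "children": [to_node(child) for child in item.children],
    }


root = Group(
    group_id=None,
    children=[
        Task("start"),
        Group(
            "section_1",
            tooltip="Tasks for Section 1",
            children=[Task("section_1.task-1")],
        ),
        Task("end"),
    ],
)

nodes = to_node(root)
payload = json.dumps(nodes)  # what a webserver could hand to the page
```

Only group nodes carry a "children" key in this sketch, so the front end can tell a group from a task by checking for that key.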

UI changes to display TaskGroup

Most of the changes happen at the UI layer in Graph View. Two JavaScript functions, expand_group() and collapse_group(), are added. When the page is first opened, only the root TaskGroup is expanded, i.e. only top-level tasks and TaskGroups are shown. expand_group() is called when a group node is clicked. When the user clicks on the empty region of an expanded group node, collapse_group() is called.

The expand_group() function examines the nodes dictionary and finds the next level of child nodes that need to be displayed. It also looks at the list of edges to determine which edges need to be added to the graph when the group is expanded.

The collapse_group() function does the opposite. It removes the child nodes of the collapsed group from the graph and replaces the children's edges with edges that go in and out of the parent group.
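The effect of expanding and collapsing on the displayed edges can be modelled compactly. The sketch below is written in Python rather than JavaScript and is not the UI code: it computes, for a given set of expanded groups, which edges should be visible, by redirecting each edge endpoint to its outermost collapsed ancestor. The PARENT and EDGES structures and both function names are hypothetical.

```python
from typing import Dict, List, Optional, Set, Tuple

# child id -> parent group id (None means the root group, always expanded)
PARENT: Dict[str, Optional[str]] = {
    "start": None,
    "section_1": None,
    "section_1.task-1": "section_1",
    "section_1.task-2": "section_1",
    "end": None,
}

# Edges between tasks, as the scheduler sees them.
EDGES: List[Tuple[str, str]] = [
    ("start", "section_1.task-1"),
    ("section_1.task-1", "section_1.task-2"),
    ("section_1.task-2", "end"),
]


def visible_node(node_id: str, parent: Dict[str, Optional[str]], expanded: Set[str]) -> str:
    """Return the node that represents `node_id` on screen.

    That is the node itself if all of its ancestor groups are expanded,
    otherwise the outermost ancestor group that is collapsed.
    """
    ancestors = []
    current = parent[node_id]
    while current is not None:
        ancestors.append(current)
        current = parent[current]
    for group in reversed(ancestors):  # outermost first
        if group not in expanded:
            return group
    return node_id


def visible_edges(
    edges: List[Tuple[str, str]],
    parent: Dict[str, Optional[str]],
    expanded: Set[str],
) -> Set[Tuple[str, str]]:
    """Redirect edges to visible nodes, dropping edges hidden inside a collapsed group."""
    result = set()
    for source, target in edges:
        a = visible_node(source, parent, expanded)
        b = visible_node(target, parent, expanded)
        if a != b:
            result.add((a, b))
    return result


collapsed_view = visible_edges(EDGES, PARENT, expanded=set())
expanded_view = visible_edges(EDGES, PARENT, expanded={"section_1"})
```

In the collapsed view the three task-level edges reduce to two edges that enter and leave section_1; expanding the group restores the original edges. The incremental add and remove steps performed by expand_group() and collapse_group() are transitions between these two results.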

TaskGroup serialization

TaskGroup is added as a new attribute of DAG. It is serialized/deserialized just like other DAG attributes. The serialized representation of TaskGroup contains only children task_id or group_id. When it is deserialized, the task_dict of the DAG is looked up to put the corresponding task into the TaskGroup.
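A round trip of this kind might look like the following. The dict layout is an assumption of the sketch (task children stored as task_id strings, nested groups stored as nested dicts), and Task, Group, serialize_group and deserialize_group are hypothetical names rather than Airflow's serialization API.

```python
import json
from typing import Dict, List, Optional, Union


class Task:
    """Hypothetical stand-in for BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class Group:
    """Toy TaskGroup holding tasks and nested groups in order."""

    def __init__(
        self,
        group_id: Optional[str],
        children: Optional[List[Union["Group", Task]]] = None,
    ) -> None:
        self.group_id = group_id
        self.children = children if children is not None else []


def serialize_group(group: Group) -> dict:
    """Keep ids only: tasks become task_id strings, groups become nested dicts."""
    return {
        "group_id": group.group_id,
        "children": [
            serialize_group(c) if isinstance(c, Group) else c.task_id
            for c in group.children
        ],
    }


def deserialize_group(data: dict, task_dict: Dict[str, Task]) -> Group:
    """Rebuild the tree, resolving each task_id through the DAG's task_dict."""
    children = [
        deserialize_group(c, task_dict) if isinstance(c, dict) else task_dict[c]
        for c in data["children"]
    ]
    return Group(data["group_id"], children)


# task_dict plays the role of the DAG's mapping from task_id to task.
task_dict = {tid: Task(tid) for tid in ("start", "section_1.task-1", "end")}
root = Group(None, [
    task_dict["start"],
    Group("section_1", [task_dict["section_1.task-1"]]),
    task_dict["end"],
])

payload = json.dumps(serialize_group(root))
restored = deserialize_group(json.loads(payload), task_dict)
```

Storing only ids keeps the group payload small and avoids serializing each task twice, since the tasks themselves are already part of the serialized DAG.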

DAG example with TaskGroup

[Image: Graph View of the example DAG with TaskGroups]

This is the code that builds the above DAG:

from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def create_section():
    """
    Create tasks in the outer section.
    """
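    # Five tasks that belong directly to the enclosing TaskGroup.
    dummies = [DummyOperator(task_id=f'task-{i + 1}') for i in range(5)]

    # Nested groups: task_ids are prefixed with the nested group_id, so
    # reusing 'task-1' etc. does not clash with the tasks above.
    with TaskGroup("inside_section_1") as inside_section_1:
        _ = [DummyOperator(task_id=f'task-{i + 1}') for i in range(3)]

    with TaskGroup("inside_section_2") as inside_section_2:
        _ = [DummyOperator(task_id=f'task-{i + 1}') for i in range(3)]

    # A dependency on a TaskGroup applies to the tasks inside it.
    dummies[-1] >> inside_section_1
    dummies[-2] >> inside_section_2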


with DAG(dag_id="example_task_group", start_date=days_ago(2)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("section_1", tooltip="Tasks for Section 1") as section_1:
        create_section()

    some_other_task = DummyOperator(task_id="some-other-task")

    with TaskGroup("section_2", tooltip="Tasks for Section 2") as section_2:
        create_section()

    end = DummyOperator(task_id='end')

    start >> section_1 >> some_other_task >> section_2 >> end


What problem does it solve?

  • Offers an alternative to SubDagOperator
  • Makes it possible to organize DAGs with a large number of tasks into more manageable groups.
  • Makes the UI easier to use, especially for DAGs with many tasks.

Why is it needed?

  • DAG development, especially for large DAGs, becomes easier when tasks are organized into nested TaskGroups.
  • Usability of the web UI needs to be improved, especially for DAGs with many tasks that can be organized into groups.
  • SubDagOperator is difficult to work with. It also has some known downsides.
  • TaskGroup needs much less special handling in the scheduler than SubDagOperator and should therefore be easier to maintain.

Are there any downsides to this change?

  • The webserver UI needs to be updated to render TaskGroup.
  • For a DAG with thousands of tasks and edges, rendering the graph may become slow.

Which users are affected by the change?

There is no immediate impact on existing users of SubDagOperator. They gain the option to switch to TaskGroup.

How are users affected by the change? (e.g. DB upgrade required?)

DB upgrade is not required.

Other considerations?

  • We may consider introducing lazy-loading if it proves useful for improving the UI performance of TaskGroup.
  • Clear documentation of the new concepts and behavior changes is needed.
  • Consider deprecating SubDagOperator in the future if TaskGroup becomes more widely adopted.

What defines this AIP as "done"?

  • Agree on and implement TaskGroup.
  • Modify the webserver UI to adapt to the new change.