Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/xhq5o3b3h879cwsc93xwq0836mwc5k6p

Project: https://github.com/apache/airflow/projects/12
In Release: 2.3.0

Motivation

When writing DAGs in Airflow, users can create arbitrarily many parallel tasks at write time, but not at run time: a single for loop can create thousands of tasks, yet the number of tasks in a DAG cannot change at run time based on the state of the previous tasks. This inelasticity limits Airflow’s capability as a parallel data execution engine, and restricts the use cases for how our users can write DAGs.

Scenarios

Processing files in S3

Let’s take a simple example of why a Dynamic DAG is crucial to complex data processing. In this example, we want to grab all files in an s3 bucket, get the number of words in each file, and find the file with the maximum number of words. This is essentially the “hello world” of MapReduce.

An example in "pseudo" python:

my_files = list_files_in_s3()
num_words_per_file = []
for f in my_files:
    num_words_per_file.append(get_num_words(f))
result = max(num_words_per_file)

To perform this processing directly on the worker under Airflow's current model, there are really only two viable options.

  • Create a static number of worker tasks that each pull/process files and attempt to shard files evenly across them. This is of course suboptimal: users will either need to create far too many workers or end up with less than complete parallelism.
  • Create a PythonOperator that launches a series of DAGs in a for-loop and then monitors them until they all complete.

A third approach that we sometimes see is calling list_files_in_s3() as top-level code in the DAG file, which is strongly discouraged (it slows down parsing and runs on every parse, not just at execution time).

Hyper-Parameter Tuning a Model

A second example would be a user who wants to perform hyper-parameter tuning on a model and then publish the best model to their inference servers. When data scientists run experiments on an ML model, they will often want to test multiple parameter configurations and then find the best performing model to push to their model registry. These parameter lists can be quite flexible/multi-dimensional so predicting ahead of time how many models they want to produce can be very limiting to this workflow.
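
Using the API proposed below (with the .expand() name this document eventually settles on), this scenario might be sketched roughly as follows. The task names, parameters, and scoring logic are illustrative only, and the surrounding DAG definition is omitted for brevity:

from typing import Dict, List

from airflow.decorators import task


@task
def list_param_sets() -> List[Dict[str, float]]:
    # Illustrative only: each element is one hyper-parameter configuration.
    return [{"lr": 0.1}, {"lr": 0.01}, {"lr": 0.001}]


@task
def train_model(params: Dict[str, float]) -> Dict[str, float]:
    # Train one model with these parameters and return its evaluation score.
    score = 1.0 - params["lr"]  # placeholder for a real training/evaluation run
    return {"lr": params["lr"], "score": score}


@task
def publish_best(results: List[Dict[str, float]]) -> None:
    best = max(results, key=lambda r: r["score"])
    print(f"publishing model trained with lr={best['lr']}")  # push to the model registry here


# One mapped train_model task per parameter set, reduced by publish_best.
publish_best(train_model.expand(params=list_param_sets()))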


Proposal

Add `map` and `reduce` functionality to Airflow Operators

For a first-round Dynamic Task creation API, we propose that we start out with the map and reduce functions.

Map and Reduce are cornerstones of any distributed (or non-distributed) list-processing paradigm. They are used with Spark DataFrames, Pandas DataFrames, Scala Lists, etc. as a way to apply a series of per-element transformations and then combine the results into a single object.

I propose that the added map and reduce functionality could look something like this:

from typing import List

from airflow.decorators import task

@task
def add_one(x: int) -> int:
    return x + 1

@task
def sum_values(x: List[int]) -> int:
    return sum(x)

add_x_values = [1, 2, 3]
added_values = add_one.expand(x=add_x_values)
result = sum_values(added_values)

This will result in a DAG with four tasks: three mapped "invocations" of add_one, each called with a single number, and a sum_values task that is given the result of each add_one.

One thing to consider when mapping over a dictionary is whether we want to allow only string keys, or any JSON-encodable value as a key. (For reference, Python allows almost any type as a dictionary key.) The downside to allowing more than just string keys is that a user could map over `{"1": "a string", 1: "a number"}`, and differentiating the two keys in the UI is hard.

Alternatives considered

Initially, we were hoping to offer something more natively pythonic (such as allowing users to generate tasks using a for-loop), but this method introduced many edge cases so we have favoured explicit over magic.

For example, a for-loop based approach (possibly implementable with a custom iterator/generator) might look like this

my_files = list_files_in_s3()
num_words_per_file = []
for i, f in enumerate(my_files):
    num_words_per_file.append(get_num_words(f, i))

While it might be possible to implement this, any Airflow user would expect full for-loop semantics from day one and would be very frustrated working with something that looks like a Python for loop but doesn’t actually act like one.

By implementing the map function, we can give a very clear line in the sand that this is an Airflow function. This could also act as a basis for an eventual for-loop syntactic sugar, but in the short term it would give us control of how users interact with the system.

Allow Partial Operators and Kwarg Functionality for Maps

When dynamically generating tasks for an operator, we have to assume that users will not want to map across every argument in that operator’s creation. To address this, we can allow users to create “template operators” and then provide keyword args for just the features they want to map across. All other arguments will be assumed as static across all instances.

from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator

@task
def get_files_from_s3():
    # S3 list operation
    ...

my_files = get_files_from_s3()
s3_delete_files = S3DeleteObjectsOperator.partial(
    aws_conn_id="my-aws-conn-id",
    bucket="my-bucket",
).map(keys=my_files)

The same partial/map pattern also works with TaskFlow tasks:

from typing import List

from airflow.decorators import task

@task
def my_task(x, y):
   return x + y

@task
def sum_values(x: List[int]) -> int:
   return sum(x)

add_values = [1, 2, 3]
added_values = my_task.partial(x=5).map(y=add_values)
result = sum_values(added_values)

We need to use Operator.partial().map() instead of Operator().map() because, without the resulting mapped values, it is unlikely we would be able to construct the Operator at all (i.e. it would likely result in a TypeError due to not enough arguments being passed to the constructor).

This partial functionality will especially shine when used with traditional Airflow operators. Imagine wanting to create a series of KubernetesPodOperator tasks where all arguments are the same except for the command. There might be a lot of other arguments that stay static across all tasks.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

k = KubernetesPodOperator.partial(
   namespace='default',
   image="ubuntu:16.04",
   arguments=["echo", "10"],
   labels={"foo": "bar"},
   secrets=[secret_file, secret_env, secret_all_keys],
   ports=[port],
   volumes=[volume],
   volume_mounts=[volume_mount],
   env_from=configmaps,
   name="airflow-test-pod",
   task_id="task",
   affinity=affinity,
   is_delete_operator_pod=True,
   hostnetwork=False,
   tolerations=tolerations,
   init_containers=[init_container],
   priority_class_name="medium",
).map(cmds=commands)

Map for TaskGroup Objects

When we offer the ability to map across a single task, we create a “breadth-first execution” model where there is a direct dependency between the mapped tasks. In this mode, none of task B’s mapped instances can start until every single mapped instance of task A has finished. This architecture can lead to significant bottlenecks if a user has multiple tasks that they would like mapped out before reducing.

On the other hand, Depth-first execution allows the user to create complex mapped pipelines with full assurance that the tasks will execute as quickly as dependencies and the scheduler allow. Adding this feature also adds a powerful new capability to the TaskGroup object, which was up until this point purely a UI construct.

Depth-first task execution is possible by allowing users to map across TaskGroup objects. We can add functionality whereby users can add arbitrary keyword args to a TaskGroup object and then use those kwargs as templates for task group expansion.

For mapping across TaskGroup objects, we will need to prevent the TaskGroup from being evaluated too early, so that the scheduler can parse the TaskGroup normally AND expand it later at runtime.

from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

lines = ["foo", "bar", "baz"]

@task_group
def my_task_group(my_arg_1: str):
    task_1 = DummyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command=f"echo {my_arg_1}")
    task_3 = DummyOperator(task_id="task_3")
    task_1 >> [task_2, task_3]

mapped_task_group = my_task_group.map(my_arg_1=lines)

Alternative Considered

One option considered as an alternative to using TaskGroups directly was to create a new concept, such as TaskBlocks, which are effectively derived from TaskGroups but have the additional functionality described above.

The advantage of this approach is that it keeps TaskGroups simple as a UI-only concept and people who want to leverage additional functionality as described above would use the TaskBlock entity with the “.map” and “.reduce” operations. 

The disadvantage of this approach is that we are adding a new concept to the DAG-writing API, which could increase confusion. It also risks leaving TaskGroups underpowered and could create confusion as people work out which of the two to use.

Handling multiple mapped values

In cases where a user wants to map against multiple dynamically sized lists, we will perform a Cartesian product of those lists. While we might allow zip operations in the future, this will not be in the initial release, as there are reasonable workarounds to create a zip function.

from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator

@task
def get_files_from_s3(bucket):
    # S3 list operation
    ...

@task
def do_something_with_s3_files(file_1, file_2):
    ...

my_files = get_files_from_s3("s3://bucket_1")
my_files_2 = get_files_from_s3("s3://bucket_2")
do_something_with_s3_files.map(file_1=my_files, file_2=my_files_2)

In this example, do_something_with_s3_files will be called with file_1=my_files[0], file_2=my_files_2[0] ... file_1=my_files[n], file_2=my_files_2[0], then file_1=my_files[0], file_2=my_files_2[1] ... file_1=my_files[n], file_2=my_files_2[1], and so on.
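
For intuition, the pairing is the same as a plain Cartesian product in Python. This is a sketch only; the real expansion is done by the scheduler rather than by materialising all pairs in the DAG file, and the variable values are stand-ins for the XCom results above:

from itertools import product

my_files = ["a.txt", "b.txt"]            # stand-in for the first XCom list
my_files_2 = ["x.csv", "y.csv", "z.csv"]  # stand-in for the second XCom list

# One mapped task instance per (file_1, file_2) combination. Which argument
# varies fastest is an implementation detail (see "Handling mapping across
# multiple values" below).
for file_1, file_2 in product(my_files, my_files_2):
    print(file_1, file_2)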

Changes to the UI

There are a few points to note when considering UI changes to show a DAG that changes on a per-DagRun basis:

  1. This does not necessarily require DAG versioning. The DAG itself is not changing. As long as we tie DAG formation to TaskInstances instead of Tasks, we can keep basic DAG structure. 
  2. We will treat it as similar to TaskGroups. There is a single “parent” that gives an overall summary. We’ll visually indicate when a task instance is a mapped task group (i.e. an icon, or the task state having a different shape, as DagRun state does) that can be expanded.
    1. When zoomed in, users will have an option to click on any generated task (e.g. run_after_loop[0], run_after_loop[1] or run_after_loop[key] if the map function is iterating over a dictionary) to see the logs of each individual taskinstance.
    2. Expanding the task group will be paginated, and only best for seeing a few taskinstances. The Task Instance details will be improved to include a sortable table of all taskinstances and even a graph view summary in case the task instances are actually task groups themselves.
  3. The gantt chart will not show any difference as these generated tasks can be treated like normal task instances.
  4. Mockup of the graph view

    To prevent any major changes to the Graphview code, we can simply add a “mapped” icon to the task


    The "status" display for a mapped task (in both graph and tree views) will follow the same logic that we use for the status of a Task Group: even 1 failure and 100 successes will show as failure (tooltip on hover gives you a breakdown)
  5. The Task Instance Detail pages (and URLs) will need to be changed to understand and show mapping indexes.
  6. It should be possible to re-expand a mapped task, as well as just clear an individual mapped task instance.

Template context changes

We will make the current mapping available in templates.

For mapping over a list:

  • {{ mapping.key }} will be the list index (0...n)
  • {{ mapping.value }} will be the appropriate element in the list

For mapping over a dictionary:

  • {{ mapping.key }} will be the dictionary key (string, int, float etc.)
  • {{ mapping.value }} will be the dictionary value
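
To make the rendering concrete, here is a small illustration, using plain Jinja2 rather than Airflow itself, of what the proposed variables would resolve to when mapping over a list:

from jinja2 import Template

files = ["a.txt", "b.txt", "c.txt"]
bash_command = Template("echo 'item {{ mapping.key }} is {{ mapping.value }}'")

# What each mapped task instance would see once its templated field is rendered.
for index, value in enumerate(files):
    print(bash_command.render(mapping={"key": index, "value": value}))
# echo 'item 0 is a.txt'
# echo 'item 1 is b.txt'
# echo 'item 2 is c.txt'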

Add configuration for maximum maps

As Airflow engineers, we always need to consider that as we build powerful features, we must also install safeguards to ensure that a miswritten DAG does not cause an outage to the cluster at large. To prevent a user from accidentally creating an infinite or combinatorially exploding map list, we would offer a “maximum_map_size” config in airflow.cfg. Airflow admins can choose whether to set this value, and we would allow users to manually override it in the map function if they know that a particular map will be large.


Assumptions and Limitations

  1. Any values to be mapped will require something that is iterable. In the case of lists we will map over each item and in the case of dictionaries we will map over each key/value pair.
  2. Each individual item you iterate over should fit in the memory of a single worker
  3. The supplied list must come from one of:
    1. XCom from upstream tasks
    2. A static list in the DAG file that lives in the serialized representation
    3. DagRun params (aka "dag run conf" or "trigger args")
    4. Variables
  4. If the supplied item is a dictionary, all keys must be JSON-encodable primitives.
  5. The list of items supplied must be finite (no infinite generators).


Implementation

This section lists some of the bigger implementation issues we have considered while planning this. It is not a complete list, nor a guarantee of how the feature will be implemented once development starts (if something proves unworkable, for instance).

Implementing Map

When parsing a DAG that has a map function, we will treat the “.map” call as syntactic sugar that creates a “MappedOperator” instance containing the operator class, the dictionary of static kwargs, and the mapped objects.

Once a mapped Task is ready to run (all dependencies met), the scheduler will read that this is a mapped task, and will replace that one task instance row with the list of mapped task instances that need to run.
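
A minimal sketch of that expansion step, using simplified in-memory structures and a hypothetical helper (the real implementation operates on TaskInstance rows and the serialized DAG):

from dataclasses import dataclass
from typing import List

NO_MAP_INDEX = -1  # value used before/without expansion, matching the -1 default described under Database Changes below


@dataclass
class TaskInstanceStub:
    """Simplified stand-in for a TaskInstance row (hypothetical)."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int = NO_MAP_INDEX


def expand_mapped_task(placeholder: TaskInstanceStub, mapped_length: int) -> List[TaskInstanceStub]:
    """Replace the single placeholder row with one row per mapped value."""
    return [
        TaskInstanceStub(placeholder.dag_id, placeholder.task_id, placeholder.run_id, map_index=i)
        for i in range(mapped_length)
    ]


# An upstream task pushed a 3-element list, so the mapped task becomes three
# task instances with map indexes 0, 1 and 2.
stub = TaskInstanceStub("my_dag", "add_one", "manual__2022-01-01")
print(expand_mapped_task(stub, mapped_length=3))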


The Task/BaseOperator setting max_active_tis_per_dag  will apply to the mapped task across all dag runs. Is it worth adding an additional parameter to map()  that limits how many mapped tasks for a single DagRun can run in parallel?

Implementing Reduce

Once the mapped tasks are running, any downstream tasks will be able to check if any of its upstream tasks are mapped tasks. If a task contains a dependency on an upstream mapped task, the scheduler will launch a database query checking for all TaskInstances associated with the mapped task. This database query will only need to happen as tasks change state, so it should not put significant stress on the database.

Once these task instances have been generated, they are treated the same way any other task would be treated. They respect existing trigger rules, concurrency limits, etc..

Once all upstream mapped task instances are complete, the downstream task will gather all XCOM output for processing. An important point here is that we could be dealing with thousands of XCOMs, and a single list of all of these outputs could kill the memory of a worker. To prevent excess memory usage, we supply values to a reduce task in the form of a lazy list. (Lazy lists are iterators that supply one value at a time, but allow for normal list comprehensions.) This is especially important for users of custom backends that might have large outputs for each task. Since lazy lists can only iterate once, users that want to iterate multiple times in a reduce operation will need to wrap the lazy list in a list and pull the entire result into memory. This will be mentioned prominently in the documentation.
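
A rough sketch of such a lazy sequence, assuming a hypothetical fetch_one callable that pulls a single mapped XCom value by index (this is not Airflow's actual internal API):

from typing import Any, Callable, Iterator


def lazy_xcom_values(fetch_one: Callable[[int], Any], length: int) -> Iterator[Any]:
    """Yield upstream XCom values one at a time instead of loading them all."""
    for map_index in range(length):
        yield fetch_one(map_index)


# Stand-in backend: map index -> word count pushed by each mapped task.
fake_xcom_store = {0: 10, 1: 25, 2: 7}

values = lazy_xcom_values(fake_xcom_store.__getitem__, length=3)
print(max(values))  # 25 -- note the generator can only be consumed once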

Changes to serialization

In order for the scheduler to be able to expand mapped tasks we will need to make some changes to the serialized DAG.

  • Mapped Tasks and TaskGroups will need to be stored.
  • Add a mapped property to serialized Tasks and TaskGroups:
    • A dictionary, mapSource, mapping each parameter name (key) to a typed field describing what to map over (value).
  • Add a partial(?):
    • We probably won’t need to add partial to the serialized format, but we will need to modify the serializer slightly to account for partials.


Although unlikely, this might require v2 of the serialization format. We will determine that when we come to implement the changes. If it does we will handle it in such a way that the deserializer still understands v1 and v2 of the format, and the format will only be upgraded by the usual dag parsing process, not as part of a migration.
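
To make the shape concrete, the mapped portion of a serialized task might look roughly like the following. This is illustrative only; field names such as mapSource follow the notes above rather than a final schema:

# Hypothetical fragment of a serialized mapped task (not the real format).
serialized_mapped_task = {
    "task_id": "add_one",
    "is_mapped": True,
    "mapSource": {
        # parameter name -> typed description of what to map over
        "x": {"type": "xcom_ref", "task_id": "get_values", "key": "return_value"},
    },
    # Static arguments captured by .partial() and shared by every mapped instance.
    "partial_kwargs": {"retries": 2},
}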

Database Changes

TaskInstance Table

In order to store mapped task instances we need to make changes to the TaskInstance table:

  • Add an integer typed mapping_id column:

    Since MySQL limits the size of index keys, a rich JSON-valued mapping_id would not work on MySQL. To address this we will treat both dictionaries and lists as if they were indexed lists (dictionaries can be stored as a list of key/value pairs). The default value will be -1 to signify that there is no mapping index (since 0 signifies the first element in the list).

  • mapping_id will need to be added to the primary key - it will now be (dag_id, task_id, run_id, mapping_id)

Any tables "associated" with TaskInstance (TaskFail, TaskReschedule, RenderedTIFields, etc.) will need similar changes.
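
A minimal sketch of what such a migration could look like, written as illustrative Alembic-style code rather than the actual Airflow migration (constraint names differ between database backends):

import sqlalchemy as sa
from alembic import op


def upgrade():
    # Add the new column with a server default of -1 ("not a mapped task"),
    # so existing rows do not need to be rewritten.
    op.add_column(
        "task_instance",
        sa.Column("mapping_id", sa.Integer(), nullable=False, server_default="-1"),
    )
    # Recreate the primary key to include the new column.
    op.drop_constraint("task_instance_pkey", "task_instance", type_="primary")
    op.create_primary_key(
        "task_instance_pkey",
        "task_instance",
        ["dag_id", "task_id", "run_id", "mapping_id"],
    )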

In testing this proposed change on a database running on a local laptop with 35 million task instance rows it took 1m07s to add the column and change the primary key (for comparison the 2.2.0 migration took 28 minutes, and the sped up migration in 2.2.1 took 9 minutes) so this change is achievable without lengthy downtime, even for large installs. (The major cause of the slowdown for the migration in  2.2.0 was removing a column required each row to be re-written, but just adding a column with a default value is much quicker).

Thanks to Leon and Rob of Bidnamic for testing this proposed migration on a clone of their production database, where it took 38mins (compared to multiple hours for the 2.2.0 migration), which they said was acceptable.

Handling mapping across multiple values

There will be situations where a task receives multiple inputs and wants to map across both values. In this case, we will run a Cartesian product across these two lists.

Implementation-wise, we need to work around the constraint that MySQL limits key sizes, so we can only store an integer for map index.  To work within this constraint, we can take advantage of knowing the order of lists in the serialized DAG object (guaranteed since Python 3.6). We will essentially go through the entirety of [0][X], and then move to [1][X],[2][X], etc. 

So in the case where we have two lists, one of size 5, and one of size 3, the order would look something like this:

0 = (0, 0)
1 = (0, 1)
…
5 = (1, 0)
6 = (1, 1)
…
11 = (2, 1)

And so on. This same method will also work for more than two values being iterated.

The order in which the parameters are looped over is by design not specified, and will be documented as such for DAG authors, but it will be stable thanks to Python 3.6+ dicts keeping insertion order.
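
As a sketch of the bookkeeping this implies, a flat map index can be decoded back into one index per mapped parameter with integer division, assuming the per-parameter lengths are known from the serialized DAG and the TaskMapping data described below:

from typing import List, Tuple


def decode_map_index(flat_index: int, lengths: List[int]) -> Tuple[int, ...]:
    """Convert a flat map index into one index per mapped parameter.

    `lengths` is ordered from the slowest-varying parameter to the
    fastest-varying one (the exact order is an implementation detail).
    """
    indices = []
    for length in reversed(lengths):
        flat_index, i = divmod(flat_index, length)
        indices.append(i)
    return tuple(reversed(indices))


# With lists of length 3 (outer) and 5 (inner), flat index 11 maps to (2, 1),
# matching the example above.
print(decode_map_index(11, [3, 5]))  # (2, 1)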

TaskMapping table

When it comes to mapping tasks to individual workers, we have to consider that the actual key/value pair in the XCom table might be quite large. If a user is using a custom XCom backend, there might be a map where each value is a dataframe or a very large string. With this in mind we need a way to pull ONLY the keys into the scheduler to minimize slowdown when generating tasks. To address this, we will create a TaskMapping table, which will store a primary key (similar to that of the XCom table) and the following columns:

  • type: string/"enum" determine whether the type is a list or a dict
  • length: an integer holding the length of the list, or the number of keys if the value is a dictionary.
  • keys: JSON column containing array of strings to signify the keys if the value is a dictionary

Rows to this table will be created inside BaseOperator when pushing a value to XCom: it will examine the downstream tasks and if any of them are marked as mapped tasks then we will populate this row whilst we already have the data in memory to populate it. There will be a single row per upstream task instance of a mapped task that pushes anything to XCom.
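
A rough sketch of such a table as a SQLAlchemy model; this is illustrative only, and the column names and primary key simply follow the notes above:

import sqlalchemy as sa
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class TaskMapping(Base):
    """Hypothetical table recording only the *shape* of a mapped XCom value."""

    __tablename__ = "task_mapping"

    # Identifies the upstream task instance that pushed the value.
    dag_id = sa.Column(sa.String(250), primary_key=True)
    task_id = sa.Column(sa.String(250), primary_key=True)
    run_id = sa.Column(sa.String(250), primary_key=True)

    type = sa.Column(sa.String(10), nullable=False)   # "list" or "dict"
    length = sa.Column(sa.Integer, nullable=False)    # number of items / keys
    keys = sa.Column(sa.JSON, nullable=True)          # key names when type == "dict"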

Task Runner Changes

We will also need to add some logic on the workers to know how to handle a mapped task.

Rather than overloading the task_id argument to `airflow tasks run` (i.e. having a task_id of `run_after_loop[0]`) we will add a new `--mapping-id` argument to `airflow tasks run` -- this value will be an integer specifying the index/position of the mapping.

Task Logging changes

Each mapped task instance will need to write logs to its own file, so this will need to be understood by the UI and the built-in logging handlers (Elasticsearch, etc.) when requesting logs too.


Changes to the original AIP discovered during implementation

When writing the code for this AIP we discovered some significant drawbacks/gotchas in the proposed DAG-authoring API as it related to non-TaskFlow operators, and in the ambiguity of mapping over lists. For example, take this:

t3 = DockerOperator.partial(
    image='centos:latest',
    task_id='docker_op_tester',
).map(command=["/bin/sleep", xcom_arg])

It's not clear (to code – a human can take a reasonable guess at the intent) if this is meant to result in two mapped operators, one with "/bin/sleep" and another with xcom_arg, or if each mapped operator should get ["/bin/sleep", n] (where n is each element of xcom_arg).

To resolve this ambiguity, but to also keep the ability to catch errors/typos as soon as possible (ideally at DAG parse time, not at run time!) we have made the following changes.

We are also addressing the comment that "we are implementing map 'backwards'" – in Python it is map(function, iterable) (and in other languages it is list.map(function)), but we had it as operator.map(list).

Rename .map() to .expand(), taking only keyword-only arguments.

Each (keyword-only) parameter to `expand` must be an iterable after being resolved at run time (if it contains an XComArg), and each element in the resolved iterable is applied to the partial operator to create one mapped task.

Examples (all create two mapped tasks, executing ls ~ and cat /etc/passwd):

DockerOperator.partial(
    task_id="run_command_in_container",
    ...,
).expand(
    command=[
        ["ls", "~"],
        ["cat", "/etc/passwd"],
    ],
)

@task
def commands():
    return [
        ["ls", "~"],
        ["cat", "/etc/passwd"],
    ]

DockerOperator.partial(
    task_id="run_command_in_container",
    ...,
).expand(
    command=commands(),
)

@task
def home():
    return "~"

@task
def passwd():
    return "/etc/passwd"

DockerOperator.partial(
    task_id="run_command_in_container",
    ...,
).expand(
    command=[
        ["ls", home()],
        ["cat", passwd()],
    ],
)

Add a .map() method to XComArg

Implement a map() method on XComArg that can be used to "transform" the iterable returned by the task. This method takes a single argument: a function accepting one element (a plain function, not a task, as the example below shows). At runtime, each element in the iterable returned by the upstream task will be passed through this function before being used to expand the downstream mapped operator.

@task
def directories():
    return ["~", "/etc"]

def create_ls_command(directory):
    return ["ls", directory]

# This creates two tasks executing "ls ~" and "ls /etc".
DockerOperator.partial(
    task_id="run_command_in_container",
    ...,
).expand(
    command=directories().map(create_ls_command),
)

The map function is only ever evaluated in the task runner so it can be as complex as desired.


5 Comments

  1. added_values = add_one.map(x=add_x_values)

    i suggest calling this apply  instead of map 

    i think more conventionally a map  would be understood to be a method on a set-like thing, whereas apply is a method on a function-like thing.

    and if you think about what the word map means, i think this makes sense. because you're essentially creating a mapping from one set to another, and each element in one set "maps" to a corresponding element in the target set.

    meanwhile, the function can be "applied" to a set or iterable.  in this case the 'add_one' task is analogous to a function, not a set.

    in this case it seems backwards.
    1. Hmmm, I see what you mean (and do agree with you a bit), but I think having "map" in the name is important to draw parallels with Python's built-in map and similar features in other languages.

      1. Yeah. Agree with Ash Berlin-Taylor (and understand you, Daniel Standish) - even if it is backwards, it's rather intuitive what the function does. Even for those who are used to classic Map/Reduce. I'd say apply is so ambiguous that map is definitely better. Unless we find a better name, I'd say map is good.

        One alternative that I could maybe accept is fanout or fan_out - because this is pretty much what the function does. But even with that one I find map a bit better.

          1. So it's not just classic map/reduce – e.g. if you look at the table with comparisons here, it's always iterable.map(func) not func.map(iterable), so to me the risk is that it's more confusing to users and creates continuing cognitive dissonance, as you have to remember that in airflow it's backwards.

          E.g. the present design is like this

          task_that_does_something_with_an_s3_key.map(task_that_returns_s3_keys) 

          And if we analogize here, task_that_returns_s3_keys is the iterable.

          We could  implement map on the iterable but I concede it does seem likely that the interface would be modestly more kludgy.  Which is why I like the idea of keeping the proposed interface but finding a different word.

          Another one could be as_mapper  because i think sometimes mapper  is used to refer to a function that maps.  But I like apply better because it's cleaner and simpler.  And yes you could say apply is ambiguous but in a sense this is beneficial because in particular it does not carry with it the association with a particular convention (like map does) – in that way it's open for appropriation.  And so we can take on the meaning that when you apply a task to another task, it maps the result of that task onto more tasks.