Status

State: Completed
Discussion Thread: GitHub
Created: 2020-02-21
In Release: 2.0.0

Motivation

Airflow does not currently have an explicit way to declare messages passed between tasks in a DAG. XComs are available, but they are hidden inside the operators' execution functions. This can create confusion when working on ETL pipelines that need to send messages between tasks. PythonOperator is a case in point: it could be used for simple message processing, but in practice it is not, because it cannot easily accept messages from previous tasks as arguments. Each message has to be pulled explicitly, which creates a lot of bootstrapping code.

This AIP proposes a Functional API that allows message passing to be declared explicitly while task dependencies are declared implicitly. It also introduces a simple decorator that makes it easy to declare PythonOperators.

Considerations

What change do you propose to make?

The end result of this proposal is an explicit way to declare message-passing dependencies in a DAG. This can be accomplished by extending the Operator API with a __call__ method that can be used after the operator is initialized to declare the run-time attributes needed at execution time. To specify a message dependency, we will add a new class (XComArg) that is a lazy reference to an XCom value from an existing operator. This reference will be resolved and set on the corresponding operator attribute at pre_execute/execute time.

In addition, to make it easier to use custom functions in DAGs, we can add a decorator that simplifies declaring PythonOperators. Paired with the __call__ method described above, this will allow a more functional declaration of DAGs that rely heavily on message passing.

All the proposed changes are non-breaking, so they could be merged into 1.10 and adopted at a slow pace.

XComArg

  • Explicit way to pass around results between operators. 
  • This object is a reference to an XCom value that has not been created and will need to be resolved in the future.
  • It can generate new XComArgs that point to the same operator but have a different key by using the key accessor, e.g. output['train'].
  • To do so, it holds both the originating operator instance and a key as attributes (both are needed to pull the XCom).
  • Has a property operator that allows access to the origin operator.
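
The properties listed above can be sketched in plain Python. This is a minimal illustration rather than the actual Airflow implementation: FakeOperator, the resolve method and the dictionary used as an XCom store are hypothetical stand-ins, chosen so that the example runs without Airflow installed.

```python
from typing import Any, Dict, Tuple


class FakeOperator:
    """Hypothetical stand-in for an Airflow operator; only task_id matters here."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class XComArg:
    """Lazy reference to an XCom value that has not been created yet."""

    def __init__(self, operator: FakeOperator, key: str = "return_value") -> None:
        self._operator = operator
        self.key = key

    @property
    def operator(self) -> FakeOperator:
        # Gives access to the operator that will produce the value.
        return self._operator

    def __getitem__(self, key: str) -> "XComArg":
        # Key accessor: same originating operator, different XCom key.
        return XComArg(self._operator, key)

    def resolve(self, xcom_store: Dict[Tuple[str, str], Any]) -> Any:
        # Assumption: XComs are modelled as a dict keyed by (task_id, key).
        return xcom_store[(self._operator.task_id, self.key)]


train = FakeOperator(task_id="train")
output = XComArg(train)        # points at the default 'return_value' key
accuracy = output["accuracy"]  # new reference, same operator, other key

# Simulate the XComs that the 'train' task would push at run time.
store = {("train", "return_value"): "model.pkl", ("train", "accuracy"): 0.93}
resolved_default = output.resolve(store)
resolved_accuracy = accuracy.resolve(store)
```

Nothing is looked up when the reference is created; the value is only read when resolve is called, which mirrors the "resolved in the future" behaviour described in the list.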

Add .output and resolve XComArg templated fields in BaseOperator

  • Reuse the existing template_fields attribute to resolve XComArgs when found. Add a clause in the render_template method that resolves a value if it is an instance of XComArg.

  • Add an easy way to generate XComArgs for existing operators: add new property output pointing to ‘return_value’ key by default 

  • Add dependencies between the operators that generate and consume an XComArg when one is found in template_fields
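
A rough sketch of how these three points could fit together is shown below. It assumes simplified stand-ins: SketchOperator, EchoOperator, render_template_fields taking a dictionary, and the (task_id, key) XCom store are all hypothetical and only mimic the shape of BaseOperator.

```python
from typing import Any, Dict, Set, Tuple


class XComArg:
    """Minimal lazy reference: originating operator plus XCom key."""

    def __init__(self, operator: "SketchOperator", key: str = "return_value") -> None:
        self.operator = operator
        self.key = key


class SketchOperator:
    """Hypothetical, heavily simplified stand-in for BaseOperator."""

    template_fields: Tuple[str, ...] = ()

    def __init__(self, task_id: str, **fields: Any) -> None:
        self.task_id = task_id
        self.upstream_task_ids: Set[str] = set()
        for name, value in fields.items():
            setattr(self, name, value)
        # An XComArg found in a templated field implies a task dependency.
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                self.upstream_task_ids.add(value.operator.task_id)

    @property
    def output(self) -> XComArg:
        # Points at the 'return_value' key by default.
        return XComArg(self)

    def render_template_fields(self, xcom_store: Dict[Tuple[str, str], Any]) -> None:
        # Extra clause next to normal templating: swap XComArgs for real values.
        for name in self.template_fields:
            value = getattr(self, name, None)
            if isinstance(value, XComArg):
                setattr(self, name, xcom_store[(value.operator.task_id, value.key)])


class EchoOperator(SketchOperator):
    template_fields = ("message",)


producer = SketchOperator(task_id="producer")
consumer = EchoOperator(task_id="consumer", message=producer.output)

# The dependency is known at DAG-definition time, before any value exists.
upstream_before = set(consumer.upstream_task_ids)

# At execution time the reference is replaced by the pushed value.
consumer.render_template_fields({("producer", "return_value"): "hello"})
```

The point of the sketch is the split between the two phases: the dependency is derived from the reference when the DAG is defined, while the value itself is only substituted when the consuming task is rendered.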

Python function decorator

  • Enable easy transformation of python functions into PythonFunctionalOperators by making a decorator that takes a function and converts it into a PythonFunctionalOperator. 

  • Should also allow passing args/kwargs to the PythonOperator through the decorator's args/kwargs. E.g. set the operator's task_id with task(task_id='random').

  • Add multiple_outputs flag that unrolls a dictionary into several keyed XCom values. 

  • 2 ways to use it:

    • @dag.task: Uses dag object, does not need the DAG context, task automatically assigned to DAG.

    • @airflow.decorators.task: Makes function an operator, but does not automatically assign it to a DAG (unless declared inside a  DAG context)

  • Make it easier to set op_args and op_kwargs from __call__, effectively enabling function-like operations based on XCom values. 

  • Should allow multiple calls in the same DAG or a different DAG, generating new task ids programmatically if needed.

  • Add get_current_context to get current Airflow context in a decorated function.
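
The decorator behaviour described in this list can be approximated without Airflow as follows. Everything here is an illustrative assumption: PythonFunctionalOperatorSketch, the task function, the `__1` suffix for repeated calls, and the dictionary used as an XCom store are hypothetical, and DAG assignment and get_current_context are left out.

```python
import functools
from typing import Any, Callable, Dict, List, Optional, Tuple


class XComArg:
    """Minimal lazy reference identified by (task_id, key)."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __getitem__(self, key: str) -> "XComArg":
        return XComArg(self.task_id, key)


class PythonFunctionalOperatorSketch:
    """Hypothetical operator wrapping a Python callable."""

    def __init__(self, python_callable: Callable, task_id: str,
                 op_args: tuple, op_kwargs: dict, multiple_outputs: bool) -> None:
        self.python_callable = python_callable
        self.task_id = task_id
        self.op_args = op_args
        self.op_kwargs = op_kwargs
        self.multiple_outputs = multiple_outputs

    def execute(self, xcom_store: Dict[Tuple[str, str], Any]) -> Any:
        def resolve(value: Any) -> Any:
            if isinstance(value, XComArg):
                return xcom_store[(value.task_id, value.key)]
            return value

        args = [resolve(a) for a in self.op_args]
        kwargs = {k: resolve(v) for k, v in self.op_kwargs.items()}
        result = self.python_callable(*args, **kwargs)
        xcom_store[(self.task_id, "return_value")] = result
        if self.multiple_outputs:
            # Unroll the returned dictionary into one XCom per key.
            for key, value in result.items():
                xcom_store[(self.task_id, key)] = value
        return result


def task(python_callable: Optional[Callable] = None, *,
         task_id: Optional[str] = None, multiple_outputs: bool = False):
    """Turn a function into a factory of operators (usable with or without args)."""

    def decorator(func: Callable):
        base_id = task_id or func.__name__
        created: List[PythonFunctionalOperatorSketch] = []

        @functools.wraps(func)
        def factory(*args: Any, **kwargs: Any) -> XComArg:
            # Each call creates a new operator; later calls get a suffixed id.
            n = len(created)
            unique_id = base_id if n == 0 else f"{base_id}__{n}"
            created.append(PythonFunctionalOperatorSketch(
                func, unique_id, args, kwargs, multiple_outputs))
            return XComArg(unique_id)

        factory.operators = created
        return factory

    return decorator(python_callable) if python_callable else decorator


@task(multiple_outputs=True)
def prepare_email(ip: str) -> Dict[str, str]:
    return {"subject": f"Server connected from {ip}", "body": f"IP: {ip}"}


store: Dict[Tuple[str, str], Any] = {("get_ip", "return_value"): "1.2.3.4"}
email_info = prepare_email(XComArg("get_ip"))  # defines a task, runs nothing
second_call = prepare_email(XComArg("get_ip"))  # second task with a new id
prepare_email.operators[0].execute(store)       # simulate the scheduler run
```

Calling the decorated function does not execute it. It records the arguments as op_args/op_kwargs and returns a reference to the future result, which is what lets downstream operators consume email_info['subject'] before the task has run.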

DAG example

See: https://github.com/casassg/corrent/tree/master/dags 
import json
from datetime import datetime, timedelta
from typing import Dict

from airflow.operators.email_operator import EmailOperator
from airflow.operators.http_operator import SimpleHttpOperator

from airflow import DAG, task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 27),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}


with DAG(
    'send_server_ip', default_args=default_args, schedule_interval=None
) as dag:

  # Using default connection as it's set to httpbin.org by default
  get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
  )

  @dag.task(multiple_outputs=True)
  def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
      'subject':f'Server connected from {external_ip}',
      'body': f'Seems like today your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

  email_info = prepare_email(get_ip.output)

  send_email = EmailOperator(
      task_id='send_email',
      to='example@example.com',
      subject=email_info['subject'],
      html_content=email_info['body']
  )


What problem does it solve?

  • Explicit message passing in Airflow DAGs

Why is it needed?

  • Extends Airflow DAG API to allow explicit message passing between tasks
  • Improves developer experience for simple usages of PythonOperator
  • Incorporates a new abstraction for message passing, preparing Airflow to better support data pipeline use cases

Are there any downsides to this change?

  • Agreeing on the right abstraction is key, while keeping in mind future Airflow features that could build on this API. The main difficulty will be agreeing on a single implementation for this proposal.
  • Implementation-wise, it may require some extension of the webserver to load the resolved XCom specified as a dependency for observability. 
  • Potentially, this could lead to more users/customers using XCom to pass around large pieces of data 
  • XCom uses pickling for serialization/deserialization, which may be unstable for certain data passed around. 

Which users are affected by the change?

Most users will be able to explicitly declare message passing on their DAGs using a Python functional approach. No existing DAGs will need to change.

How are users affected by the change? (e.g. DB upgrade required?)

It's an incremental and optional update. Users will not be affected by the change.

Other considerations?

This proposal comes from inspecting APIs in new DSL frameworks like Dagster, Prefect, Metaflow and Databand.

What defines this AIP as "done"?

  • AIP has been voted to move forward with an agreement on the right API abstraction.
  • Implemented AIP-31 GitHub issues: https://github.com/apache/airflow/labels/AIP-31
  • Stretch goal: the webserver loads resolved XComArgs into the UI when inspecting a task instance (both inlets and outlets).

12 Comments

  1. Gerard Casas Saez I think this looks fantastic!

    We should consider the option to add some more options to the decorator. Things like being able to point the operation at a celery queue, executor_config, etc.

    I'm currently working on some proposals on how to make a better airflow + Jupyter story and I think this would tie in really nicely (smile).

    1. Yes, I agree. The decorator accepts any argument that the Operator accepts on initialization, so as long as the proposed PythonFunctionalOperator accepts an option, then the decorator has it as well.


      Would love to hear/read more on the Jupyter integration if you have any pointers on that, we are currently working on enabling some more experimental features for Airflow internal users, so it may be helpful to see if your proposal solves some of the problems we are also encountering.

  2. I really like the approach! I am wondering if we can do:


      send_email = EmailOperator(
          task_id='send_email',
          to="example@example.com",
          subject=email_subject_generator(ip_info),
          html_content=email_body_generator(ip_info)
      )


    This will probably require some mechanism similar to jinja templates. In this way, writing such dags will be even easier.

    1. I guess it should be easy. I mean it's a matter of adding an if here with XComArg instead, right? Or should we separate it from templated fields better? (`xcom_fields` argument that gets parsed separately)

      1. Yes, I think it should be rather easy to do

  3. Gerard Casas Saez one more question: can we chain functional ops and those non functional? For example:


    email_body_generator >> BashOperator(task_id="echo", ...)


    This will probably raise an error unless we add an if branch in the chain methods of BaseOperator. WDYT?

    1. Technically this should be allowed as of now. The decorator effectively replaces the function with a FunctionalPythonOperator (basically PythonOperator with some extra code). So you can chain it like any other operator.

  4. Tomasz Urbaszek

    What is the current state of this AIP? If you are planning to do any other work, could I ask you to create **one main issue**, which will allow us to track the progress of work. When we're done, we can close the ticket. All other AIPs have a ticket like this, and I think we should stick to it.

  5. Should we mark this AIP as completed? Does the current state allow it to be released in Airflow 2.0? I try to reach people who have an open AIP in order to establish the status, which will allow us to better plan our work on the new release.

  6. I think yes, the idea of functional operators has been introduced and can be used by users. WDYT Gerard Casas Saez

  7. Agree with Tomasz Urbaszek. I will mark it as completed. Any additions can be done as follow-up work and released to build on top of it.