Airflow does not currently have an explicit way to declare messages passed between tasks in a DAG. XComs are available, but they are hidden inside the operators' execution functions. This creates confusion when working on ETL pipelines that need to send messages between tasks. This includes PythonOperator, which could be used for simple message processing but rarely is, because accepting messages from previous tasks as arguments is difficult: each value must be pulled explicitly, creating a lot of bootstrapping code.
This AIP proposes a Functional API that allows message passing to be declared explicitly while task dependencies are declared implicitly. At the same time, it introduces a new way to easily declare PythonOperators using a simple decorator.
What change do you propose to make?
The end result of this proposal is an explicit way to declare message-passing dependencies in a DAG. This can be accomplished by extending the Operator API with a __call__ method that can be used after the operator is initialized to declare runtime attributes needed at execution time. To specify this message dependency, we will add a new class (XComArg) that is a lazy reference to an XCom value from an existing operator. This class will be resolved and parsed into the class attribute at pre_execute/execute time.
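The lazy-reference idea can be illustrated with a minimal, self-contained sketch. The names below (XComArg's constructor signature, FakeTaskInstance) are hypothetical stand-ins, not the real Airflow classes; the point is only that the value is pulled at execute time via the task-instance context rather than at DAG-definition time.

```python
class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance, backed by a plain dict."""
    def __init__(self, xcom_store):
        self._store = xcom_store

    def xcom_pull(self, task_ids, key):
        return self._store[(task_ids, key)]


class XComArg:
    """Lazy pointer to the XCom value pushed by `operator_task_id` under `key`."""
    def __init__(self, operator_task_id, key='return_value'):
        self.operator_task_id = operator_task_id
        self.key = key

    def resolve(self, context):
        # Called at pre_execute/execute time: pull the concrete value.
        return context['ti'].xcom_pull(task_ids=self.operator_task_id,
                                       key=self.key)


# At parse time the arg is just a reference; at execute time it resolves.
store = {('get_ip', 'return_value'): '10.0.0.1'}
arg = XComArg('get_ip')
print(arg.resolve({'ti': FakeTaskInstance(store)}))  # -> 10.0.0.1
```

Note that nothing is pulled until resolve() runs, which is what lets the reference be created while the DAG file is parsed.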
In addition, to make it easier to use custom functions in DAGs, we can add a decorator that simplifies declaring PythonOperators. Paired with the __call__ method described above, this allows a more functional declaration of DAGs that rely heavily on message passing.
All the changes proposed are non-breaking, which would allow them to be merged into 1.10 and enable a slow-paced adoption.
Re-use the existing template_fields attribute to resolve an XComArg where one is found: add a clause to the render_template method that resolves a value if isinstance(value, XComArg).
Add an easy way to generate XComArgs for existing operators: a new output property pointing to the 'return_value' key by default.
Add dependencies between operators that generate and consume an XComArg when one is found in template_fields.
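The three points above can be sketched together. This is a simplified, hypothetical illustration (it re-declares a stripped-down XComArg locally for self-containment, and BaseOperatorSketch/EmailSketch are invented names, not Airflow classes): the output property wraps the operator in an XComArg, render_template resolves XComArg instances via xcom_pull, and scanning template_fields wires the upstream dependency automatically.

```python
class XComArg:
    """Simplified lazy reference to an operator's XCom value."""
    def __init__(self, operator, key='return_value'):
        self.operator = operator
        self.key = key


class BaseOperatorSketch:
    template_fields = ()

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_task_ids = set()

    @property
    def output(self):
        # Proposed `output` property: XComArg over 'return_value' by default.
        return XComArg(self)

    def render_template(self, value, context):
        # Proposed clause: resolve lazily referenced XCom values.
        if isinstance(value, XComArg):
            return context['ti'].xcom_pull(
                task_ids=value.operator.task_id, key=value.key)
        return value

    def resolve_xcom_dependencies(self):
        # Wire upstream edges for any XComArg found in template_fields.
        for field in self.template_fields:
            if isinstance(getattr(self, field), XComArg):
                self.upstream_task_ids.add(getattr(self, field).operator.task_id)


class EmailSketch(BaseOperatorSketch):
    template_fields = ('subject',)

    def __init__(self, task_id, subject):
        super().__init__(task_id)
        self.subject = subject


up = BaseOperatorSketch('get_ip')
down = EmailSketch('send_email', subject=up.output)
down.resolve_xcom_dependencies()
print(down.upstream_task_ids)  # {'get_ip'}
```

The key design point is that the user never calls set_upstream: passing operator.output into a templated field is enough to both carry the value and order the tasks.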
Enable easy transformation of Python functions into PythonFunctionalOperators via a decorator that takes a function and converts it into a PythonFunctionalOperator.
It should also allow passing args/kwargs through to the PythonOperator from the decorator's args/kwargs, e.g. setting the operator's task_id with task(task_id='random').
Add a multiple_outputs flag that unrolls a returned dictionary into several keyed XCom values.
Two ways to use it:
@dag.task: Uses the dag object; it does not need the DAG context, and the task is automatically assigned to the DAG.
@airflow.decorators.task: Makes the function an operator, but does not automatically assign it to a DAG (unless declared inside a DAG context).
Make it easier to set op_args and op_kwargs from __call__, effectively enabling function-like operations based on XCom values.
It should allow multiple calls in the same DAG or in a different DAG, generating new task ids programmatically if needed.
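The decorator mechanics described above can be sketched as follows. This is a hypothetical, heavily simplified model (the TaskOutput class, the counter-based id scheme, and eager execution are illustration devices, not the proposed implementation): it shows how decorator kwargs like task_id are accepted, how multiple_outputs keeps a returned dict keyed, and how repeated calls get fresh task ids.

```python
import functools

_task_counters = {}  # per-id call counter, to mimic unique task id generation


class TaskOutput:
    """Stand-in for the keyed XCom values a decorated task would push."""
    def __init__(self, task_id, xcoms):
        self.task_id = task_id
        self._xcoms = xcoms

    def __getitem__(self, key):
        return self._xcoms[key]


def task(_func=None, *, task_id=None, multiple_outputs=False):
    """Decorator sketch: turn a plain function into a task factory."""
    def wrapper(func):
        base_id = task_id or func.__name__

        @functools.wraps(func)
        def factory(*args, **kwargs):
            # Generate a unique task id for each call of the same task.
            n = _task_counters.get(base_id, 0)
            _task_counters[base_id] = n + 1
            tid = base_id if n == 0 else f'{base_id}__{n}'
            result = func(*args, **kwargs)
            if multiple_outputs and isinstance(result, dict):
                # Unroll the dict into separately keyed XCom values.
                return TaskOutput(tid, result)
            return TaskOutput(tid, {'return_value': result})
        return factory
    return wrapper if _func is None else wrapper(_func)


@task(multiple_outputs=True)
def prepare_email(ip):
    return {'subject': f'Server connected from {ip}', 'body': f'IP: {ip}'}


first = prepare_email('10.0.0.1')
second = prepare_email('10.0.0.2')
print(first.task_id, second.task_id)  # prepare_email prepare_email__1
print(first['subject'])               # Server connected from 10.0.0.1
```

Subscripting the result (first['subject']) mirrors how downstream operators would consume individual keyed values produced under multiple_outputs.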
See: https://github.com/casassg/corrent/tree/master/dags
```python
import json
from typing import Dict

get_ip = SimpleHttpOperator(
    task_id='get_ip', endpoint='get', method='GET', xcom_push=True
)

@dag.task(multiple_outputs=True)
def prepare_email(raw_json: str) -> Dict[str, str]:
    external_ip = json.loads(raw_json)['origin']
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Seems like today your server executing Airflow '
                f'is connected from the external IP {external_ip}<br>',
    }

email_info = prepare_email(get_ip.output)

send_email = EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body'],
)
```
Most users will be able to explicitly declare message passing in their DAGs using a functional Python approach. No existing DAGs will need to change.
It is an incremental and optional update; existing users will not be affected by the change.
This proposal comes from inspecting the APIs of newer DSL frameworks such as Dagster, Prefect, Metaflow and Databand.