Status

State: Vote
Discussion Thread: https://lists.apache.org/thread/yjcgb6fhn365n3307blq4y4v50gjynsy
Vote Thread: https://lists.apache.org/thread/s5g0zvjjxlpgnp718vs6n86qtlgzcgpw
Vote Result Thread:
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 2024-07-19
Version Released:
Authors: Tzu-ping Chung

Abstract

A new way to provide templates to an Airflow operator is proposed, replacing the existing implicit template detection based on templates_ext and template_fields. A base class Template provides the field-templating features, with two concrete subclasses, FileTemplate and StringTemplate, planned for the initial release.

Motivation

Field templating in Airflow refers to how certain fields in Airflow operator classes can be passed in a string using the Jinja template syntax, and they are rendered at runtime to actual values used for the operator’s execution.

This is a nice feature, but the implicitness introduces a few inherent issues. Since template detection and rendering happen automatically, it is not easy to tell which fields are template-able and which are not; while each operator does always have a template_fields that canonically indicates this, the knowledge is generally “tribal” and not very discoverable. It is also not easy to debug things when the behaviour is not what you expect, especially when templates_ext is involved—the TemplateNotFound issue is a common pitfall that troubles newcomers to no end.

This is probably the single most awkward pattern in Airflow 2:

# XXX: Airflow 2 syntax.
cat_file = BashOperator(
    task_id="cat_file",
    bash_command="cat {{ ds }}.sh",  # TemplateNotFound: cat {{ ds }}.sh
)

The string having a “.sh” suffix triggers templates_ext logic, making Airflow interpret the string as a path (that points to a template) and try to load it. The traditional workaround is to add a trailing space to the command:

# XXX: Airflow 2 syntax.
cat_file = BashOperator(
    task_id="cat_file",
    bash_command="cat {{ ds }}.sh ",  # Can you spot the difference?
)

This is so problematic that we have a paragraph in the documentation dedicated to it. Airflow 2.8 added the literal helper to improve ergonomics a bit, but you still need to learn about it first (or, more likely, see your task fail with the weird error) and remember to import it.

I want to make templating explicit in Airflow 3, trading some ease of writing for better DAG legibility for less experienced Airflow users, improved debuggability when things go wrong, and better discoverability of whether a field is template-able.

Proposal

Instead of relying on template_fields and templates_ext to control templating behaviour, all operator fields in Airflow 3 will be non-templated by default. For a field to be template-able, it must have an argument in the operator’s __init__ matching the field name; the argument must explicitly accept a Template instance. For example:

class BashOperator(BaseOperator):
    def __init__(
        self,
        task_id: str,  # Not templatable.
        bash_command: str | Template,  # Templatable!
        env: dict[str, str] | Template | None = None,  # Templatable!
        append_env: bool = False,  # Not templatable.
        ...,
    ) -> None:
        super().__init__(task_id=task_id, ...)
        self.bash_command = bash_command
        self.env = env
        ...

It may be possible to simplify this with either dataclasses or attrs. I’m not sure whether it is possible, but we will look into it during implementation and use one of the libraries if so.
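
Purely as a hypothetical illustration of the kind of simplification attrs could give us (whether BaseOperator can cooperate with an attrs-generated __init__ is exactly the open question, and the Template import path is an assumption):

import attrs

from airflow.models.baseoperator import BaseOperator
from airflow.templates import Template  # assumed import path


@attrs.define(kw_only=True)
class BashOperator(BaseOperator):
    # NOTE: whether BaseOperator can support an attrs-generated __init__
    # is the open question mentioned above; this is a sketch only.
    # Fields whose annotation includes Template are templatable.
    bash_command: str | Template
    env: dict[str, str] | Template | None = None
    append_env: bool = False  # Not templatable.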

The template_fields attribute will be made private and renamed to _template_fields. This is due to technical limitations in getting type annotations in old Python versions—the goal is to eventually remove it and rely on inspect.get_annotations to inspect which fields support templating instead. A pre-commit hook will be created to ensure the information in _template_fields stays in sync in the meantime.
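
To illustrate the eventual annotation-based approach, discovery could look roughly like this (a sketch only; the Template import path is an assumption):

import inspect
import typing

from airflow.templates import Template  # assumed import path


def templatable_fields(operator_cls: type) -> set[str]:
    """Return the names of __init__ arguments whose annotation accepts Template."""
    annotations = inspect.get_annotations(operator_cls.__init__, eval_str=True)
    return {
        name
        for name, annotation in annotations.items()
        if name != "return"
        and (annotation is Template or Template in typing.get_args(annotation))
    }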

The templates_ext attribute is no longer needed.

Both template_fields and templates_ext will be kept available for compatibility considerations, but either declaring them in a subclass or accessing them will cause a deprecation warning. See Compatibility and Migration below for migration suggestions.

Template sources

Two subclasses of Template will be introduced to Airflow 3.0:

  • StringTemplate takes a string argument. The string should use the Jinja2 template syntax, and is rendered at runtime.
  • FileTemplate takes an argument that points to a file. The file’s content is loaded as the template string and rendered at runtime.

The Template base class is considered a public API, and users are free to subclass from it. Custom subclasses are resolved like built-in ones, assuming the implementation is correct. Documentation on correctly implementing subclasses will be provided. Later Airflow versions may also introduce more subclasses.
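
As a purely illustrative sketch (the base-class hooks are not final, and the hook name and import path here are assumptions), a custom subclass might look like this:

import os

from airflow.templates import Template  # assumed import path


class EnvVarTemplate(Template):
    """Hypothetical subclass that reads its Jinja source from an environment variable."""

    def __init__(self, variable_name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.variable_name = variable_name

    def get_template_source(self, context) -> str:
        # Assumed hook: return the raw template string to be rendered.
        return os.environ[self.variable_name]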

With explicit template declaration, the pattern mentioned in the Motivation section becomes:

cat_file = BashOperator(
    task_id="cat_file",
    bash_command=StringTemplate("cat {{ ds }}.sh"),
)

Airflow knows exactly that this is supposed to be a template string, not a path to a template (for which you’d use FileTemplate instead), and does the correct thing.

This proposal does not aim to change how the templates are resolved and rendered at runtime. If a path passed to FileTemplate is relative, for example, resolution is still controlled by the DAG’s template_searchpath. Macros and filters also stay unchanged.
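
For instance, a relative path given to FileTemplate would still resolve against the DAG’s template_searchpath, just as today (a sketch; the FileTemplate import path is an assumption):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.templates import FileTemplate  # assumed import path

with DAG(
    dag_id="cat_file_example",
    template_searchpath=["/opt/airflow/templates"],
):
    cat_file = BashOperator(
        task_id="cat_file",
        # Resolved to /opt/airflow/templates/commands/cat_file.sh at render time.
        bash_command=FileTemplate("commands/cat_file.sh"),
    )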

Rendering a non-string value

A new argument converter will be added to Template. If supplied, this should be a callable that takes exactly one argument—the rendered result—and converts it to a new value. The general use case is to render a field to a non-string type, such as:

StringTemplate("{{ ds }}", converter=datetime.date.fromisoformat)

The above is rendered into the date specified by ds in the Python date type.

Another interesting use case is to use ast.literal_eval as the converter to create a list, dict, or other richer Python structures. This should be a nice alternative to the somewhat clunky and restrictive render_template_as_native_obj. We can accept any callable here, including user-defined ones, without going through the plugin architecture, because the code will only be called in the worker.
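
For example, assuming a template that renders to a Python list literal:

import ast

# Renders to e.g. "['input_2024-07-19.csv', 'extra_2024-07-19.csv']" and
# converts the rendered string into an actual Python list.
files = StringTemplate(
    "['input_{{ ds }}.csv', 'extra_{{ ds }}.csv']",
    converter=ast.literal_eval,
)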

Per-field arguments

Requiring all templating values to be wrapped in a class also introduces the possibility of per-field configurations. For example, you will be able to do

print_foo = BashOperator(
    task_id="print_foo",
    bash_command=StringTemplate(r"printf '$foo\n$bar'"),
    env=FileTemplate(
        "cat_file_env.json",
        template_searchpath=["/etc/my_templates/"],
        converter=json.loads,
    ),
)

The env template file is then loaded from /etc/my_templates/cat_file_env.json, with content such as

{"foo": "123", "bar": "456"}
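
At runtime this is roughly equivalent to the following (illustration only):

import json

rendered = '{"foo": "123", "bar": "456"}'  # file content after Jinja rendering
env = json.loads(rendered)  # {'foo': '123', 'bar': '456'}, passed to the operator as env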

Nesting

Another interesting way to use the classes is nesting templates, which was previously impossible:

run_some_file = BashOperator(
    task_id="run_some_file",
    bash_command=FileTemplate(StringTemplate("{{ ds }}.sh")),
)

This loads a file at runtime corresponding to ds, and renders the content as a template. The example here is contrived, but a similar pattern has been raised sporadically as a feature request. It is possible to work around the lack of this feature, but the syntax allows users more expressiveness when writing DAGs.

Compatibility and Migration

Airflow 3.0 is a major release, and it is technically acceptable to make this a breaking change and require all user DAGs to be rewritten to accommodate it. However, the Airflow community feels strongly that it is essential to keep compatibility in this case. Various measures are therefore put in place to give more users a smoother migration.

Most significantly, we are going to keep the Airflow 2 syntax working in Airflow 3. This requires compatibility code to be added in template-rendering functions (e.g. render_template) so we can correctly handle nested templates (template strings in containers such as dict or list). This compatibility logic will be behind a configuration flag and disabled by default. This ensures new Airflow installations use the new syntax when possible, but also minimises the work needed for existing users to migrate an Airflow 2 installation.

A migration tool will also be provided to automatically rewrite DAGs, along the lines of ruff and other linter-fixer tools.

One other significant approach I would like to take is to have a compatibility layer on top of core Airflow to handle migration. This takes inspiration from Python’s six package, providing an interface for users to write code that is compatible with both the old and new syntaxes and gets the same behaviour at runtime. This could be fulfilled by either airflow.providers.compat, or a separate package entirely—the latter option would make it easier to evaluate when to stop developing the compatibility layer, potentially reducing maintenance overhead caused by mixing different compatibility concerns (toward other providers, and toward end users) in one provider. We can decide this later, as late as Airflow 3’s release.

Make an Airflow 2 operator compatible with Airflow 3

We should also provide a utility to simplify operator migration, used like this:

from airflow.operators.base import BaseOperator
from airflow.templates import StringTemplate
from airflow_compat import new_template_interface


@new_template_interface
class MyLongHistoryOperator(BaseOperator):
    # Operator implemented in the old style.
    template_fields = ...
    templates_ext = ...


my_task = MyLongHistoryOperator(
    task_id="my_task",
    value=StringTemplate("{{ ti.run_id }}"),  # But now takes the new syntax!
)

When running on Airflow 2, the decorator would automatically provide a new __init__ method for the operator that goes through template_fields and converts template objects back to Airflow 2-style template strings when the DAG is parsed. Airflow 2-style implicit template strings continue to work on Airflow 2 when this decorator is added.

The template classes will be back-ported into the compatibility layer, so users can migrate to the new syntax without needing to upgrade first. Per-field arguments are rejected on both Airflow 2 and 3 by the decorated operator, since it is impossible to make them work on an operator designed around the old syntax.
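
The exact mechanics are an implementation detail, but on Airflow 2 the decorator could work roughly like this (the as_jinja_string helper and the marker attribute are assumptions):

import functools

from airflow.templates import Template  # assumed; back-ported in the compat layer


def new_template_interface(cls):
    """Rough sketch: on Airflow 2, unwrap Template objects into plain template strings."""
    if getattr(cls, "_new_template_interface", False):
        # Repeated decoration results in only one conversion.
        return cls

    original_init = cls.__init__

    @functools.wraps(original_init)
    def __init__(self, *args, **kwargs):
        for name in getattr(cls, "template_fields", ()):
            value = kwargs.get(name)
            if isinstance(value, Template):
                # Hand the old-style machinery a plain Jinja string instead.
                kwargs[name] = value.as_jinja_string()  # assumed helper on Template
        original_init(self, *args, **kwargs)

    cls.__init__ = __init__
    cls._new_template_interface = True
    return cls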

Since this is a decorator, users may also manually “fix” a less-maintained operator in many cases:

from third_party_package.operators import MyLongHistoryOperator as Airflow2MyLongHistoryOperator

MyLongHistoryOperator = new_template_interface(Airflow2MyLongHistoryOperator)

This is intended as a last-ditch solution when a library is abandoned. We’ll also add checks in the decorator so that repeated decoration results in only one conversion; the user does not need to remove the manual fix even if the library ends up applying the decorator itself in the future.

Decorating an Airflow 3-style operator is a no-op.

Make an Airflow 3 operator compatible with Airflow 2

For an operator to be compatible with both Airflow 2 and 3, it should inherit BaseOperator from the compatibility layer instead of airflow—this keeps the operator compatible with Airflow 2 while using the Airflow 3 syntax. The BaseOperator in the compatibility layer adds additional logic during resolution to correctly handle the new template classes. This way, we avoid carrying compatibility code in Airflow 3, but still allow the user to slowly migrate to the new syntax after upgrading first (to receive other benefits).
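
For illustration, such an operator would only need to pull its base class (and the template types) from the compatibility layer; the import paths here are assumptions:

from airflow_compat import BaseOperator, Template  # assumed import paths


class MyNewStyleOperator(BaseOperator):
    def __init__(self, *, command: str | Template, **kwargs) -> None:
        super().__init__(**kwargs)
        self.command = command  # Templatable on both Airflow 2 and 3.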

I have not looked into whether we could use a decorator for this instead of changing the base class. It might be possible, and likely preferable if so. We’ll see during implementation.

Other migrating options

Instead of migrating operators one by one, a possibility is to have a helper function for the user to call in their cluster policies, either in dag_policy or task_policy, that dynamically replaces values in templated fields on the operator with a Template subclass. This may be the least intrusive solution—you virtually don’t need to rewrite any DAGs at all, but only add a function call in the policies—but personally I’m not sure I like the “magic” aspect of this, or want to recommend it as an official migration plan.
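
For reference, such a helper would be called from a cluster policy roughly like this (the helper is hypothetical and shown inline here):

from airflow.templates import StringTemplate  # assumed import path


def task_policy(task):
    # Hypothetical auto-boxing: wrap plain strings in templated fields
    # with StringTemplate so Airflow 2-style DAGs keep rendering.
    for field in getattr(task, "template_fields", ()):
        value = getattr(task, field, None)
        if isinstance(value, str):
            setattr(task, field, StringTemplate(value))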

The literal helper will stay in core Airflow 3. It will simply do nothing when used with operators implemented in the Airflow 3 style, although the compatibility layer should still use it the same way as in Airflow 2.

Scope

This AIP should be considered done when the proposed new types are implemented, a compatibility layer is introduced, and operators in core Airflow and official providers are migrated to use the compatibility layer so they work in both Airflow 2 (in both the old and new operator templating syntaxes) and Airflow 3 (in the new operator templating syntax).

Rejected Ideas

Using subclasses to specify the rendered result type

The possibility of using subclasses for this has been considered, e.g. having an IsoFormatTemplate instead of passing in a separate converter. This was abandoned because we already use subclasses to distinguish between loading from a file (FileTemplate) and a literal string (StringTemplate). Using subclasses for conversion means we would need two subclasses for every conversion, one for loading from a file and one for a string, which is more confusing and harder to maintain. As an alternative, we can provide alias functions for users to import, such as:

from airflow.templates.converters import isodate

StringTemplate("{{ ds }}", converter=isodate)

Where isodate is simply an alias to datetime.date.fromisoformat. Custom converters can also be put in the same module.

8 Comments

  1. One part that I commented out is the approach we will take for providers - the providers we release will have to work for both Airflow 2 and Airflow 3 for some time at least so we need to decide what to do:

    • we could remove templated_* from operators and add some auto-fixing in compat to bring them back to work on Airflow 2 same way they do today
    • we could keep templated_ in providers but this will keep the code cluttered with those
    • ...?

    I wonder what's your thinking here.

    1. Would it be a good idea for providers to depend on airflow_compat? If so, we could add a layer there for providers in that package so providers can start using the new syntax but still have the old behaviour when used against Airflow 2. When a provider is ready to drop Airflow 2, it would simply change the import to airflow proper without any other modifications needed. (Basically the approach we had with six for Python 2 to 3.)

  2. Yes. We can have all the providers depend on common.compat. That would likely work. Maybe we can even keep the auto-generated list of templated_fields and ext in the common.compat provider and monkeypatch them on Airflow 2 rather than keep them in the providers. Also a pre-commit could be added to forbid adding new template fields in providers (instead you will have to add them to compat if you want to add them for Airflow 2). That would not be very friendly, but we could also potentially add a compat provider to make `StringTemplate` and others work in Airflow 2 as well. Which might be even better than building it in Airflow 2.11 (but that part could be done in both).

    I think common.compat could be used for a number of such transitional behaviours without making Airflow3 code more complex.

    1. I think keeping the template definitions in providers is probably better in more than one way; otherwise I agree with you.

  3. Doing a second-pass review of the proposal, I still very much like it. Looking forward to the vote.

    As Jarek also highlighted, I have two main concerns which we should have a plan for - as this is a breaking change which would hit 95% of users and DAGs in the wild... we don't want to force all users to re-write all DAGs for Airflow 3, and this would inhibit migration in the future.. you know business users are sometimes lazy, experts have left, all are afraid to touch running code... as we also see Airflow 1.10 still out there.

    1. I also assume Provider packages are widely used. They will in the future need to be deployable to both Airflow 2 and 3 while maintaining a single codebase. For Airflow 2 that means using `template_fields`+ext, and the new syntax for Airflow 3. Can you elaborate a bit more on how to migrate/extend provider packages for Airflow 3 without breaking changes and without needing to adjust 1000 existing Operators in the wild? I would not like to lose a lot of provider features just because it is a big effort for provider maintainers.
    2. If you have the opinion that a global "compatibility switch" is not desirable, how can we introduce the new Templating in the Airflow 2 line such that migration can start transitionally before the move to Airflow 3? Can this be ensured? Otherwise I would feel cluster policies are the only option if you don't want to support Jinja2 strings in Airflow 3 anymore. I mean there are thousands of DAG authors who need to transition DAGs...
      Technically we could just "auto-box" all string values from `template_fields` into `StringTemplate()`?
    1. So there are two scenarios we need to consider. The first is that providers should run on both Airflow 2 and 3 with one single code base. This is not that difficult aside from making all providers depend on the compat layer instead of directly on core Airflow. We should be able to create a CompatBaseOperator for all provider operators to subclass from, that has the old behaviour when run on Airflow 2, and the new behaviour on Airflow 3.

      The other scenario, using the old syntax on Airflow 2, is more complicated. The way I want this to be done is to introduce compat operator classes in the compat layer, and ask users to import those instead of directly from airflow.operators. Since the compat layer also works on Airflow 2, users can also change the imports first (running in Airflow 2), and then upgrade to Airflow 3 without changing DAGs.

      Even changing imports may be too much to ask for some people. IMO if that’s the case, they should not upgrade to Airflow 3. Operator templating is arguably one of the least problematic things that will break for them—a lot of other things will break, including those deprecated features. If changing imports is too much to ask, I would argue those users simply lack the ability to modify any Python code needed to embark on a migration to Airflow 3. Trying too hard to maintain compatibility would defeat the purpose of having a major release in the first place.

      1. I think with any breaking changes to DAGs, the key here is to make sure people can migrate incrementally, easily, and track the progress of that migration. It could be done in both "Airflow 2.11" (the transitional one) and "Airflow 3.*" - but I think an important aspect is that we cannot ask some big users of Airflow to migrate all DAGs at once in a "big-bang" event. Nor can we ask them to have half of their DAGs run in Airflow 2 and half in Airflow 3 for longer than a few days or weeks of transition. We need to give tooling to the users and give a lot of thought to the migration process they will run and how disruptive it will be to them. It's not a question of ability, it's a question of the risk those users have to take in the migration process. Many Airflow DAGs are business-critical and the risk of a "big-bang" change is one that almost no-one will be willing to take - especially if they have several distributed teams that write the DAGs and single "operations" people that do the migration. This is way more complex than "run this script on all your DAGs and you are done". We need to design the migration process and make sure any DAG-breaking change will be able to follow it.

        So I think migration involving DAG syntax changes could look like this:

        a) via Airflow 2.11 - which might take weeks or months for people to complete - depending how many DAGs they will have.

        • flag the Airflow 2-only syntax in some way - so that it is visible (but not disruptive - this should be a "normal production setup")
        • Allow to use the new syntax in Airflow 2.11 and switch one-by-one
        • Possibly - switch to isolation mode - likely also "per DAG" (AIP-44 might actually be a useful migration tool here) to see if the DAGs are not accessing the Airflow metadata DB directly
        • allow to track progress: how many DAGs still need to be migrated - including reporting all the DAGs that are not compatible with Airflow 3 and clear instructions or automated tooling allowing that to change
        • those tools/reporting etc. should be machine-readable. For example I imagine when managed service migration happens, managed service vendors will like to retrieve and somehow display that information to users running 2.11 in terms of "how far they are" and "what needs to be done"
        • we need to be absolutely sure that what is marked as "Airflow 3 compatible" in "Airflow 2.11" will in fact be "Airflow 3 compatible".

        b) via Airflow 3

        • have a "compatibility mode" that you can set "per DAG" to run in "Airflow 2" mode.


        I think variant b) is completely out of the question with the number of changes we plan - here I agree with TP that it would largely defeat the purpose of introducing back-compatibility. So a) is the only possible option if we expect people to modify their DAGs. I believe (Tzu-ping Chung ?) variant a) is possible in this case - we should be able to flag what needs to be done, introduce the new syntax that should work on Airflow 2.11, count and explain what needs to be done clearly - and run some statistics on that and present it to the users?

        I think we really need to change the syntax here, and we really do not want to keep the old one as TP wrote - but it also means that we will need to spend quite some time and develop tooling to help our users migrate; "incremental migration" and "observability of the migration progress" are, I think, extremely important to mitigate the risk for our users.



        1. Yeah, Option (a) is definitely possible with Airflow 2.11. The compat provider is the 2nd layer, and the last layer can be upgrade utilities to auto-change DAGs via a CLI command.