Status

State: Accepted
Discussion Thread:
Vote Thread: [LAZY,CONSENSUS] Example dags
Vote Result Thread: Re: [LAZY,CONSENSUS] Example dags
Progress Tracking (PR/GitHub Project/Issue Label): GitHub Project "Airflow Examples Refurbish"
Date Created:
Version Released: tbd.
Authors:

Motivation

Airflow examples have grown in number and focus over the past years. They serve multiple purposes:

  • Serve as tutorials to learn Airflow DAG implementation
  • Provide code snippets for the documentation
  • Serve for testing the setup
  • (some) serve for CI integration testing

Some example DAGs are of good quality, while others do not follow best practices. The current examples also lack a common structure.

There are example DAGs contained in the Airflow core (currently pushed to standard provider/example_dags), and there are more examples in other providers. However, examples from other providers are not loaded automatically.

Consequently, in the Airflow 3 Dev Calls a demand was raised to clean up and optimize the example DAGs.

Considerations / Targets

  • The number of examples should be reduced to 20-30
    • If possible, examples from the docs should be represented in the examples. Some code examples which currently stand alone in the docs should be moved into examples if possible.
    • Otherwise, example content not referenced in the documentation should be questioned as to whether it is beneficial.
    • If possible, examples should be directly usable by copy & paste (consider imports for integrated snippets).
  • Examples should, if possible, be arranged along a story-line, which might represent a virtual company and support real-life use cases. It would be good to structure it as a growth of use cases that brings in the need for more features, i.e. starting off with a basic transformation → needing more complex tasks, so using groups → using sensors, assets... showing the progression of usage with project/company maturity.
  • Examples should follow best practices in coding
  • Existing examples should be reviewed to determine which DAGs are used only for testing. Testing DAGs should be separated and not pollute the example collection.
  • Some examples are specific to providers. They should be moved to the corresponding provider packages.
  • A mechanism is to be created that uses the DAG bundle loading mechanism to load example DAGs from providers without the need to copy them to the global examples.
  • As today, if loading of example DAGs is enabled, any plugins they need (e.g. timetables) should also be usable out-of-the-box.
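The bundle-extension idea above could key example discovery off the providers that are actually installed. A minimal sketch of that logic, independent of Airflow — the function name and folder mapping are hypothetical, not an existing Airflow API:

```python
from importlib.util import find_spec


def provider_example_folders(candidates: dict[str, str]) -> list[str]:
    """Return example folders whose provider package is importable.

    ``candidates`` maps an importable module name to the example folder
    that should be added to the DAG bundle when that provider is
    installed. Purely an illustrative sketch, not an Airflow mechanism.
    """
    return [
        folder
        for module, folder in candidates.items()
        if find_spec(module) is not None
    ]


# Usage: stdlib module names stand in for provider packages here.
folders = provider_example_folders(
    {
        "json": "examples/json_provider",          # importable -> included
        "no_such_provider_xyz": "examples/other",  # missing -> skipped
    }
)
```

The same check could run at DAG-bundle refresh time, so enabling a provider automatically surfaces its examples.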

Storyline

Current idea:

  • "Tailwind" - A virtual (non-existent) wind park energy company that powers a farm of windmills to produce clean energy. The company has a strong demand to ETL sensor data from the windmills, as well as a need to act on data events when base data changes or contracts with customers are renewed. The company also values DEI rules and has sustainability targets for clean energy and CO2 reduction.
    • Event-driven case: New data is dropped on a file system (S3 would be great but cannot be executed without S3; this can be added to the AWS provider as an extension), which triggers data processing for wind energy. Data is loaded and asset events are generated (see GitHub Issue 52481)

    • Asset-driven pipeline around reporting: data is split up per city of the wind turbines, and reports about production are distributed to shareholders. Branching can be used to check whether a notification is sent via email or a custom notifier. This can also use branch labels. A third notification channel might be broken, and as these shareholders are important, we inform the admin in case any task fails (trigger rule) and start a recovery task.
    • Scheduled nightly use case: example reporting is written to the file system (where the event pipeline picks it up!). As that would be too easy, some tasks might fail, and then a custom weight rule is used for retries.
    • Manual correction trigger: Corrections to wind production counters can be submitted, which are then also written to the file system.
    • Timetable example for a maintenance schedule (selected calendar dates) where maintenance notifications are sent (see GitHub Issue 52479)
    • Scheduled hourly check of the wind turbines' state. This requires some special infrastructure to start and stop, using setup + teardown to open a VPN tunnel to the remote machines. This uses a generator pattern and produces the same logic for 3 counties (see GitHub Issue 52480)
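The "generator pattern producing the same logic for 3 counties" in the last bullet can be sketched independently of Airflow. The region and task names below are illustrative stand-ins; in the real example the loop would emit Airflow task groups (VPN setup → telemetry/alarm checks → health check → VPN teardown):

```python
def build_monitoring_tasks(region: str) -> list[str]:
    """Generate the per-region task names for the hourly turbine check.

    Plain strings stand in for Airflow tasks here, to show only the
    generator pattern itself: one function, called per region, produces
    an identical pipeline shape each time.
    """
    return [
        f"{region}.vpn_connect",     # setup
        f"{region}.get_telemetry",
        f"{region}.check_alarms",
        f"{region}.health_check",
        f"{region}.vpn_disconnect",  # teardown
    ]


# One loop produces identical pipelines for all regions.
pipelines = {
    region: build_monitoring_tasks(region)
    for region in ("north", "south", "east")
}
```

In the actual DAG, the same loop body would create a dynamically mapped task group per region instead of a list of names.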

As demos and examples need a lot of functionality in both the decorator and the classic Dag implementation, it would be good to have two similar use cases. Alternatively, we could describe that the Tailwind South branch prefers to implement everything in a Pythonic manner, whereas the Tailwind North branch data engineers prefer the classic implementation.

DOT source
digraph D {
    rankdir = LR;
    
    subgraph cluster_p {
        label = "Dag: Turbine Report";
        
        ev [shape = folder;label = "File\nEvent";];
        report [xlabel = "with failure / retry";];
        ingest;
        
        ev -> ingest -> aggregate -> report;
    }
    
    subgraph cluster_c {
        label = "Dag: City Reporting";
        
        split [label = "split by city";];
        
        subgraph cluster_c2 {
            label = "Dynamic Mapped Tasks";
            
            seattle;
            portland;
            vancouver;
        }
        split -> seattle -> summary;
        split -> portland -> summary;
        split -> vancouver -> summary;
    }
    
    ingest -> split [color = "#aaaaaa"; constraint = false; label = "asset event";];
    
    subgraph cluster_m {
        label = "Dag: Measurement correction";
        
        form [label="Trigger form w/\ncity\nmeasure", shape=component]
        notify [shape=diamond]

        form -> file -> notify

        notify -> email -> completion
        notify -> push -> completion
        notify -> none -> completion
    }

    file -> ev [color = "#aaaaaa"; constraint = false; label = "asset event"]

    subgraph cluster_f {
        label = "Dag: Turbine Monitoring";
        
        cron [shape = doublecircle; label = "cron:hourly"]
        inventory [label="get inventory"]

        cron -> inventory

        subgraph cluster_f1 {
            label = "Dynamic Mapped Task Group";

            vpn_on [label="vpn connect\n(setup)"]
            vpn_off [label="vpn disconnect\n(teardown)"]

            health [shape=diamond]

            vpn_on -> get_telemetry -> health -> vpn_off
            vpn_on -> check_alarms -> health

            health -> send_technician [label="notify if alarm"]
            health -> all_ok
        }

        inventory -> vpn_on
    }

    subgraph cluster_s {
        label = "Dag: Maintenance";
        
        timetable [shape = doublecircle; label = "maintenance\ntimetable"]
        form2 [label="Trigger form w/\nturbine\ntime\nduration", shape=component]

        maintenance_on [label="maintenance start\nbash script"]
        wait [label="wait for maintenance completion\ndeferred"]
        maintenance_off [label="maintenance end\nbash script"]

        timetable -> maintenance_on [label="planned schedule"]
        form2  -> maintenance_on [label="manual trigger"]

        maintenance_on -> wait -> maintenance_off
   }

    subgraph cluster_r2 {
        label = "Dag: Shareholder Reporting\nDynamic Dag Generation";

        cron2 [shape = doublecircle; label = "cron:monthly"]

        cron2 -> load_data

        subgraph cluster_g {
            label = "Generated by loop";

            subgraph cluster_ga {
                label="";
                color=none;

                report_stakeholder_a -> send_report_a
            }

            subgraph cluster_gb {
                label="";
                color=none;

                report_stakeholder_b -> send_report_b
            }

            subgraph cluster_gc {
                label="";
                color=none;

                report_stakeholder_c -> send_report_c
            }
        }

        load_data -> report_stakeholder_a
        load_data -> report_stakeholder_b
        load_data -> report_stakeholder_c
    }
}

Technical Work Packages → Features

  • --load-example-dags must load examples from the standard provider at least in Airflow 3.1 (same as the breeze hack today) (see GitHub Issue 52469)
  • Testing DAGs must be loadable (at least in breeze) so that they can be removed from the example tree (see GitHub Issue 52474)
    Likely we should have a dedicated "test_dags" folder/bundle that contains DAGs used for testing only. We can automatically make such DAGs available in unit tests via an auto-fixture in the shared conftest.py or similar - this way it works in both breeze and a local venv. We should aim for breeze == local env.
  • DAG bundles must be extended depending on the installed/available providers to extend the examples - allowing examples to be moved from core to providers (e.g. example_kubernetes_executor.py → cncf.kubernetes)
    Likely we can add an "examples" section in provider.yaml and move the example DAGs from "tests" to a separate "examples" folder that is also embedded in the .whl file (so that examples from a given provider can be enabled conditionally). This means that "system" tests will no longer live in a separate "system" folder, but somewhere separate.
  • We should figure out a way to show examples from a provider - possibly "load_core_examples=true/false" and "load_provider_examples=[list of providers]" would be a nice way to do it
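One possible shape for parsing such a setting — the option name, the combined comma-separated format, and the special "core" entry are all assumptions for illustration, not a decided design:

```python
def parse_example_config(raw: str) -> tuple[bool, list[str]]:
    """Parse a hypothetical combined example-loading setting.

    ``raw`` might look like ``"core,cncf.kubernetes,edge3"``: the entry
    ``core`` toggles the core examples, and every other entry names a
    provider whose bundled examples should be loaded. No such option
    exists yet; this only sketches the proposed semantics.
    """
    entries = [entry.strip() for entry in raw.split(",") if entry.strip()]
    load_core = "core" in entries
    providers = [entry for entry in entries if entry != "core"]
    return load_core, providers
```

Whether this ends up as one combined option or the two separate options mentioned above is an open design question.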

  • The examples have to be reviewed from a "security" point of view. We often receive security reports pointing to RCE, lack of sanitization, etc. in our examples - which is very important, as these examples can be used by others and bad practices propagated into production code.

  • Add a review checklist to the repo to remember the quality gates we defined, for future reviews and extensions after the examples have been cleaned up (see GitHub Issue 52476)
  • Add a check that links in example dags/doc_md are valid against the current RST (see GitHub Issue 52477)

Proposed Technical Excellence in Example DAGs

  • All code has documentation (pydoc)
  • All DAGs have DAG MD docs and task MD docs
    The MD must include a lightweight Dag preview - let's include a short summary (1-2 sentences) in the Dag's description, explaining what it demonstrates and why it's useful, so that users don't have to go through the whole story to understand one feature.
    The MD should include links to the official Airflow documentation. This gives users instant access to deeper reference material for the operators, hooks, or features being demonstrated, without needing to search separately.
  • All DAGs and Tasks use Typing
  • The Examples use tags mapping to use cases of storyline.
    • All examples carry the tag "Example"
    • If the DAG serves as a tutorial, it has the tag "Tutorial"
    • Current examples that we move off for testing (or which are dual-use) get the tag "Testing"
    • Feature tags for easier discoverability - the new storyline-based example Dags are great for understanding end-to-end workflows. That said, they can be a bit overwhelming if you're just looking to learn a specific concept (e.g., Dynamic Task Mapping, Assets, Sensors, XCom). It helps to add feature tags to each Dag, so users can more easily find Dags showcasing a specific capability. We should maintain a specific set of tags.

    • Examples for a provider (other than standard) will get a tag with the name of the provider (e.g. "Edge3")
  • Ruff + Mypy checks are enabled and examples follow code quality guidelines
  • DAGs and Tasks have nice display names
  • Examples do not use deprecated functions
  • Examples do not carry top-level code
  • Code in the examples follows best practices with regard to security (sanitization, RCE protection, no exposure of sensitive information and the like)
  • All examples run in the standard setup out-of-the-box (e.g. no connections need to be configured and no SW packages need to be installed prior to running them)
    Note: This will not be possible for some provider examples - for example Google - which need at least the configuration of a Google connection, and some of which need a separate setup. Possibly in those cases we should just document their prerequisites. Or we need to find other creative options, like checking for existence and short-circuiting if the prerequisites are missing (e.g. a Snowflake back-end is needed). If a prerequisite can be set up automatically, then setup and teardown tasks might be an option.
  • Examples should integrate into the story-line and not just stand-alone to showcase a technical feature
  • All examples should (if possible) be available as Taskflow and non-Taskflow (we call it Classic in the examples)

Current Examples → Example DAG Target Planning

See https://github.com/apache/airflow/tree/main/airflow-core/src/airflow/example_dags

Name | Gaps | Referenced in | Features used | Proposed Change

Core - airflow-core/src/airflow/example_dags

example_asset_alias.py


-
AssetAlias, taskflow

Documentation in https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/assets.html#dynamic-data-events-emitting-and-asset-creation-through-assetalias does not use the examples. Either need to add them to a real-world example, reference them in the docs, or delete them. Tzu-ping Chung: Do you have an idea how to map this to a real-world example? (question)

example_asset_alias_with_no_taskflow.py


-
AssetAlias

example_asset_decorator.py


-
assets, decorators
Basic examples should be integrated into the storyline, else there are too many DAGs without a business example. Then drop (error)

example_assets.py


-
assets

example_asset_with_watchers.py


-
AssetWatcher
Needs to be integrated into a case, not standalone. Then drop (error)

example_branch_labels.py


airflow-core/docs/core-concepts/dags.rst
Needs to be integrated into a case, not standalone. Then drop (error)

example_branch_python_dop_operator_3.py


-
Needs to be integrated into a case, not standalone, then drop (error)

example_complex.py


airflow-core/docs/howto/usage-cli.rst
Lacks a real business case, but the complexity might still be a good showcase. Check against the resulting example; if of similar complexity, then drop (error)

example_custom_weight.py


airflow-core/docs/administration-and-deployment/priority-weight.rst
priority weights
Just a technical example. Needs to be kept, but would be best integrated into a real use case. (question)

example_dag_decorator.py


airflow-core/docs/core-concepts/dags.rst
Not needed as a standalone example if the business example contains similar code. Rework into a real business example, then drop (error)

example_display_name.py


-
Drop the standalone example. Real display names should be added to all examples. → Drop (error)

example_dynamic_task_mapping.py


airflow-core/docs/authoring-and-scheduling/dynamic-task-mapping.rst
Dynamic task mapping
Nice example, but use one alternative from real life and then drop the individual DAG (error)

example_dynamic_task_mapping_with_no_taskflow_operators.py


airflow-core/docs/authoring-and-scheduling/dynamic-task-mapping.rst
Dynamic task mapping

example_inlet_event_extra.py


-

Needs a proper example to showcase something useful. Tzu-ping Chung, do you have a real-world example to add to the storyline? (question) Then rework this together with the outlet events, and drop (error) this example as a standalone file.

example_kubernetes_executor.py


providers/cncf/kubernetes/docs/kubernetes_executor.rst
Move to K8s Provider

example_latest_only_with_trigger.py


airflow-core/docs/core-concepts/dags.rst

I (= Jens Scheffler) understand neither the example nor the benefit of this operator. Is there anybody who can provide a real-life example of its use? Else drop (error)

example_local_kubernetes_executor.py


-
Move to K8s Provider

example_nested_branch_dag.py


-
Not of any use standalone. Drop (error)

example_outlet_event_extra.py


-
Same as example_inlet_event_extra.py (question)

example_params_trigger_ui.py


airflow-core/docs/core-concepts/params.rst
Migrate to storyline, then drop the individual example (error)

example_params_ui_tutorial.py


airflow-core/docs/core-concepts/params.rst
If not all features can be transferred into storyline, keep this as individual example to be able to test all form elements.

example_passing_params_via_test_command.py


-
Migrate to storyline, then drop the individual example (error)

example_setup_teardown.py


-
Migrate to storyline, then drop the individual example (error)

example_setup_teardown_taskflow.py


-

example_simplest_dag.py


-
Merge with tutorial. No value standalone (error)

example_skip_dag.py


-
Should be integrated into the storyline and added as a code reference in the docs. Then drop (error) this example

example_task_group_decorator.py


airflow-core/docs/core-concepts/dags.rst
Task Group
Migrate to storyline, then drop the individual example (error)

example_task_group.py


-
Task Group

example_time_delta_sensor_async.py


-
Merge with tutorial. No value standalone (error)

example_trigger_target_dag.py


-
Oops, this DAG was forgotten when moving to the standard provider (warning) → But anyway: migrate to storyline, then drop the individual example (error)

example_workday_timetable.py


Code copied, not inlined, in airflow-core/docs/howto/timetable.rst
Use as a base for one of the examples in the storyline, then Drop (error)

example_xcomargs.py


-
Use as a base for one of the examples in the storyline, then Drop (error)

example_xcom.py


-
Use as a base for one of the examples in the storyline, then Drop (error)

tutorial_dag.py


-

Consolidate with tutorial.py

(warning) Note: this is referenced in some tests and needs to be replaced with the other tutorial

tutorial_objectstorage.py


airflow-core/docs/tutorial/objectstorage.rst
(question)

tutorial.py


airflow-core/docs/core-concepts/dag-run.rst

airflow-core/docs/tutorial/fundamentals.rst


Keep as starter tutorial, beautify using technical rules (tick)

tutorial_taskflow_api.py


airflow-core/docs/tutorial/taskflow.rst

tutorial_taskflow_api_virtualenv.py


-

Check if features can be added to storyline for technical completeness, then drop (error)

tutorial_taskflow_templates.py


-
Check if features can be added to storyline for technical completeness, then drop (error)

Standard - providers/standard/tests/system/standard

example_bash_decorator.py


providers/standard/docs/operators/bash.rst
Keep

example_bash_operator.py


providers/standard/docs/operators/bash.rst

airflow-core/docs/core-concepts/debug.rst

contributing-docs/quick-start-ide/contributors_quick_start_vscode.rst


Keep

example_branch_datetime_operator.py


providers/standard/docs/operators/datetime.rst
Keep, but needs a better idea

example_branch_day_of_week_operator.py


providers/standard/docs/operators/datetime.rst
needs a better idea (warning)

example_branch_operator_decorator.py


providers/standard/docs/operators/python.rst
Used in the general example storyline, then Drop (error)

example_branch_operator.py


providers/standard/docs/operators/python.rst
Used in the general example storyline, then Drop (error)

example_external_task_child_deferrable.py


-
Drop (error)

example_external_task_marker_dag.py


providers/standard/docs/sensors/external_task_sensor.rst
Drop (error)

example_external_task_parent_deferrable.py


providers/standard/docs/sensors/external_task_sensor.rst
Drop (error)

example_latest_only.py


providers/standard/docs/operators/latest_only.rst
needs a better idea (warning)

example_python_decorator.py


airflow-core/docs/tutorial/taskflow.rst

providers/standard/docs/operators/python.rst


Used in the general example storyline, then Drop (error)

example_python_operator.py


airflow-core/docs/best-practices.rst

providers/standard/docs/operators/python.rst


Used in the general example storyline, then Drop (error)

example_sensor_decorator.py


airflow-core/docs/tutorial/taskflow.rst

providers/standard/docs/sensors/python.rst


Used in the general example storyline, then Drop (error)

example_sensors.py


providers/standard/docs/sensors/bash.rst

providers/standard/docs/sensors/datetime.rst

providers/standard/docs/sensors/file.rst

providers/standard/docs/sensors/python.rst


Used in the general example storyline, then Drop (error)

example_short_circuit_decorator.py


providers/standard/docs/operators/python.rst
needs a better idea (warning)

example_short_circuit_operator.py


providers/standard/docs/operators/python.rst
needs a better idea (warning)

example_trigger_controller_dag.py


providers/standard/docs/operators/trigger_dag_run.rst
See example_trigger_target_dag.py

ArangoDB - providers/arangodb/src/airflow/providers/arangodb/example_dags

example_arangodb.py


providers/arangodb/docs/operators/index.rst
Keep

Oracle - providers/oracle/src/airflow/providers/oracle/example_dags

example_oracle.py


providers/oracle/docs/operators.rst
Keep

Edge - providers/edge3/src/airflow/providers/edge3/example_dags

integration_test.py


-
Keep for the moment

win_notepad.py


-
Keep for the moment

win_test.py


-
Keep for the moment

List of features to cover in new examples

Feature | Description
Dag authoring | Simplest code that allows you to create a dag.
Taskflow | Using decorators to define tasks and dags.
Trigger rules | Decide the condition for a task to run.
Setup/Teardown | Allow setting setup/teardown tasks.
Operators | Showing a way to use pre-existing pieces to achieve tasks.
Custom weights | Used to prioritize tasks.
Dynamic task mapping | Allow dynamically generating a number of tasks at runtime based on inputs.
DAG params | Let users provide parameters for a dag run.
Assets | Logical grouping of data; can be updated by dags and used to trigger downstream dags.
AssetWatcher | Event-driven scheduling.