Status

State: Completed
Discussion Thread: https://lists.apache.org/thread/htd4013yn483qfhwv11vc26jpf2yvjph
Voting thread: https://lists.apache.org/thread/54hnnndpxpbdrx9ms4z8r3hsy120f6c3
Created


Accepted

 

Completed

 

Authors: Bartłomiej Hirsz, Eugene Kostieiev, Mateusz Nojek, Rafał Biegacz

Motivation

Current state

Currently, system tests are not integrated into the CI process and are rarely executed, which makes them outdated and faulty. The purpose of testing is to preserve the high quality of the product, so tests should be run regularly and should be easy to maintain. The old design uses additional dependencies, such as credential files and many unnecessary environment variables, which introduce hard-to-maintain complexity.


Progress of the AIP-47 implementation is tracked in https://github.com/apache/airflow/issues/24168

The motivation of this AIP

The new design of system tests doesn't change the tests themselves but redefines how they are run. Also, it includes CI integration which allows us to run system tests automatically.

Currently, there are many issues related to how Airflow Operators work (or fail to work). With automated testing in place, we can decrease the number of bugs reported.

Assuming that the new design is approved and implemented, we benefit from:

  • Assuring the high quality of providers
  • Running operators regularly with user-oriented use cases
  • Possibly lower rate of creation of new bugs
  • Simpler way of running system tests
  • Faster results from system tests execution
  • Easier maintenance of system tests
  • Decreased entry threshold for writing new system tests

What change do you propose to make?

  1. Each test is self-contained, which means that all information required to run the test is stored within the test file. If some additional big chunk of data is required, it needs to be stored separately as a resource file imported into the test file.
  2. The test file is basically a DAG file with tasks running the operators - some of them are operators under test, and some are there for setup and teardown necessary resources. It may also contain tasks that verify correctness of executed operators. These test files will be deployed to the DAGs folder of Airflow and executed as regular DAGs.
    For pytest support, at the end of each file there will be several lines that will enable running DAG with this tool. The presence of these lines will be checked automatically using a pre-commit hook and, if absent, added automatically. To see the example, go to Complete test example section.
  3. Setup and teardown will also be part of the test, implemented either with operators or with task-decorated functions. If a resource needs to be cleaned up after the test, the cleanup operator needs to be defined with the parameter trigger_rule set to "all_done" (or the enum TriggerRule.ALL_DONE from the module airflow.utils.trigger_rule). More about setups and teardowns in the section Tasks as setup and teardown. Because teardown tasks use trigger_rule, another mechanism is also needed for checking the state of the tasks: the watcher (see the section Watcher task).
  4. New design doesn’t require entering a breeze environment to run the tests. They can be easily run with pytest + DebugExecutor or even triggered using IDE.
  5. All system tests are going to be stored inside the tests/system/providers/<provider_name> directory to keep them separate from other types of tests. As all the tests are actually DAGs they can be executed in parallel by Airflow.

    Another possibility is to store these tests under the providers directories (tests/providers/<provider_name>/system/).
    Nevertheless, there is no need to store them under the “example_dags” directory because they will be displayed as examples in the documentation anyway.

    The paths for the documentation generated from code of the tests (example dags) need to be updated accordingly. Old tests need to be removed when transferred to the new design because maintaining both would require a significant amount of time and provides very low value.

    Note

    In the old design, system tests were stored inside tests/providers/<provider_name>/<service_name>/operators and they pointed to the example DAGs located in airflow/providers/<provider_name>/<service_name>/example_dags.
  6. The number of environment variables needed to run the tests will be kept to a minimum. All data that needs to be unique across the Airflow instance running the tests should now use SYSTEM_TESTS_ENV_ID and DAG_ID as unique identifiers. In the previous implementation, the variables.env file was used to gather all unique values. More about it (with an example) at the end of the section “What problem does it solve?”.
  7. With the new approach, no credential files are needed for most of the tests. All permissions to external services needed for the execution of DAGs (tests) should be provided to the Airflow instance in advance. This is the responsibility of the Airflow setup process, not of the tests themselves.
    Each provider should create an instruction explaining how to prepare the environment to run related system tests so that users can do it on their own. The provider should also prepare an environment for running those tests in the CI integration to enable running them regularly. Providing the environment also for local execution is recommended, so that users of Airflow can run the tests when updating system tests of a specific provider.
  8. CI integration can be built using GitHub CI or a provider-related solution (like Cloud Build for Google tests).
    Using GitHub CI can potentially block precious resources needed for running other checks for quite a long time, while using provider-related systems requires some additional setup but would be executed as a separate action.
    No matter which solution is chosen, the tests can be set to trigger only when specific tests were edited, and only those tests will be executed.
    More about the pros and cons of each solution in the Design details →How to run tests.
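
Point 2 above mentions a pre-commit hook that checks for the pytest lines at the end of each test file and adds them when absent. The following is only a minimal sketch of how such a check could work; the function names missing_pytest_lines and ensure_pytest_lines are hypothetical, and the real hook in the Airflow repository may be implemented differently.

```python
from typing import List

# The two lines every system test file is expected to end with
# (taken from the examples in this document).
REQUIRED_LINES = (
    "from tests.system.utils import get_test_run  # noqa: E402",
    "test_run = get_test_run(dag)",
)


def missing_pytest_lines(source: str) -> List[str]:
    """Return the required lines that do not appear in the file content."""
    present = {line.strip() for line in source.splitlines()}
    return [line for line in REQUIRED_LINES if line not in present]


def ensure_pytest_lines(source: str) -> str:
    """Append whichever required lines are missing; leave complete files untouched."""
    missing = missing_pytest_lines(source)
    if not missing:
        return source
    return source.rstrip("\n") + "\n\n\n" + "\n".join(missing) + "\n"


example_source = 'with DAG("example_dag") as dag:\n    pass\n'
fixed_source = ensure_pytest_lines(example_source)
```

Running ensure_pytest_lines a second time on fixed_source returns it unchanged, which is the property a pre-commit hook needs so that it does not keep modifying files.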

What problem does it solve?

Using DAG files as test files enables us to keep all code within 1 file. We don’t need to bother about special dependencies listed above - we upload a DAG file with its assets (like data files) directly to Airflow and it runs. Since the Airflow executor is used to run the tests, they will be run in parallel (depending on the Airflow configuration). Having 1 test file makes it easier to maintain system tests.

Setting up the breeze environment is not as easy as stated, and because running system tests in the current design requires breeze, it can be hard and painful. Reducing the amount of configuration and making it possible to run tests directly on Airflow makes it easier for developers to write and run the tests and to maintain the CI integration. Also, without a hard requirement on pytest, we only need to rely on the Airflow environment, which should positively affect the stability of tests.

Current tests read many environment variables that need to be set before the tests are run. That means that a team running tests for a specific provider needs to maintain a file containing all environment variables that are considered unique. Now, all data and variable names that require uniqueness can incorporate DAG_ID and optionally ENV_ID into their value to avoid the risk of collision. The ENV_ID needs to be generated before the DAGs are run, and its value needs to be long enough to minimize the possibility of collision (e.g. a 6-character string containing lowercase letters and numbers).

Example of creating a new name for a Google Cloud Storage bucket with this approach:

import os
ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"]
DAG_ID = "example_dag"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
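
The text above says that ENV_ID has to be generated before the DAGs are run, but does not prescribe how. One possible way, shown here purely as an assumption, is to draw 6 characters from lowercase letters and digits and export the result as SYSTEM_TESTS_ENV_ID; the helper name generate_env_id is hypothetical.

```python
import os
import secrets
import string

# Alphabet suggested by the text: lowercase letters and numbers.
ALPHABET = string.ascii_lowercase + string.digits


def generate_env_id(length: int = 6) -> str:
    """Return a random identifier made of lowercase letters and digits."""
    return "".join(secrets.choice(ALPHABET) for _ in range(length))


env_id = generate_env_id()
# Only set the variable if the environment does not already provide one.
os.environ.setdefault("SYSTEM_TESTS_ENV_ID", env_id)
```

With 36 possible characters per position, a 6-character value has 36**6 (about 2.2 billion) combinations, which is why a short string is usually enough to keep parallel environments apart.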

Why is it needed?

System tests are currently neither maintained nor run. Debugging and fixing tests that mostly do not work is very time-consuming. The lack of CI integration causes them to age and become obsolete. With the new design, system tests are intended to be run regularly.

The documentation for Airflow Operators is generated from the source code of system tests, so non-working code produces non-working examples in the documentation, spreading errors and bad practices into the community. By running system tests on a daily basis, we ensure that the examples are up to date.

Maintaining system tests requires knowledge about breeze, pytest and maintenance of special files like variables.env or credential files. By removing those, we simplify the architecture and make the tests easier to manage.

Which users are affected by the change?

The change mostly affects developers of providers that currently (as of 14th March 2022) have Airflow system tests or example DAGs and potentially future developers that will create new system tests.

It will also affect Airflow users who use Airflow providers: their experience will improve because system tests will be run automatically, which helps assure the high quality of providers.

System tests affecting providers

The list of providers affected by the change with related products and number of system tests:

  • Google (Cloud) - 107 tests
  • Amazon (AWS) - 14 tests
  • Apache (Beam) - 8 tests
  • Microsoft (Azure) - 5 tests
  • CNCF (Kubernetes) - 1 test
  • Snowflake - 1 test
  • Postgres - 1 test
  • Asana - 1 test
  • HTTP - 1 test
  • Airflow (Built-in Operators) - 5 tests

Most of the tests (around 75%) operate over Google Cloud products.

The total number of system tests (as of 14th March 2022) is 144 (from 10 different providers).

Example DAGs affecting providers

These tests need to be migrated in order to be run in the to-be-created CI integration with system tests.

Moreover, there are example DAGs located at airflow/providers/<provider_name>/*/example_dags/example_*.py (and examples of core features located in airflow/example_dags/example_*.py) which serve as helpers for the users to provide examples on how to use different operators. They are also used for generating documentation. Some of them overlap with the system tests, some are just a dead piece of code. It's up to the provider whether these examples are going to be migrated to system tests, but this is highly recommended.

The total number of example DAGs (as of 14th March 2022) is 213 (from 37 different providers).

What defines this AIP as "done"?

This AIP is done when system tests in the new design can be run regularly as part of the CI process on GitHub, with a report that is easy to understand and helps maintainers locate where a problem (if any) occurs. Old system tests should be moved and refactored, or deleted (if not applicable or deprecated).

Design details

Process of migration in details

  1. Create a new file with the name of the service you are going to migrate in tests/system/providers/<provider_name>/<service_name>/example_file.py
    Remember to prefix the filename with example_*.

  2. Add meaningful docstring at the top of the file about the tests you are about to include in the test file.
  3. Setup and teardown methods

    1. Check whether the system test you are going to migrate has any additional configuration that is required to run it. A good place to start is where the pytest test is triggered (tests/providers/<provider_name>/…/test_*_system.py); look for any actions executed inside setUp or tearDown methods.

    2. Try to rewrite those actions using other available Airflow Operators as tasks, or just use PythonOperator or BashOperator.

    3. If you’re creating any resources during the tests, remember to remove them at the end (by creating a teardown task) if they’re not needed anymore.

    4. If a teardown task (or tasks) has been defined, remember to add the trigger_rule=TriggerRule.ALL_DONE parameter (import it using from airflow.utils.trigger_rule import TriggerRule) to the operator call. This makes sure that the task always runs, even if an upstream task fails. If an operator is a part of the generated documentation (decorated with # [START howto_blahblah] and # [END howto_blahblah]), make sure to add the trigger rule outside of the task declaration. Just type <task_name>.trigger_rule = TriggerRule.ALL_DONE.

  4. Define global variables:

    1. Define DAG name at the top of the file as DAG_ID global variable. Keep it short and unique, because it can be a part of other variables’ names.

    2. Add ENV_ID variable at the top of the file that is read from SYSTEM_TESTS_ENV_ID environment variable: os.environ["SYSTEM_TESTS_ENV_ID"]

    3. Define any other commonly used variables (paths to files, data etc.) in the tasks at the top of the file. Include DAG_ID and ENV_ID variables in the value of these variables to decrease the possibility of having a conflict when running multiple tests in parallel. Example:
      DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"

  5. Make sure to include these parameters into DAG call:

    1. schedule_interval="@once" - tells the scheduler to schedule the DAG only once,

    2. start_date=datetime(2021, 1, 1) - a start date in the past makes the DAG eligible to run immediately,

    3. catchup=False - prevents the DAG from being executed many times to fill the gap between start_date and today,

    4. tags=["example", "something"] - adds tags to quickly search the DAG.

  6. Put a task order after the tasks declaration in the DAG. Preferably define them line-by-line and add comments to explicitly show which task is setup/teardown and which is the test body (operators that are actually tested). Example:

    (
        # TEST SETUP
        create_dataset
        >> create_table
        # TEST BODY
        >> copy_selected_data
        # TEST TEARDOWN
        >> delete_dataset
    )
  7. If you have a teardown task (with TriggerRule.ALL_DONE), you need to add the watcher task dependency for all other tasks by adding these lines of code below all other task dependencies:

    from tests.system.utils.watcher import watcher
    
    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()

    The watcher import is kept close to its usage and the comment helps understand why it's needed.

  8. Try to keep tasks in the DAG body defined in an order of execution.

  9. At the bottom of the file add methods that will enable the test to be run with pytest:

    from tests.system.utils import get_test_run  # noqa: E402
    
    # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
    test_run = get_test_run(dag)
  10. Remember that these tests are also treated as examples for the community, so keep them clean, concise and easy to understand.

  11. Update the comment tags that tell the documentation script where to start and stop reading the operator code that will be rendered as an example in the official documentation. Look for something like this:
    # [START howto_operator_bigquery_create_table]
    or this:
    # [END howto_operator_bigquery_create_table]
    And then update the path to the test file inside the RST file after
    .. exampleinclude:: that is related to the corresponding example.

  12. If the test needs any additional resources, put them into resources directory (create if it doesn’t exist) close to the test files. When referencing path to resource files, make sure to use Pathlib to define absolute path to them. Example:
    FILE_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)

  13. Check if documentation for the current service doesn’t miss any operator and if so, please add meaningful documentation for it.
    Reread the whole documentation for the service and check if it’s logical (the order of introduced operators is consistent and their complexity increases along with the next examples), easy to understand and properly written.
  14. Once you verify that the test in the new design works, remove the old test and example DAG. Possible locations to check:

    1. tests/providers/<provider_name>/<service_name>/test_*_system.py

    2. airflow/providers/<provider_name>/<service_name>/example_dags/example_<service>.py

  15. Congratulations! Your test is ready to be executed countless times!

Complete test example

"""
Example Airflow DAG for Google BigQuery service local file upload and external table creation.
"""
import os
from datetime import datetime
from pathlib import Path

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateExternalTableOperator,
)
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils.trigger_rule import TriggerRule


ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"]
DAG_ID = "bigquery_operations"

DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
DATA_SAMPLE_GCS_OBJECT_NAME = "bigquery/us-states/us-states.csv"
CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "us-states.csv")


with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:
    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME)

    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

    upload_file = LocalFilesystemToGCSOperator(
        task_id="upload_file_to_bucket",
        src=CSV_FILE_LOCAL_PATH,
        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    )

    # [START howto_operator_bigquery_create_external_table]
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        destination_project_dataset_table=f"{DATASET_NAME}.external_table",
        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
        source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
        schema_fields=[
            {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )
    # [END howto_operator_bigquery_create_external_table]

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME,
        trigger_rule=TriggerRule.ALL_DONE,
    )

    (
        # TEST SETUP
        [create_bucket, create_dataset]
        # TEST BODY
        >> upload_file
        >> create_external_table
        # TEST TEARDOWN
        >> delete_bucket
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

Tasks as setup and teardown

In pytest it’s possible to have setUp and tearDown methods that prepare the environment for the test and clean up after it’s executed. By dropping these pytest wrappers for system tests and having tests as self-contained DAG files, we need to move these operations inside the DAG files. This means that setup and teardown are going to be done by Operators.

For example, when testing the BigQuery service, the creation of a GCS bucket is often required, and it can be done using dedicated operators. Likewise, the bucket can be deleted by calling a specific operator. To show the user which tasks form the actual test body and which tasks operate around the test, comments can be used in the fragment where task dependencies are defined, e.g.:

(
    # TEST SETUP
    [create_bucket, create_dataset]
    # TEST BODY
    >> upload_file
    >> create_external_table
    # TEST TEARDOWN
    >> delete_bucket
)

Additionally, teardown tasks are expected to clean up after the test no matter whether it passed or failed (if something was created before the test, teardown should remove it). For that purpose we can use the trigger_rule attribute that is available for each operator. By choosing "all_done" (or the enum TriggerRule.ALL_DONE) as the value for trigger_rule, we make sure that this (teardown) operator will be run no matter the results of upstream tasks (even if skipped), while always preserving the task execution order. Example:

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True, trigger_rule=TriggerRule.ALL_DONE
)

Note

Using operators with TriggerRule.ALL_DONE influences the DAG Run status and may cause tests with failing tasks to appear with the passed state. This is expected behavior, and the solution is to use the watcher task. More about it in the section below.

Watcher task

Background

Usually, if a task in the DAG fails, the execution of the DAG stops and all downstream tasks are assigned the upstream_failed status. This status is propagated to the DAG and the whole DAG Run gets the failed status. But there are exceptions to this flow, mostly when Trigger Rules are used.

Let's take a look at how the DAG Run status is determined, but before that we need to understand what "leaf nodes" are. Leaf nodes are the tasks that do not have any children, i.e. they do not have any downstream tasks. When at least one leaf node fails, the whole DAG Run is also marked as failed, but if all leaf nodes pass, the DAG Run also gets the passed status.

Our teardown tasks are leaf nodes, because they need to be executed at the end of the test, so they propagate their status to the whole DAG. This can lead to an unexpected (from the tester's perspective) situation where some task in the middle of the DAG fails, but because there is also a teardown task that will probably pass, the DAG Run gets the passed status and we lose the information about the failing task. In normal tests, when any step fails, the whole test is expected to fail as well, but this is not how Airflow's DAGs work. To achieve "test-like" behavior, we need to introduce a watcher task.

Description

A watcher task is a task that is a child of all other tasks, i.e. it depends on all other tasks. Thanks to this relation, when any task (that is also an upstream task for the watcher) fails, its status is propagated to the watcher task, and because the watcher is always a leaf node, its status determines the status of the whole DAG Run. So in practice, if any task fails, the watcher also fails and passes its status to the whole DAG Run. Similarly, when all tasks pass, the watcher task is skipped and does not influence the DAG Run status (which is passed in this case). The watcher task needs to have trigger_rule set to "one_failed" (or the enum TriggerRule.ONE_FAILED). This ensures that it runs only when an upstream task fails. To ensure that it fails when triggered, we just need to raise an exception. Look below to see the example of a watcher task.

Example

@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")

This piece of code can be located in a shared utility module (the examples in this document import it from tests.system.utils.watcher) and simply imported into each test and called there:

list(dag.tasks) >> watcher()

This line should be placed in each DAG right after the task dependencies. The tasks attribute of a DAG is a list of all tasks, and we use the bitshift operator to set all these tasks as upstream of the watcher task. The list() conversion makes sure that the dag.tasks attribute is handled properly in case of future changes. To see the complete test example, go to the Complete test example section.
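
The interaction between the teardown task, the leaf-node rule and the watcher can be checked with a small pure-Python model. This is a deliberately simplified sketch, not Airflow code: it covers only the three trigger rules discussed in this document, assumes tasks are listed in execution order, and the helper names (simulate, dag_run_state, run_test) are hypothetical.

```python
from typing import Dict, List, Set

ALL_SUCCESS, ALL_DONE, ONE_FAILED = "all_success", "all_done", "one_failed"
FAILED_STATES = ("failed", "upstream_failed")


def simulate(tasks: List[dict], raising: Set[str]) -> Dict[str, str]:
    """Walk tasks in the given (topological) order and assign each a final state."""
    states: Dict[str, str] = {}
    for task in tasks:
        upstream = [states[u] for u in task["upstream"]]
        any_failed = any(s in FAILED_STATES for s in upstream)
        rule = task.get("rule", ALL_SUCCESS)
        if rule == ALL_SUCCESS and any_failed:
            state = "upstream_failed"  # default rule: does not run after a failure
        elif rule == ONE_FAILED and not any_failed:
            state = "skipped"  # nothing failed, so the watcher does not run
        else:
            # ALL_DONE always reaches this branch, whatever happened upstream.
            state = "failed" if task["id"] in raising else "success"
        states[task["id"]] = state
    return states


def dag_run_state(tasks: List[dict], states: Dict[str, str]) -> str:
    """The run fails only if at least one leaf (task without downstream) failed."""
    has_downstream = {u for task in tasks for u in task["upstream"]}
    leaves = [t["id"] for t in tasks if t["id"] not in has_downstream]
    return "failed" if any(states[leaf] in FAILED_STATES for leaf in leaves) else "success"


def run_test(with_watcher: bool, raising: Set[str]) -> str:
    tasks = [
        {"id": "create_bucket", "upstream": []},
        {"id": "upload_file", "upstream": ["create_bucket"]},
        {"id": "delete_bucket", "upstream": ["upload_file"], "rule": ALL_DONE},
    ]
    if with_watcher:
        everything = [task["id"] for task in tasks]
        tasks.append({"id": "watcher", "upstream": everything, "rule": ONE_FAILED})
    # The watcher body always raises, so it fails whenever it actually runs.
    return dag_run_state(tasks, simulate(tasks, raising | {"watcher"}))


masked = run_test(with_watcher=False, raising={"upload_file"})
caught = run_test(with_watcher=True, raising={"upload_file"})
green = run_test(with_watcher=True, raising=set())
```

In this model, masked is "success" even though upload_file failed, because the only leaf is the teardown task. With the watcher attached, caught is "failed", while green stays "success" because the watcher is skipped when nothing upstream fails.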

Creation of unique values for variables

As mentioned above in What problem does it solve?, sometimes a variable needs a unique value to avoid collisions in the environment that runs the tests. Because DAG_ID needs to be unique across all DAGs, we can use its value to create data that will not interfere with the rest.

Example:

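import os
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator

ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"]
DAG_ID = "example_dag"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"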

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

Test structure

The tests should be structured so that they are easy to run manually as “standalone” tests, but they should also integrate nicely into the pytest test execution environment. This can be achieved by leveraging the DebugExecutor and utilising the pytest test discovery mechanism (pytest automatically discovers all top-level functions in the module with names starting with test_ as “test cases”). The pytest.ini was modified to recognize files starting with the prefixes test_* and example_* as test files (python_files = test_*.py example_*.py). This allows a very simple and seamless integration with pytest (and leaves all of its useful features available) without introducing boilerplate code, while still allowing the tests to be run manually without pytest.

The tests can be run with either pytest <file> to run a single test or pytest <module> to run multiple tests. Each test file follows this structure:

with DAG(...) as dag:
    ...

from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)

The imported utility consists of this piece of code:

from airflow.utils.state import State


def get_test_run(dag):
    def test_run():
        dag.clear(dag_run_state=State.NONE)
        dag.run()

    return test_run

To use DebugExecutor by default when running tests with pytest, there is a conftest.py file added in tests/system directory which sets AIRFLOW__CORE__EXECUTOR for the purpose of test execution:

from unittest import mock

import pytest

@pytest.fixture(scope="package", autouse=True)
def use_debug_executor():
    with mock.patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="DebugExecutor"):
        yield
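
The fixture relies on mock.patch.dict restoring the environment when the with block exits, so the executor override lasts only for the duration of the test package. A minimal standalone sketch of that behaviour, using only the standard library (the variable names before, inside and after are just for illustration):

```python
import os
from unittest import mock

KEY = "AIRFLOW__CORE__EXECUTOR"

# Remember whatever value (possibly none) was set before the override.
before = os.environ.get(KEY)

with mock.patch.dict("os.environ", {KEY: "DebugExecutor"}):
    # Inside the block the override is visible to any code reading os.environ.
    inside = os.environ[KEY]

# After the block the previous state is restored, including "not set at all".
after = os.environ.get(KEY)
```

Here inside is "DebugExecutor", while after equals before, which is why the fixture does not leak the setting into tests outside tests/system.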

How to run tests

Running system tests can be done in multiple ways depending on the environment and the user’s choice. When developing new tests or adding new features for Airflow, a user may want to run system tests to check that nothing is broken. For this, one can refer to the section below describing how to run system tests locally. The tests will also be run in the CI process of releasing a new Airflow version or provider packages. The section “In CI process” explains how tests will be integrated into the CI/CD.

Locally

System tests can be executed using pytest. You can either navigate to the test file and run tests using your IDE widget for pytest (the tests should be discovered as pytest tests by your IDE) or run the following command:

pytest tests/system/path/to/test_file.py

You can also use Breeze to run the tests.

In CI process

Running system tests usually takes much more time and resources than running unit tests. Some of the tests might run for hours, and blocking GitHub Actions workers for the tests might not be the best idea. GitHub Runners for Apache Airflow are a resource shared with other Apache projects (Apache has a maximum of 150 running runners for all its projects), and blocking the runners for a long time has already caused problems in the past.

However, in order to fulfill their role, the system tests should be run periodically, and also when Pull Requests with changes related to the tests in question are pushed. This might be done using the approach the Community currently uses (selective CI checks), but with more granularity if we find that the system tests take too much time.
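
As an illustration of the finer-grained selection mentioned above, the following sketch picks only the system test files touched by a change. It is an assumption about how such a filter could look, not the existing selective-checks implementation; the function name select_system_tests is hypothetical, and the directory layout is the one proposed in this document.

```python
from pathlib import PurePosixPath
from typing import Iterable, List

# Location of system tests proposed in this document.
SYSTEM_TESTS_ROOT = PurePosixPath("tests/system/providers")


def select_system_tests(changed_files: Iterable[str]) -> List[str]:
    """Return the changed files that are system test DAGs (example_*.py)."""
    selected = []
    for name in changed_files:
        path = PurePosixPath(name)
        is_under_root = SYSTEM_TESTS_ROOT in path.parents
        if is_under_root and path.match("example_*.py"):
            selected.append(name)
    return sorted(selected)


changed = [
    "tests/system/providers/google/cloud/bigquery/example_bigquery.py",
    "tests/system/providers/google/cloud/bigquery/resources/us-states.csv",
    "airflow/providers/google/cloud/operators/bigquery.py",
    "tests/system/README.md",
]
to_run = select_system_tests(changed)
```

Only the example_bigquery.py path ends up in to_run. A fuller version would probably also map changed operator or resource files to the tests that use them, which this sketch does not attempt.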

We propose to use the “related” system to provide an execution engine. For example, if we run GCP system tests, we can use Cloud Build to execute the tests (Proof of Concept done by Tobiasz Kędzierski in https://github.com/PolideaInternal/airflow-system-tests). Similarly, to run AWS tests we could use AWS CodePipeline, and for Azure, Azure Pipelines. We need credits for the respective cloud providers anyway, so those credits could be used to run both the services we test and the CI/CD for them. All such services have integration with GitHub repositories. Each of those integrations needs to follow these principles:

  • Public access to the Build dashboard and build logs. This might not be easy: for example, the Cloud Build dashboard cannot be “publicly viewable”, but logs can be exported and made publicly available via links and integration from GitHub Actions.

  • The status of the build should be reported back to GitHub Actions for Pull Requests. This can be achieved with already existing integrations, for example the Cloud Build integration. Further analysis is needed to design the integration in detail.

  • If needed, the external service can have “check:write” permission and provide appropriate status checks for PRs via the GitHub API https://docs.github.com/en/rest/reference/checks#create-a-check-run. That will require authorising the system via specific tokens to have those permissions and might require cooperation with the Apache Software Foundation Infrastructure team.

Also having the “big” cloud provider credits for those checks will enable using those credits to run checks for other services (those are rather inexpensive usually).
