Status

State: In Progress / Under Development
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/7bng2nkp2n8pffsj2mwmthsh1bv4q923
Vote Result Thread: https://lists.apache.org/thread/0kyp33cp7q5m98s8q7c3pb6nobhprdyt
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: Nov 07, 2023
Version Released:
Authors:

Motivation

Measure performance changes between Airflow versions. Identify changes that have impact on performance, CPU, memory and disk resources utilization. Improve transparency of performance changes in release notes of Apache Airflow.

Deliver Airflow users a tool to measure performance of their own installations and compare it across different setups or versions.

Considerations

What change do you propose to make?

The framework aims to measure performance and resources footprint of Airflow components.

The framework does not and should not measure performance or resources utilization of third party code (including operators being part of provided packages).

The tests should be executed as part of the build process of Pull Requests (how to plug them into the build pipeline will be decided later).

The test results should be included as part of the PR documentation (if possible).

The proposed framework is based on the following concepts:

  • Instance - definition of Airflow installation setup.
  • Performance DAG - definition of DAG that is executed during the test.
  • Test suite - combination of Instance and Performance DAG. Test suite may include values for Instance or Performance DAG Jinja placeholders.

Instance is a concept that defines Airflow setup.

Instance is configured using JSON files. The schema of the file is specific to the Instance type. Therefore the only required field in each Instance configuration file is instance_type. That field is used by the framework to pick the dedicated test execution solution (e.g. Docker Compose, Cloud Composer, GKE, MWAA, etc.).

The remaining fields of the instance configuration may include information such as:

  • number of schedulers
  • number of CPUs in schedulers
  • worker CPU or memory (for Celery workers)
  • machine types

Instance configuration file can include Jinja style placeholders that are populated in Test suite.
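As an illustration, the placeholder mechanism could work roughly as in the sketch below. The proposal does not define the rendering code, so everything here is an assumption: the field names scheduler_count and worker_cpu are hypothetical, and a small regular expression stands in for a real Jinja renderer to keep the example dependency-free.

```python
import json
import re

# Hypothetical instance specification; only "instance_type" is required by the proposal.
INSTANCE_TEMPLATE = """
{
  "instance_type": "docker",
  "scheduler_count": {{ scheduler_count }},
  "worker_cpu": {{ worker_cpu }}
}
"""

# Matches simple Jinja-style placeholders such as {{ name }} (stand-in for Jinja itself).
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_instance(template: str, args: dict) -> dict:
    """Substitute {{ name }} placeholders with values from args and parse the JSON."""

    def replace(match):
        name = match.group(1)
        if name not in args:
            raise KeyError(f"no value provided for placeholder '{name}'")
        # json.dumps keeps numbers, strings and booleans valid inside the JSON document.
        return json.dumps(args[name])

    config = json.loads(_PLACEHOLDER.sub(replace, template))
    if "instance_type" not in config:
        raise ValueError("instance_type is required in every Instance configuration")
    return config


# The args would come from the Test suite definition.
instance = render_instance(INSTANCE_TEMPLATE, {"scheduler_count": 2, "worker_cpu": 0.5})
print(instance)
```

A real implementation would most likely use the Jinja library directly; the point of the sketch is only that the Test suite supplies the values and the rendered file must still contain instance_type.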

Performance DAG

It is a special DAG file that makes it possible to create a simple test scenario by specifying the following parameters:

  • PERF_DAG_FILES_COUNT: number of copies of elastic DAG file that should be uploaded to Instance. Every copy will use the same configuration for DAGs (except for dag_id prefix). Must be a positive int (default: “1”).
  • PERF_DAG_PREFIX: string that will be used as a prefix for filename of every DAG file copy and as a prefix of dag_id for every DAG created (default: “perf_scheduler”).
  • PERF_DAGS_COUNT: number of DAGs that should be created from every copy of DAG file. Must be a positive int (required).
  • PERF_TASKS_COUNT: number of tasks for every DAG. Must be a positive int (required).
  • PERF_START_AGO: a string identifying a duration (e.g. 2h13m) that will be subtracted from current time to determine start_date of DAGs (default: “1h”).
  • PERF_START_DATE: a date string in “%Y-%m-%d %H:%M:%S.%f” format identifying the start_date of the DAGs. If not provided, it will be calculated based on the value of PERF_START_AGO and added to the configuration.
  • PERF_SCHEDULE_INTERVAL: schedule_interval for every DAG. Can be either “@once” or an expression following the same format as for PERF_START_AGO (default: “@once”)
  • PERF_SHAPE: a string which determines structure of every DAG. Possible values: “no_structure”, “linear”, “binary_tree”, “star”, “grid” (required).
  • PERF_SLEEP_TIME: a float non-negative value that specifies sleep time in seconds for every task (default: “0”).
  • PERF_OPERATOR_TYPE: a string specifying type of operator to use: “bash” for BashOperator, “python” for PythonOperator or “big_query_insert_job” for BigQueryInsertJobOperator (default: “bash”)
  • PERF_MAX_RUNS: number of DagRuns that should be created for every DAG. This will be achieved by setting DAGs' end_date according to start_date and schedule_interval. The resulting end_date cannot occur in the future. Must be specified if PERF_SCHEDULE_INTERVAL is provided as a time expression and cannot be specified if it is not. Must be a positive int (default: not specified).
  • PERF_START_PAUSED: boolean value determining whether DAGs should be initially paused.
  • PERF_TASKS_TRIGGER_RULE: value defining the rule by which dependencies are applied for the tasks to get triggered. Options are defined in the class airflow.utils.trigger_rule.TriggerRule.
  • PERF_OPERATOR_EXTRA_KWARGS: json value defining extra kwargs for DAG tasks
  • PERF_DYNAMIC_TASKS_MAPPED: number of tasks to be started with Dynamic Tasks Mapping mechanism.
  • PERF_DEFERRED_TASKS: number of deferred tasks to be started.
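A minimal sketch of how a few of these parameters might be interpreted is shown below. It is not the framework's code. The accepted duration units (d, h, m, s) and the helper names parse_duration, resolve_start_date and check_max_runs are assumptions made for illustration; only the parameter names, defaults and the PERF_MAX_RUNS rule come from the list above.

```python
import re
from datetime import datetime, timedelta

# Assumed unit letters; the proposal only shows "h" and "m" (e.g. 2h13m).
_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
_DURATION = re.compile(r"(\d+)([dhms])")


def parse_duration(expr: str) -> timedelta:
    """Parse expressions such as '2h13m' into a timedelta."""
    parts = _DURATION.findall(expr)
    # Reject input containing anything other than <number><unit> pairs.
    if not parts or "".join(n + u for n, u in parts) != expr:
        raise ValueError(f"invalid duration expression: {expr!r}")
    return sum((timedelta(**{_UNITS[u]: int(n)}) for n, u in parts), timedelta())


def resolve_start_date(env: dict, now: datetime) -> datetime:
    """Use PERF_START_DATE if given, otherwise now minus PERF_START_AGO (default 1h)."""
    if "PERF_START_DATE" in env:
        return datetime.strptime(env["PERF_START_DATE"], "%Y-%m-%d %H:%M:%S.%f")
    return now - parse_duration(env.get("PERF_START_AGO", "1h"))


def check_max_runs(env: dict) -> None:
    """PERF_MAX_RUNS is required with a time-expression schedule and forbidden otherwise."""
    interval = env.get("PERF_SCHEDULE_INTERVAL", "@once")
    has_max_runs = "PERF_MAX_RUNS" in env
    if interval == "@once":
        if has_max_runs:
            raise ValueError("PERF_MAX_RUNS cannot be set when the schedule is '@once'")
        return
    parse_duration(interval)  # raises if the interval is not a valid time expression
    if not has_max_runs or int(env["PERF_MAX_RUNS"]) <= 0:
        raise ValueError("PERF_MAX_RUNS must be a positive int for a time-expression schedule")


print(parse_duration("2h13m"))
print(resolve_start_date({}, datetime(2023, 11, 7, 12, 0)))
check_max_runs({"PERF_SCHEDULE_INTERVAL": "1h", "PERF_MAX_RUNS": "3"})
```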

Test suite

A Test suite is defined in a JSON or YAML configuration file. It includes the following attributes:

  • instance - object defining the instance configuration used in the test run.
    • instance_specification_path - path to the instance definition file.
    • args - map of variables values replacing Jinja placeholders of the instance configuration file.
  • performance_dag - object defining performance DAG running in the test run.
    • performance_dag_specification_path - path to the Performance DAG specification file.
    • dag_file_path - alternatively provide path to DAG file (Python code).
    • args - map of variables values replacing Jinja placeholders of the performance DAG configuration file.
  • attempts - number of times the test should be run. The reason for having multiple attempts is to take multiple measurements in order to increase the stability of the results. By default the Airflow instance is torn down and recreated between attempts.

Consideration: Test suite can be constructed with list of instances and list of performance_dags objects to run all combinations of those multiplied by attempts. Therefore having 3 instance objects and 4 performance_dag objects and attempts = 5 will produce 12 combinations and execute each of them 5 times resulting in 60 runs.
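The combination count in this consideration can be sketched as follows. The keys instances and performance_dags, the file paths and the plan_runs helper are hypothetical; the proposal only suggests that lists of both object types could be supported.

```python
from itertools import product

# Hypothetical Test suite with 3 instances, 4 performance DAGs and 5 attempts.
test_suite = {
    "instances": [
        {"instance_specification_path": f"instances/instance_{i}.json"} for i in range(3)
    ],
    "performance_dags": [
        {"performance_dag_specification_path": f"dags/perf_dag_{i}.json"} for i in range(4)
    ],
    "attempts": 5,
}


def plan_runs(suite: dict) -> list:
    """Return one (instance, performance_dag, attempt) tuple per planned test run."""
    runs = []
    for instance, dag in product(suite["instances"], suite["performance_dags"]):
        for attempt in range(1, suite["attempts"] + 1):
            runs.append((instance, dag, attempt))
    return runs


runs = plan_runs(test_suite)
print(len(runs))  # 3 * 4 * 5 = 60
```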

Running tests

The tests are executed for each Test suite.

Test runner script may include the following parameters:

  • test_suite-path - path of the Test suite definition file.
  • reuse-if-exists - this flag tells the script to re-use the Instance if it exists or create a new one otherwise. If the flag has not been set and the Instance exists, the script is expected to fail.
  • delete-if-exists - this flag will cause the script to delete an existing environment with the same name as the one specified in your specification JSON file and then recreate it.
  • keep-on-finish - this flag will keep the instances created for the test after the tests complete. By default instances created during the tests are deleted.
  • output-path - name of folder to store the output results to.
  • dry-run - test the configuration
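One possible shape of the command line interface, sketched with argparse, is shown below. The spelling of the flags (for example --test-suite-path), the default output folder and the decision to make reuse-if-exists and delete-if-exists mutually exclusive are assumptions, not part of the proposal.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for the test runner parameters listed above (names assumed)."""
    parser = argparse.ArgumentParser(description="Run an Airflow performance Test suite.")
    parser.add_argument("--test-suite-path", required=True,
                        help="path of the Test suite definition file")
    # Assumption: reusing and deleting an existing Instance cannot be combined.
    existing = parser.add_mutually_exclusive_group()
    existing.add_argument("--reuse-if-exists", action="store_true",
                          help="re-use the Instance if it already exists")
    existing.add_argument("--delete-if-exists", action="store_true",
                          help="delete and recreate an existing Instance")
    parser.add_argument("--keep-on-finish", action="store_true",
                        help="do not delete the Instance after the tests complete")
    # Assumption: "results" is an illustrative default, not defined by the proposal.
    parser.add_argument("--output-path", default="results",
                        help="folder to store the output results in")
    parser.add_argument("--dry-run", action="store_true",
                        help="only test the configuration")
    return parser


args = build_parser().parse_args(
    ["--test-suite-path", "suite.yaml", "--reuse-if-exists", "--dry-run"]
)
print(args)
```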

Test run is a state machine that walks through the following steps:

  • prepare the instance (by creating a new one, reusing an existing one, or deleting and recreating an existing one)
  • generate and upload copies of Performance DAG
  • wait for all expected Performance DAGs to be parsed by the scheduler
  • unpause the Performance DAGs
  • wait for all the expected Performance Dag Runs to finish (with either a success or failure)
  • write the performance metrics in a csv format to the results table
  • delete the environment (unless the keep-on-finish flag was set).
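The steps above can be sketched as a small state machine. This is only an illustration under stated assumptions: the Step names, the next_step and run helpers and the handlers mapping are hypothetical, and the real steps would call into the Instance handler instead of doing nothing.

```python
from enum import Enum, auto


class Step(Enum):
    """Steps of a test run, in the order listed above (names are illustrative)."""
    PREPARE_INSTANCE = auto()
    UPLOAD_DAGS = auto()
    WAIT_FOR_PARSING = auto()
    UNPAUSE_DAGS = auto()
    WAIT_FOR_DAG_RUNS = auto()
    WRITE_RESULTS = auto()
    DELETE_INSTANCE = auto()
    DONE = auto()


def next_step(current: Step, keep_on_finish: bool) -> Step:
    """Return the step that follows current; skip deletion when keep_on_finish is set."""
    if current is Step.WRITE_RESULTS and keep_on_finish:
        return Step.DONE
    order = list(Step)
    return order[order.index(current) + 1]


def run(handlers: dict, keep_on_finish: bool = False) -> list:
    """Walk through the steps, calling the handler registered for each one."""
    visited = []
    step = Step.PREPARE_INSTANCE
    while step is not Step.DONE:
        handlers.get(step, lambda: None)()  # no-op when no handler is registered
        visited.append(step)
        step = next_step(step, keep_on_finish)
    return visited


print([s.name for s in run({}, keep_on_finish=True)])
```

Modelling the run as explicit steps makes it straightforward to resume or inspect a run that stopped while waiting, for example for DAG parsing.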

Creating the Airflow instance is delegated to a dedicated class handling the given instance_type. Initially the following Instance handlers will be implemented:

  • kubernetes - creates Airflow instance in Kubernetes cluster.
  • gke - creates Airflow instance in Google Kubernetes Engine.
  • composer - creates Airflow instance with Google Cloud Composer.
  • docker - creates Airflow instance with Docker Compose.

Additional Instance providers can be included in the test framework and mapped to the corresponding instance_type value. The list above comes from the initial code contribution of the Cloud Composer team.
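The mapping from instance_type to a handler class could be implemented as a simple registry, as in the sketch below. The class names, the create/delete interface and the register decorator are assumptions for illustration; the actual handler API is not defined in this proposal.

```python
from abc import ABC, abstractmethod


class InstanceHandler(ABC):
    """Hypothetical base class for code that creates and deletes an Airflow instance."""

    def __init__(self, config: dict):
        self.config = config

    @abstractmethod
    def create(self) -> str:
        """Create the instance and return a short description of what was done."""

    @abstractmethod
    def delete(self) -> None:
        """Tear the instance down."""


HANDLERS = {}  # maps instance_type values to handler classes


def register(instance_type: str):
    """Class decorator that maps an instance_type value to a handler class."""
    def decorator(cls):
        HANDLERS[instance_type] = cls
        return cls
    return decorator


@register("docker")
class DockerComposeHandler(InstanceHandler):
    def create(self) -> str:
        # A real handler would start Docker Compose here.
        return "docker: instance created"

    def delete(self) -> None:
        pass


def handler_for(config: dict) -> InstanceHandler:
    """Pick the handler class based on the required instance_type field."""
    handler_cls = HANDLERS.get(config["instance_type"])
    if handler_cls is None:
        raise ValueError(f"unsupported instance_type: {config['instance_type']!r}")
    return handler_cls(config)


print(handler_for({"instance_type": "docker"}).create())
```

With such a registry, an additional provider only needs to define a handler class and register it under its own instance_type value.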

Results format

Test results are stored in a folder created for the test run. The folder name is provided as an argument to the test runner script. The folder could be named after the Test suite name and date, but a run can include additional parameters populating Jinja variables.

The resulting folder includes the following files:

  • Instance configuration file with resolved Jinja variables values.
  • Performance DAG configuration file with resolved Jinja variables values.
  • Test suite arguments - full list of arguments used to run the suite including default values applied.
  • Test result metrics CSV file.

The test result metrics CSV file has the following columns:

  • scheduler_memory_average - average utilization of Scheduler memory
  • scheduler_memory_max - max utilization of Scheduler memory
  • scheduler_cpu_average - average utilization of Scheduler cpu
  • scheduler_cpu_max - max utilization of Scheduler cpu
  • worker_memory_average - average utilization of Worker memory
  • worker_memory_max - max utilization of Worker memory
  • triggerer_memory_average - average utilization of Triggerer memory
  • triggerer_memory_max - max utilization of Triggerer memory
  • triggerer_cpu_average - average utilization of Triggerer cpu
  • triggerer_cpu_max - max utilization of Triggerer cpu
  • database_cpu_average - average utilization of metadata database cpu
  • database_cpu_max - max utilization of metadata database cpu
  • database_qps - average number of queries per second to metadata database
  • database_qps_max - max number of queries per second to metadata database
  • test_start_date - earliest start_date of any Dag Runs created as part of the test
  • test_end_date - latest end_date of any Dag Runs created as part of the test
  • test_duration - the difference between test_end_date and test_start_date in seconds
  • dag_run_total_count - total amount of test Dag Runs
  • dag_run_success_count - amount of test Dag Runs that finished with a success
  • dag_run_failed_count - amount of test Dag Runs that finished with a failure
  • dag_run_average_duration - average duration of test Dag Runs, where duration is calculated as difference between Dag Run's end_date and start_date.
  • dag_run_min_duration - minimal duration of any of test Dag Runs
  • dag_run_max_duration - maximal duration of any of test Dag Runs
  • task_instance_total_count - total amount of Task Instances belonging to test Dag Runs
  • task_instance_average_duration - average duration of test Task Instances
  • task_instance_min_duration - minimal duration of any of test Task Instances
  • task_instance_max_duration - maximal duration of any of test Task Instances
  • task_delay_average_duration - average duration of delay calculated as difference between execution time and start time
  • task_delay_max_duration - maximal duration of delay calculated as difference between execution time and start time
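To make the Dag Run columns concrete, the sketch below derives some of them from a list of Dag Run records. The input format (dicts with start_date, end_date and state) and the function name dag_run_metrics are assumptions; the real framework would read these values from the Airflow metadata database.

```python
from datetime import datetime
from statistics import mean


def dag_run_metrics(dag_runs: list) -> dict:
    """Compute Dag Run related columns from dicts with start_date, end_date and state."""
    durations = [(r["end_date"] - r["start_date"]).total_seconds() for r in dag_runs]
    test_start = min(r["start_date"] for r in dag_runs)
    test_end = max(r["end_date"] for r in dag_runs)
    return {
        "test_start_date": test_start,
        "test_end_date": test_end,
        "test_duration": (test_end - test_start).total_seconds(),
        "dag_run_total_count": len(dag_runs),
        "dag_run_success_count": sum(r["state"] == "success" for r in dag_runs),
        "dag_run_failed_count": sum(r["state"] == "failed" for r in dag_runs),
        "dag_run_average_duration": mean(durations),
        "dag_run_min_duration": min(durations),
        "dag_run_max_duration": max(durations),
    }


# Two illustrative Dag Runs: 30 s (success) and 60 s (failed), overlapping in time.
sample_runs = [
    {"start_date": datetime(2023, 11, 7, 12, 0, 0),
     "end_date": datetime(2023, 11, 7, 12, 0, 30), "state": "success"},
    {"start_date": datetime(2023, 11, 7, 12, 0, 10),
     "end_date": datetime(2023, 11, 7, 12, 1, 10), "state": "failed"},
]
metrics = dag_run_metrics(sample_runs)
print(metrics["test_duration"], metrics["dag_run_average_duration"])
```

Note that test_duration spans the earliest start to the latest end, so for overlapping runs it is shorter than the sum of the individual durations.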

What problem does it solve?

  1. It makes performance changes visible to Airflow users.
  2. It also helps identify changes that have an unacceptably negative impact on Airflow performance or resource utilization.
  3. It helps users adapt to the requirements of new releases by adding more resources to their Airflow configurations if necessary, which has a positive impact on the adoption of new versions.
  4. Lastly it sends the message that Airflow community takes performance seriously and validates the solution against important metrics.

Why is it needed?

Currently there are no performance tests running for Apache Airflow.

There is no visibility of performance changes and resources utilization (especially when it grows).

Adoption of new versions is exposed to unexpected failures and has negative impact on stability of Airflow.

The tests can be used both during Airflow builds and by users who want to test performance individually with their own setup.


Are there any downsides to this change?

Test runs take additional resources and time, which are required to set up the Airflow instance.

Performance measurement is not perfect and is exposed to transient issues in the infrastructure. Therefore we cannot expect perfectly aligned results. Yet it is worth having results to track the deviation from a baseline based on run history.

The tests don't answer the question of how scalable Airflow is (how many tasks it can run in parallel or how many DAGs can be tracked by the scheduler), although the framework can be adapted to such a purpose in the future.

Which users are affected by the change?

Any user who is conscious of the cost, stability and performance of an orchestration solution based on Apache Airflow running in their organisation.

How are users affected by the change? (e.g. DB upgrade required?)

The users are provided with additional information in the release notes that may be meaningful to their setup and new versions adoption.

Other considerations?


What defines this AIP as "done"?

This is done once the first release of Apache Airflow includes performance measurement results.


9 Comments

  1. I like the idea.

    IMHO, if we want to move forward with this proposal, we need to focus on Kubernetes, deploying the cluster for the different test cases with helm charts, for several reasons:

    • We have a robust helm chart that supports almost all the possible configurations of Airflow.
    • This simplifies the installation and the configurations of each instance, where we can use helmfile to add a second level of templating for chart values instead of developing our custom templating package with Jinja.
    • Kubernetes is the most used scheduler to deploy Airflow; even Airflow-managed service providers started to use Kubernetes to deploy Airflow.
    • We will be agnostic from the cloud provider.
    • We will focus on the performance of Airflow without being impacted by how the cloud providers deploy/configure Airflow.

    There are some questions that need to be answered:

    • When should we run the tests?
      • if we run them on each commit → this will be very costly and useless in most cases
      • if we run them on each release → this will be just an informative result rather than a report used to optimize things
      • maybe running them daily at midnight is a good solution
    • Is there a plan by Google (or other providers?) to sponsor the infrastructure for this project?
    • Should we make our testing platform pluggable? By pluggable, I mean making the tests compatible with different platforms, so even if we use Kubernetes to run them, the cloud providers could use the same tests and run them with other platforms, which can motivate them to contribute to this project.
  2. I also very much like the idea, and in my past years of solutionizing I had to execute load tests many times. I can also tell the story that in recent upgrades we hit performance pitfalls multiple times. Therefore we also have some DAGs currently implemented to test for performance regressions when we upgrade our environment.

    Like Hussein Awala I would also propose to leverage a K8s deployment via the most recent helm chart. Size of machines and number of workers, memory limits etc. shall be defined once in a baseline. Also such a K8s deployment would cover most of the setups "in the wild". But still there are many combinations and options that can have an influence on the setup, so it might turn out that some combinations need to be tested (e.g. Celery vs. K8sExecutor?). But I would favor making it once in general and then have it evolve over time. For sure demand and details need to change, some metrics might need to stay stable, other things probably need to be adjusted.

    I also would propose to have the performance suite set of DAGs and scripts/configuration distributed such that it can be used as a regression test in individual setups as comparison or for performance troubleshooting. I would propose to include this in the System test suite. Really, if Google/AWS/Azure would sponsor a test environment, it would also be great to see how the environments behave differently with the same profile (smile).

    Things that I have seen with pitfalls in the past that I'd like to have included/propose to include are:

    • Ensure that DAG performance does not degrade with many runs (we had this in 2.7.0). Our performance test triggers 50k DAG runs and checks how long it takes to complete them (each DAG 1 task only, just a PythonOperator printing "Hello World")
    • Ensure that dynamic task mapping is performant: Have a DAG that maps 10 times 1000 tasks via dynamic task mapping, see how long it executes. Just a PythonOperator printing "Hello World"
    • One case which I assume many setups have but we (currently) don't suffer is the case of many DAGs: Produce 1000 DAGs and see how long it takes to parse these.

    Overall I would support this initiative and am happy to guide to review with experience! THANKS!

  3. It seems to be a valid expectation to run the framework with K8s. I have added such a requirement explicitly.

    Regarding adding a set of DAGs to the framework - I'm happy to include preconfigured performance DAGs in the framework. 

    With respect to your previous observations - running DAGs multiple times to collect more representative data has been included in the proposal. Please see the `attempts` parameter. It is also important to run the test for a longer period with a larger number of DAGs/tasks to collect stable statistics.

    We need to think about some tests covering Dynamic tasks mapping. That problem has been brought up in this discussion. I think it can be added as a performance DAG parameter - e.g. PERF_DYNAMIC_TASKS_MAPPED to represent the number of tasks to be induced.

    Parsing is a different challenge. I think that generating thousands of DAG files with the framework could serve a purpose of a simple parser testing. This is already possible with the proposed framework. The DAGs don't need to be complex as we are not testing Python interpreter but the file processing mechanism in DAG processor.

  4. Thanks for the insightful proposal, Bartosz Jankiewicz 

    I agree with Hussein Awala's suggestion to use Kubernetes (K8s) deployment. It'll save time in customizing our performance suite, letting us focus more on developing the test suite and solutions.

    Also, running the suite at midnight sounds good for getting enough results. Are we considering tools like Prometheus + Grafana for better result visualization?

  5. I guess reminding on the devlist and possibly just putting it up to a vote next should be the next step. I don't expect much controversy here.

  6. Also, there is an overlap with AIP-64: Keep TaskInstance try history, where testing regressions after DAG versioning is implemented is an important aspect.

    1. I'm sharing it for internal reviews with my colleagues and will open a PR this month

      1. Thanks, Bartosz Jankiewicz. Can you share the PR when it's done? I would love to try it out.