Status
Motivation
Measure performance changes between Airflow versions. Identify changes that impact performance and CPU, memory and disk resource utilization. Improve the transparency of performance changes in the release notes of Apache Airflow.
Provide Airflow users with a tool to measure the performance of their own installations and compare it across different setups or versions.
Considerations
What change do you propose to make?
The framework aims to measure the performance and resource footprint of Airflow components.
The framework does not, and should not, measure the performance or resource utilization of third-party code (including operators that are part of provider packages).
The tests should be executed as part of the Pull Request build process (how to plug them into the build pipeline will be decided later).
The test results should be included as part of the PR documentation (if possible).
The proposed framework is based on the following concepts:
- Instance - definition of an Airflow installation setup.
- Performance DAG - definition of the DAG that is executed during the test.
- Test suite - combination of an Instance and a Performance DAG. A Test suite may include values for the Jinja placeholders of the Instance or the Performance DAG.
Instance is a concept that defines the Airflow setup.
Instance is configured using JSON files. The schema of the file is specific to the Instance type, so the only field required in every Instance configuration file is instance_type. The framework uses that field to pick the dedicated test execution solution (e.g. Docker Compose, Cloud Composer, GKE, MWAA, etc.).
The remaining fields of the instance configuration may include information such as:
- number of schedulers
- number of CPUs in schedulers
- worker CPU or memory (for Celery workers)
- machine types
The Instance configuration file can include Jinja-style placeholders that are populated by the Test suite.
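A minimal sketch of how Test suite `args` could fill the placeholders of an Instance configuration file. To stay dependency-free it uses a regex that only understands plain `{{ name }}` variables; this is a simplified stand-in for real Jinja rendering, which supports filters, loops and expressions. The function name `render_placeholders` and the example fields `num_schedulers` and `machine_type` are hypothetical, not part of the framework.

```python
import json
import re

# Matches plain Jinja-style variables such as "{{ num_schedulers }}".
# Simplified stand-in for Jinja: no filters, loops or expressions.
_PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}\}")


def render_placeholders(template: str, args: dict) -> str:
    """Replace every {{ name }} in the template with str(args[name])."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in args:
            # Fail loudly instead of silently leaving a placeholder behind.
            raise KeyError(f"no value provided for placeholder '{name}'")
        return str(args[name])

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical Instance configuration file content with placeholders.
instance_template = """
{
  "instance_type": "docker",
  "num_schedulers": {{ num_schedulers }},
  "machine_type": "{{ machine_type }}"
}
"""

rendered = render_placeholders(
    instance_template, {"num_schedulers": 2, "machine_type": "n1-standard-4"}
)
instance_config = json.loads(rendered)
print(instance_config["num_schedulers"])  # 2
```

Rendering before JSON parsing lets a placeholder produce a number (as `num_schedulers` does here) rather than only a string.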
Performance DAG
It is a special DAG file that allows creating a simple test scenario by specifying edge conditions:
- PERF_DAG_FILES_COUNT: number of copies of elastic DAG file that should be uploaded to Instance. Every copy will use the same configuration for DAGs (except for dag_id prefix). Must be a positive int (default: “1”).
- PERF_DAG_PREFIX: string that will be used as a prefix for filename of every DAG file copy and as a prefix of dag_id for every DAG created (default: “perf_scheduler”).
- PERF_DAGS_COUNT: number of DAGs that should be created from every copy of DAG file. Must be a positive int (required).
- PERF_TASKS_COUNT: number of tasks for every DAG. Must be a positive int (required).
- PERF_START_AGO: a string identifying a duration (e.g. 2h13m) that will be subtracted from the current time to determine the start_date of DAGs (default: “1h”).
- PERF_START_DATE: a date string in “%Y-%m-%d %H:%M:%S.%f” format identifying the start_date of the DAGs. If not provided, it will be calculated based on the value of PERF_START_AGO and added to the configuration.
- PERF_SCHEDULE_INTERVAL: schedule_interval for every DAG. Can be either “@once” or an expression following the same format as for PERF_START_AGO (default: “@once”).
- PERF_SHAPE: a string which determines structure of every DAG. Possible values: “no_structure”, “linear”, “binary_tree”, “star”, “grid” (required).
- PERF_SLEEP_TIME: a non-negative float that specifies the sleep time in seconds for every task (default: “0”).
- PERF_OPERATOR_TYPE: a string specifying the type of operator to use: “bash” for BashOperator, “python” for PythonOperator or “big_query_insert_job” for BigQueryInsertJobOperator (default: “bash”).
- PERF_MAX_RUNS: number of DagRuns that should be created for every DAG. This will be achieved by setting DAGs' end_date according to start_date and schedule_interval. The resulting end_date cannot occur in the future. Must be specified if PERF_SCHEDULE_INTERVAL is provided as a time expression and cannot be specified if it is not. Must be a positive int (default: not specified).
- PERF_START_PAUSED: boolean value determining whether DAGs should be initially paused.
- PERF_TASKS_TRIGGER_RULE: value defining the rule by which dependencies are applied for the tasks to get triggered. Options are defined in the class airflow.utils.trigger_rule.TriggerRule.
- PERF_OPERATOR_EXTRA_KWARGS: JSON value defining extra kwargs for DAG tasks.
- PERF_DYNAMIC_TASKS_MAPPED: number of tasks to be started with the Dynamic Task Mapping mechanism.
- PERF_DEFERRED_TASKS: number of deferred tasks to be started.
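The time-related parameters above interact: PERF_START_AGO and PERF_SCHEDULE_INTERVAL share one duration format, PERF_START_DATE falls back to the current time minus PERF_START_AGO, and PERF_MAX_RUNS is allowed only with a time-expression schedule. The sketch below shows one way to validate them. It assumes durations are built from d/h/m/s units (the text only gives `2h13m` as an example) and that the n-th DagRun has logical date start_date + (n - 1) × interval; the helper names are hypothetical, not the framework's API.

```python
import re
from datetime import datetime, timedelta
from typing import Optional

DATE_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
# Assumed duration units: d(ays), h(ours), m(inutes), s(econds), e.g. "2h13m".
_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
_DURATION = re.compile(r"(?:\d+[dhms])+")


def parse_duration(text: str) -> timedelta:
    """Convert a duration string such as '2h13m' into a timedelta."""
    if not _DURATION.fullmatch(text):
        raise ValueError(f"invalid duration: {text!r}")
    total = timedelta()
    for value, unit in re.findall(r"(\d+)([dhms])", text):
        total += timedelta(**{_UNITS[unit]: int(value)})
    return total


def resolve_start_date(config: dict, now: datetime) -> datetime:
    """Use PERF_START_DATE if present, else now minus PERF_START_AGO (default '1h')."""
    if "PERF_START_DATE" in config:
        return datetime.strptime(config["PERF_START_DATE"], DATE_FORMAT)
    start = now - parse_duration(config.get("PERF_START_AGO", "1h"))
    # The computed value is added back to the configuration.
    config["PERF_START_DATE"] = start.strftime(DATE_FORMAT)
    return start


def resolve_end_date(config: dict, start: datetime, now: datetime) -> Optional[datetime]:
    """Derive end_date from PERF_MAX_RUNS; return None for '@once' schedules."""
    schedule = config.get("PERF_SCHEDULE_INTERVAL", "@once")
    max_runs = config.get("PERF_MAX_RUNS")
    if schedule == "@once":
        if max_runs is not None:
            raise ValueError("PERF_MAX_RUNS cannot be used with '@once'")
        return None
    if max_runs is None:
        raise ValueError("PERF_MAX_RUNS is required for a time-expression schedule")
    if int(max_runs) < 1:
        raise ValueError("PERF_MAX_RUNS must be a positive int")
    # Assumption: run n (1-based) has logical date start + (n - 1) * interval.
    end = start + (int(max_runs) - 1) * parse_duration(schedule)
    if end > now:
        raise ValueError("the resulting end_date would be in the future")
    return end


now = datetime(2024, 1, 1, 12, 0, 0)
cfg = {"PERF_START_AGO": "2h13m", "PERF_SCHEDULE_INTERVAL": "30m", "PERF_MAX_RUNS": 4}
start = resolve_start_date(cfg, now)
end = resolve_end_date(cfg, start, now)
print(start)  # 2024-01-01 09:47:00
print(end)    # 2024-01-01 11:17:00
```

Passing `now` explicitly keeps the calculation deterministic and easy to test.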
Test suite
A Test suite is defined in a JSON or YAML configuration file. It includes the following attributes:
- instance - object defining the instance configuration used in the test run:
  - instance_specification_path - path to the instance definition file.
  - args - map of variable values replacing Jinja placeholders of the instance configuration file.
- performance_dag - object defining the Performance DAG running in the test run:
  - performance_dag_specification_path - path to the Performance DAG specification file.
  - dag_file_path - alternatively, path to a DAG file (Python code).
  - args - map of variable values replacing Jinja placeholders of the Performance DAG configuration file.
- attempts - number of times the test should be run. Multiple attempts take multiple measurements, which makes the results more stable. By default, the Airflow instance is torn down and recreated between attempts.
Consideration: a Test suite can be constructed with a list of instances and a list of performance_dag objects to run all of their combinations, each repeated attempts times. For example, 3 instance objects, 4 performance_dag objects and attempts = 5 produce 12 combinations, each executed 5 times, for a total of 60 runs.
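The run count in the consideration above is a Cartesian product multiplied by attempts. A minimal sketch, assuming hypothetical list-valued keys `instances` and `performance_dags` (the text names the concept but not the keys) and hypothetical file names:

```python
import itertools
from typing import Iterator, Tuple


def expand_runs(suite: dict) -> Iterator[Tuple[dict, dict, int]]:
    """Yield (instance, performance_dag, attempt) for every run in the suite."""
    # Every instance is paired with every Performance DAG (Cartesian product),
    for instance, dag in itertools.product(suite["instances"], suite["performance_dags"]):
        # and each combination is executed `attempts` times.
        for attempt in range(1, suite["attempts"] + 1):
            yield instance, dag, attempt


suite = {
    "instances": [{"instance_specification_path": f"instance_{i}.json"} for i in range(3)],
    "performance_dags": [
        {"performance_dag_specification_path": f"dag_{j}.json"} for j in range(4)
    ],
    "attempts": 5,
}
runs = list(expand_runs(suite))
print(len(runs))  # 60
```

Yielding lazily keeps memory flat for large suites and lets a runner stop early on failure.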
Running tests
The tests are executed for each Test suite.
The test runner script may accept the following parameters:
- test_suite-path - path of the Test suite definition file.
- reuse-if-exists - re-use the Instance if it exists, or create a new one otherwise. If this flag is not set and the Instance exists, the script is expected to fail.
- delete-if-exists - delete an existing environment with the same name as the one specified in the specification JSON file, then recreate it.
- keep-on-finish - keep the instances created for the test after the tests complete. By default, instances created during the tests are deleted.
- output-path - name of the folder to store the output results in.
- dry-run - test the configuration.
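One way the runner parameters listed above could be exposed on the command line, sketched with the standard `argparse` module. The `--` prefix, option types and `required=True` are assumptions; so is treating reuse-if-exists and delete-if-exists as mutually exclusive, since they prescribe opposite handling of an existing Instance.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="Run an Airflow performance Test suite.")
    parser.add_argument("--test_suite-path", required=True,
                        help="path of the Test suite definition file")
    # Assumption: these two flags conflict, so argparse rejects using both.
    existing = parser.add_mutually_exclusive_group()
    existing.add_argument("--reuse-if-exists", action="store_true",
                          help="re-use the Instance if it exists, create it otherwise")
    existing.add_argument("--delete-if-exists", action="store_true",
                          help="delete an existing environment with the same name and recreate it")
    parser.add_argument("--keep-on-finish", action="store_true",
                        help="keep the created instances after the tests complete")
    parser.add_argument("--output-path", help="folder to store the output results in")
    parser.add_argument("--dry-run", action="store_true", help="only test the configuration")
    return parser


args = build_parser().parse_args(
    ["--test_suite-path", "suite.json", "--reuse-if-exists", "--output-path", "results"]
)
print(args.test_suite_path, args.reuse_if_exists, args.keep_on_finish)  # suite.json True False
```

`argparse` derives attribute names by replacing dashes with underscores, so `--test_suite-path` is read back as `args.test_suite_path`.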
A test run is a state machine that walks through the following steps:
- prepare the instance (either by reusing or deleting an existing one)
- generate and upload copies of the Performance DAG
- wait for all expected Performance DAGs to be parsed by the scheduler
- unpause the Performance DAGs
- wait for all expected Performance DAG Runs to finish (with either success or failure)
- write the performance metrics in CSV format to the results table
- delete the environment (if the --delete-upon-finish flag was set).
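The steps above can be sketched as a small linear state machine. The step names, the `handlers` mapping and the `delete_upon_finish` switch are hypothetical; real handlers would call the Instance handler and poll the Airflow instance in the two waiting steps, whereas here they only record that they ran.

```python
from enum import Enum, auto
from typing import Callable, Dict, List


class Step(Enum):
    """Steps of a test run, declared in execution order."""
    PREPARE_INSTANCE = auto()    # reuse, or delete and recreate, the Instance
    UPLOAD_DAGS = auto()         # generate and upload Performance DAG copies
    WAIT_FOR_PARSING = auto()    # wait until the scheduler parsed all DAGs
    UNPAUSE_DAGS = auto()
    WAIT_FOR_DAG_RUNS = auto()   # wait until every Dag Run succeeded or failed
    WRITE_RESULTS = auto()       # write metrics in CSV format
    DELETE_ENVIRONMENT = auto()
    DONE = auto()


def next_step(step: Step, delete_upon_finish: bool) -> Step:
    """Advance linearly; skip deletion unless it was requested."""
    order = list(Step)
    following = order[order.index(step) + 1]
    if following is Step.DELETE_ENVIRONMENT and not delete_upon_finish:
        return Step.DONE
    return following


def run(handlers: Dict[Step, Callable[[], None]], delete_upon_finish: bool) -> List[Step]:
    """Execute the handler of each step until DONE; return the visited steps."""
    step = Step.PREPARE_INSTANCE
    visited: List[Step] = []
    while step is not Step.DONE:
        handlers[step]()
        visited.append(step)
        step = next_step(step, delete_upon_finish)
    return visited


# Usage with no-op handlers that only record which step ran.
log: List[str] = []
handlers = {s: (lambda s=s: log.append(s.name)) for s in Step if s is not Step.DONE}
visited = run(handlers, delete_upon_finish=False)
print(visited[-1].name)  # WRITE_RESULTS
```

Modeling the run as explicit states makes it straightforward to log, or later resume, a run at the step where it stopped.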
Creating the Airflow instance is delegated to a dedicated class handling the given instance_type. Initially, the following Instance handlers will be implemented:
- kubernetes - creates an Airflow instance in a Kubernetes cluster.
- gke - creates an Airflow instance in Google Kubernetes Engine.
- composer - creates an Airflow instance with Google Cloud Composer.
- docker - creates an Airflow instance with Docker Compose.
Additional Instance providers can be included in the test framework and mapped to the corresponding instance_type value. The handlers listed above come as a package from the Cloud Composer team's initial code contribution.
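A minimal sketch of dispatching on instance_type through a handler registry. `InstanceHandler`, `register`, `handler_for` and `DockerComposeHandler` are hypothetical names; the actual contributed classes may be structured differently, and the Docker handler here is an empty placeholder rather than a real Docker Compose integration.

```python
import json
from abc import ABC, abstractmethod
from typing import Callable, Dict, Type


class InstanceHandler(ABC):
    """Creates and removes one kind of Airflow instance."""

    def __init__(self, config: dict) -> None:
        self.config = config

    @abstractmethod
    def create(self) -> None: ...

    @abstractmethod
    def delete(self) -> None: ...


# Maps instance_type values to handler classes.
HANDLERS: Dict[str, Type[InstanceHandler]] = {}


def register(instance_type: str) -> Callable[[Type[InstanceHandler]], Type[InstanceHandler]]:
    def decorator(cls: Type[InstanceHandler]) -> Type[InstanceHandler]:
        HANDLERS[instance_type] = cls
        return cls
    return decorator


@register("docker")
class DockerComposeHandler(InstanceHandler):
    def create(self) -> None:
        pass  # a real handler would start the Docker Compose project here

    def delete(self) -> None:
        pass  # and tear it down here


def handler_for(config_text: str) -> InstanceHandler:
    """Pick the handler from instance_type, the only required field."""
    config = json.loads(config_text)
    instance_type = config.get("instance_type")
    if instance_type not in HANDLERS:
        raise ValueError(f"no handler registered for instance_type {instance_type!r}")
    return HANDLERS[instance_type](config)


handler = handler_for('{"instance_type": "docker", "num_schedulers": 1}')
print(type(handler).__name__)  # DockerComposeHandler
```

With a registry, adding a new Instance provider only requires decorating a new class; the runner itself does not change.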
Results format
Test results are stored in a folder created for the test run. The folder name is provided as an argument to the test runner script. Naming the folder after the Test suite name and date could be considered, but a run can include additional parameters populating Jinja variables, so the suite name and date alone may not identify a run.
The resulting folder includes the following files:
- Instance configuration file with resolved Jinja variables values.
- Performance DAG configuration file with resolved Jinja variables values.
- Test suite arguments - full list of arguments used to run the suite including default values applied.
- Test result metrics CSV file.
The test results metrics CSV file has the following columns:
- scheduler_memory_average - average utilization of Scheduler memory
- scheduler_memory_max - max utilization of Scheduler memory
- scheduler_cpu_average - average utilization of Scheduler CPU
- scheduler_cpu_max - max utilization of Scheduler CPU
- worker_memory_average - average utilization of Worker memory
- worker_memory_max - max utilization of Worker memory
- triggerer_memory_average - average utilization of Triggerer memory
- triggerer_memory_max - max utilization of Triggerer memory
- triggerer_cpu_average - average utilization of Triggerer CPU
- triggerer_cpu_max - max utilization of Triggerer CPU
- database_cpu_average - average utilization of metadata database CPU
- database_cpu_max - max utilization of metadata database CPU
- database_qps - average number of queries per second to the metadata database
- database_qps_max - max number of queries per second to the metadata database
- test_start_date - earliest start_date of any Dag Run created as part of the test
- test_end_date - latest end_date of any Dag Run created as part of the test
- test_duration - the difference between test_end_date and test_start_date, in seconds
- dag_run_total_count - total number of test Dag Runs
- dag_run_success_count - number of test Dag Runs that finished with success
- dag_run_failed_count - number of test Dag Runs that finished with failure
- dag_run_average_duration - average duration of test Dag Runs, where duration is the difference between a Dag Run's end_date and start_date
- dag_run_min_duration - minimal duration of any test Dag Run
- dag_run_max_duration - maximal duration of any test Dag Run
- task_instance_total_count - total number of Task Instances belonging to test Dag Runs
- task_instance_average_duration - average duration of test Task Instances
- task_instance_min_duration - minimal duration of any test Task Instance
- task_instance_max_duration - maximal duration of any test Task Instance
- task_delay_average_duration - average delay, calculated as the difference between execution time and start time
- task_delay_max_duration - maximal delay, calculated as the difference between execution time and start time
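A sketch of how the test_* and dag_run_* columns could be derived from Dag Run records and written as CSV. It covers only those columns, since the resource columns need monitoring data not modeled here. `DagRunRecord` and `dag_run_metrics` are hypothetical; the records would presumably come from the metadata database. Expressing all durations in seconds is an assumption, as the text states the unit only for test_duration.

```python
import csv
import io
from dataclasses import dataclass
from datetime import datetime
from statistics import mean
from typing import Dict, List, Union


@dataclass
class DagRunRecord:
    state: str            # "success" or "failed"
    start_date: datetime
    end_date: datetime


def dag_run_metrics(runs: List[DagRunRecord]) -> Dict[str, Union[str, int, float]]:
    """Compute the test_* and dag_run_* columns; durations are in seconds."""
    durations = [(r.end_date - r.start_date).total_seconds() for r in runs]
    test_start = min(r.start_date for r in runs)
    test_end = max(r.end_date for r in runs)
    return {
        "test_start_date": test_start.isoformat(),
        "test_end_date": test_end.isoformat(),
        "test_duration": (test_end - test_start).total_seconds(),
        "dag_run_total_count": len(runs),
        "dag_run_success_count": sum(r.state == "success" for r in runs),
        "dag_run_failed_count": sum(r.state == "failed" for r in runs),
        "dag_run_average_duration": mean(durations),
        "dag_run_min_duration": min(durations),
        "dag_run_max_duration": max(durations),
    }


runs = [
    DagRunRecord("success", datetime(2024, 1, 1, 10, 0, 0), datetime(2024, 1, 1, 10, 1, 0)),
    DagRunRecord("failed", datetime(2024, 1, 1, 10, 0, 30), datetime(2024, 1, 1, 10, 2, 30)),
]
metrics = dag_run_metrics(runs)

# One header row plus one data row for this test run.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(metrics))
writer.writeheader()
writer.writerow(metrics)
print(metrics["test_duration"], metrics["dag_run_average_duration"])  # 150.0 90.0
```

Note that test_duration (150 s) spans overlapping runs and is therefore shorter than the sum of the individual Dag Run durations (180 s).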
What problem does it solve?
- It makes performance changes visible to Airflow users.
- It helps identify changes that have an unacceptably negative impact on Airflow performance or resource utilization.
- It helps users adapt to the requirements of new releases by adding more resources to their Airflow configurations if necessary, which encourages adoption of new versions.
- Lastly, it sends the message that the Airflow community takes performance seriously and validates the solution against important metrics.
Why is it needed?
Currently there are no performance tests running for Apache Airflow.
There is no visibility of changes in performance and resource utilization (especially when utilization grows).
Adopting new versions is exposed to unexpected failures, which has a negative impact on the stability of Airflow.
The tests can be used both during Airflow builds and by users who want to test performance individually with their own setup.
Are there any downsides to this change?
Test runs consume additional resources, as well as the time required to set up an Airflow instance.
Performance measurement is not perfect and is exposed to transient issues in the infrastructure, so we cannot expect perfectly consistent results. Still, it is worth having results to track deviation from a baseline based on run history.
The tests don't answer the question of how scalable Airflow is (how many tasks it can run in parallel or how many DAGs the scheduler can track), although the framework could be adapted to such a purpose in the future.
Which users are affected by the change?
Any user who is conscious of the cost, stability and performance of an orchestration solution based on Apache Airflow running in their organisation.
How are users affected by the change? (e.g. DB upgrade required?)
The users are provided with additional information in the release notes that may be meaningful to their setup and new versions adoption.
Other considerations?
What defines this AIP as "done"?
This is done once the first release of Apache Airflow includes performance measurement results.
9 Comments
Hussein Awala
I like the idea.
IMHO, if we want to move forward with this proposal, we need to focus on Kubernetes, deploying the cluster for the different test cases with Helm charts, for different reasons:
There are some questions that need to be answered:
Jens Scheffler
I also very much like the idea, and in my past years of solutionizing I had to execute load tests many times. I can also tell that in recent upgrades we hit performance pitfalls multiple times. Therefore we also have some DAGs currently implemented to test for performance regressions when we upgrade our environment.
Like Hussein Awala, I would also propose to leverage a K8s deployment via the most recent Helm chart. Size of machines and number of workers, memory limits etc. should be defined once in a baseline. Such a K8s deployment would also cover most of the setups "in the wild". But there are still many combinations and options that can influence the setup, so some combinations might need to be tested (e.g. Celery vs. K8sExecutor?). But I would favor defining it once in general and then having it evolve over time. Demand and details will surely change; some metrics might need to stay stable, other things will probably need to be adjusted.
I would also propose to distribute the performance suite set of DAGs and scripts/configuration such that it can be used as a regression test in individual setups, for comparison or for performance troubleshooting. I would propose to include this in the System test suite. If Google/AWS/Azure would sponsor a test environment, it would also be great to see how the environments behave differently with the same profile.
Things that I have seen with pitfalls in the past that I'd like to have included/propose to include are:
Overall I support this initiative and am happy to guide and review with my experience! THANKS!
Bartosz Jankiewicz
It seems to be a valid expectation to run the framework with K8s. I have added such a requirement explicitly.
Regarding adding a set of DAGs to the framework - I'm happy to include preconfigured performance DAGs in the framework.
With respect to your previous observations, running DAGs multiple times to collect more representative data has been included in the proposal. Please see the `attempts` parameter. It is also important to run the test for a longer period with a larger number of DAGs/tasks to collect stable statistics.
We need to think about some tests covering Dynamic tasks mapping. That problem has been brought up in this discussion. I think it can be added as a performance DAG parameter - e.g. PERF_DYNAMIC_TASKS_MAPPED to represent the number of tasks to be induced.
Parsing is a different challenge. I think that generating thousands of DAG files with the framework could serve the purpose of simple parser testing. This is already possible with the proposed framework. The DAGs don't need to be complex, as we are not testing the Python interpreter but the file processing mechanism in the DAG processor.
Amogh Desai
Thanks for the insightful proposal, Bartosz Jankiewicz
I agree with Hussein Awala's suggestion to use Kubernetes (K8s) deployment. It'll save time in customizing our performance suite, letting us focus more on developing the test suite and solutions.
Also, running the suite at midnight sounds good for getting enough results. Are we considering tools like Prometheus + Grafana for better result visualization?
Jarek Potiuk
I guess a reminder on the devlist, and possibly just putting it up to a vote, should be the next step. I don't expect much controversy here.
Jarek Potiuk
Also, there is an overlap with AIP-64: Keep TaskInstance try history, where testing regressions after DAG versioning is implemented is an important aspect.
Kaxil Naik
Bartosz Jankiewicz How is this progressing?
Bartosz Jankiewicz
I'm sharing it with my colleagues for internal review and will open a PR this month.
Rahul Vats
Thanks, Bartosz Jankiewicz. Can you share the PR when it's done? I would love to try it out.