Background

Original document (archived now) is in AIP-15 Scalable Scheduler [ARCHIVED]


The purpose of this document is to serve as a functional and high-level technical design for the Airflow Scheduler high-availability effort. 

Airflow has excellent support for task execution ranging from the basic Local Executor for running tasks on the local machine, to Celery-based distributed execution on a dedicated set of nodes, to Kubernetes-based distributed execution on an as-needed, dynamically scalable set of nodes. The Airflow scheduler reads DAGs, analyses task dependencies, schedules and tracks task execution across these distributed nodes, handles task failures with retries as appropriate, and reports errors. 

As a result of these improvements in executors, task execution within Airflow is now significantly scalable. However, in the course of this evolution, the Airflow Scheduler itself has become a bottleneck for future scalability and, worryingly, a ‘single point of failure’, which is an anti-pattern in today's computing architectures. 

It is clear that the Airflow Scheduler does far more than ‘just scheduling’ and is analogous to a hypervisor, but the naming discussion is a topic for another time.

Motivation

By default, users launch one scheduler instance for Airflow. This brings up a few concerns, including

  • High Availability: what happens if the single scheduler goes down?
  • Scheduling Performance: the scheduling latency for each DAG may be long if there are many DAGs.

In today’s world, the single point of failure does come up as a blocking issue in some users' adoption of Airflow. This is a well-known limitation with Airflow today and some SIs have written blogs about how enterprises could address this in their environments. For example, here is a link from Clairvoyant about making Airflow highly available.

It would be ideal for Airflow to support multiple schedulers, to address these concerns.

There was a "hacky" method to start multiple schedulers and let each handle a specific set of DAGs. It does improve scheduling performance, but it doesn't address the HA concern, was never "officially" supported by Airflow, and requires manual book-keeping.

Another issue that has started coming up with AI-centric users is the need for massively parallel task execution. A recent example required thousands of DAG runs to be initiated within a short time frame and run in parallel as part of an ML pipeline. Airflow didn't really cope, and the performance (i.e. time to schedule tasks) wasn't acceptable.

There are also still occasional reports of the scheduler "stalling" and no longer scheduling tasks.

Goals

Based on the above, the goal of this project is two-fold:

  1. Make the Airflow Scheduler highly available to address reliability concerns, and 
  2. Make the Airflow Scheduler scalable beyond the current single-node, multi-thread/process model to serve the increased scalability demands of users.

Approach

The high-level approach outlined below is intended to be contained within Airflow and infrastructure-neutral, since it needs to work in the vast majority of enterprise and cloud deployments without needing infrastructure components such as Kubernetes. 

The high-level design approach is to leverage an 'active / active' model that uses the existing shared database for locking and does not require any direct communication between schedulers. Specifically, this entails the following enhancements to Airflow:

  • Multiple schedulers can now be run within Airflow. For a high availability situation (any production deployment), this should be at least two; three is better.
  • The running schedulers will all use the same shared relational database (the 'metadata database') and regularly heartbeat their own availability so that external monitoring systems can perform health checks easily. This 'external' data availability will be performed by making 'operational data' available through the existing statsd interface. 
  • Each scheduler will be fully 'active'.  This includes 'non-scheduling' work as well such as task execution monitoring, task error handling, sending task emails, and clean-up. 
  • The configuration for all the schedulers will be identical. Differing configurations for different scheduler instances will NOT be supported. This is actually incomplete at this time, since it is intended to cover 'application configuration', not necessarily 'infrastructure configuration' detailing where these scheduler instances could be run. There are no current plans to ensure each scheduler is configured the same.
  • This approach relies on DAG serialization becoming an integral part of the scheduler. In practice, this means the following:
    • As part of DAG file parsing, the DAG file will be parsed and serialized sufficiently that the serialized version can be used for scheduling. This serialized version will be stored in the database. 
    • To reduce lock contention in this process, each scheduler will sort the DAGs to be processed in a random order before starting the parsing steps.
  • The scheduler will use the database as the 'shared queue' for operations to be performed and lock at the appropriate object. 
    • The scheduler 'shared queue' will probably be at the 'DAG run' level. This needs to be validated during the detailed design, because it may be 'necessary, but not sufficient': certain operations such as 'managing task SLAs' may require 'task-level locking', though it seems possible to refactor those into the 'DAG' level locking model.
    • In practice, this means that one of the schedulers picks 'the next DAG run' in the list and 'locks it' in the database to start working on it. While it is working on it, it periodically updates the lock as a heartbeat that it is alive and working on it. 
    • The current thinking is to use the database locking by SELECT FOR UPDATE  on this row as the locking mechanism as opposed to using a separate field in the table for the lock. If a scheduler dies while working on a DAG, the lock would be expired by the database. This needs to be validated and optimized since there have been reports of slowdowns observed while trialing this approach.
  • The workers will be enhanced to implement a fast-follow mechanism to reduce the activity of the scheduler. Specifically, this includes:
    • Workers will look for the next task in the same DAG to be 'ready' at the point of task completion and if it is ready, put the next task into the QUEUED state. 
    • This would reduce the scheduler activity for follow-on task execution, but the communication between the worker and the executor about the identification of the 'next task ready' and 'task being worked on' needs to be clarified. 
    • A config flag which triggers this option may be of value, since this does change the existing behavior and has a possibility of 'DAG starvation'.
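The fast-follow idea above can be illustrated with a minimal sketch. This is not Airflow's implementation: the State enum, the upstream mapping and the fast_follow function are hypothetical names chosen for illustration, and a real worker would perform the state change inside a database transaction so that two workers cannot queue the same task.

```python
from enum import Enum


class State(Enum):
    """Simplified task states (hypothetical, not Airflow's own State class)."""
    NONE = "none"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"


def fast_follow(upstream, states, finished_task):
    """Mark `finished_task` as done and queue downstream tasks that are now ready.

    `upstream` maps each task id to the set of task ids it depends on.
    `states` maps each task id to its current State and is updated in place.
    Returns the list of task ids that were moved to QUEUED.
    """
    states[finished_task] = State.SUCCESS
    queued = []
    for task, deps in upstream.items():
        if finished_task not in deps:
            continue  # not a direct downstream task of the one that finished
        if states[task] is not State.NONE:
            continue  # already queued, running or finished
        if all(states[dep] is State.SUCCESS for dep in deps):
            states[task] = State.QUEUED
            queued.append(task)
    return queued
```

Only direct downstream tasks of the finished task are examined, which is what keeps the follow-on work off the scheduler. The 'DAG starvation' risk mentioned above arises because a long chain of tasks in one DAG can keep re-queuing itself ahead of other DAGs.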

More sophisticated approaches can be layered onto this approach for specific cloud / infrastructure capabilities thereby increasing both scalability and the handling of infrastructure failures.


The probability of schedulers competing on the same DAG is easy to calculate, since it is a typical Birthday Problem: it is 1 - m! / ((m - n)! * m^n), where m is the number of DAGs and n is the number of schedulers. This probability is reasonably low as long as the ratio of DAGs to schedulers is not too low.

Let’s say we have 200 DAGs and we start 2 schedulers. At any moment, the probability that schedulers are competing on the same DAG is only 0.5%. If we run 2 schedulers against 300 DAGs, this probability is only 0.33%. (https://lists.apache.org/thread.html/389287b628786c6144c0b8e6abf74a040890cd9410a5abe6e968eb55@%3Cdev.airflow.apache.org%3E)
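The figures above can be reproduced with a few lines of Python. This is a sketch of the Birthday Problem formula only; the function name is an assumption, and the model assumes each scheduler picks a DAG uniformly and independently at random, which real parse times may not satisfy.

```python
from math import perm


def collision_probability(num_dags: int, num_schedulers: int) -> float:
    """Probability that at least two schedulers pick the same DAG.

    Implements 1 - m! / ((m - n)! * m^n) with m = num_dags, n = num_schedulers.
    """
    if num_schedulers > num_dags:
        return 1.0  # more schedulers than DAGs: a collision is certain
    # perm(m, n) == m! / (m - n)!
    return 1.0 - perm(num_dags, num_schedulers) / num_dags ** num_schedulers


if __name__ == "__main__":
    for dags, schedulers in [(200, 2), (300, 2), (200, 3)]:
        p = collision_probability(dags, schedulers)
        print(f"{dags} DAGs, {schedulers} schedulers: {p:.2%}")
```

With two schedulers the formula reduces to 1/m, which is where the 0.5% and 0.33% values come from. Adding a third scheduler against 200 DAGs roughly triples the probability, to about 1.5%.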

There is currently a scheduler_lock  column on the DagModel table, but it's not used in current implementation of Airflow (as of now, end of Feb 2020). If the SELECT FOR UPDATE approach doesn't work this may be a suitable fallback.
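To make the fallback concrete, here is a minimal sketch of a lock held in an ordinary column and renewed by heartbeat. The table layout (dag_run with locked_by and locked_at), the LOCK_TIMEOUT value and the function names are hypothetical and are not Airflow's schema; SQLite stands in for the metadata database purely so the example is self-contained.

```python
import sqlite3

# Seconds without a heartbeat before a lock counts as stale (assumed value).
LOCK_TIMEOUT = 30.0


def make_db():
    """Create an in-memory database with two unlocked rows (illustrative schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE dag_run (id INTEGER PRIMARY KEY, locked_by TEXT, locked_at REAL)"
    )
    conn.executemany("INSERT INTO dag_run (id) VALUES (?)", [(1,), (2,)])
    conn.commit()
    return conn


def try_lock(conn, run_id, scheduler_id, now):
    """Acquire or renew the lock on one row; return True on success.

    The single UPDATE succeeds only if the row is unlocked, already held by
    this scheduler (a heartbeat), or held by a lock older than LOCK_TIMEOUT.
    """
    cur = conn.execute(
        "UPDATE dag_run SET locked_by = ?, locked_at = ? "
        "WHERE id = ? AND (locked_by IS NULL OR locked_by = ? OR locked_at < ?)",
        (scheduler_id, now, run_id, scheduler_id, now - LOCK_TIMEOUT),
    )
    conn.commit()
    return cur.rowcount == 1
```

Because the check and the write happen in one UPDATE statement, two schedulers cannot both see the row as free and both claim it. The cost compared with SELECT FOR UPDATE is that a dead scheduler's lock is only released after the timeout elapses.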

Acceptance Criteria

The following test criteria need to be met:

  1. The simplest test of the high availability criteria is to: 
    1. Start multiple schedulers with a set of long running DAGs, 
    2. Externally kill one of the schedulers during the processing of a DAG, 
    3. Validate that all the DAG runs are successfully executed to completion.
  2. The same test as the above, but with a modification such that one of the schedulers is run with a different machine 'localtime' timezone to the others (the default_timezone in airflow.cfg must be the same across all schedulers). If this is not supportable in the initial release, this should be explicitly noted and ideally prevented by the configuration.
  3. The scalability test criteria are more difficult to define upfront, but benchmarks need to be published before and after, showing the volume of tasks which can be launched and managed by a single scheduler instance at this time and how this can be scaled with a 'multiple active' model.


Known Limitations

The approach detailed above makes Airflow itself a more robust and scalable platform. However, it does not extend scalability to a general multi-region set of clusters with true 'quorum-style' capabilities for distributed processing at scale without a 'shared source of truth' such as an ACID compliant database like PostgreSQL (or MySQL). 

It is possible to extend the above approach towards a multi-region, shared-nothing approach from a software perspective, or to potentially leverage a distributed ACID database for the same from an infrastructure perspective, but at this time, those have not been seriously considered, as we believe we can achieve the goals without the extra complexity these technologies introduce.

Deployment Models

  1. Airflow scheduler HA deployment in a set of Virtual Machines (no Kubernetes)


  2. Airflow scheduler HA deployment in a Kubernetes environment

Flow diagram

Updated flow diagram of the scheduler with respect to DAG parsing:

To avoid "contention" between schedulers, we may want to consider randomly sorting the list of DAG files before it is passed to the scheduler process (https://lists.apache.org/thread.html/e21d028944092b588295112acb9a3e203c4aea7fae50978f288c2af1@%3Cdev.airflow.apache.org%3E)

Another method to avoid schedulers competing with each other is to let each scheduler select the unlocked DAG that has not been processed for the longest time (https://lists.apache.org/thread.html/6021f5f8324dd7e7790b0b1903e3034d2325e21feba5aef15084eb17@%3Cdev.airflow.apache.org%3E).
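Both ordering strategies are simple enough to sketch side by side. The function names and the data structures (a list of file paths, a dict of last-processed timestamps, a set of locked files) are assumptions made for illustration; in a real deployment the timestamps and locks would live in the metadata database.

```python
import random


def shuffled_dag_files(dag_files, rng=None):
    """Return a new list with the DAG files in random order.

    Each scheduler shuffling independently makes it unlikely that two
    schedulers walk the files in the same order at the same time.
    """
    if rng is None:
        rng = random.Random()
    files = list(dag_files)  # copy so the caller's list is left untouched
    rng.shuffle(files)
    return files


def pick_stalest_unlocked(last_processed, locked):
    """Return the unlocked file with the oldest last-processed time, or None.

    `last_processed` maps file path to a timestamp; `locked` is the set of
    file paths currently being processed by some scheduler.
    """
    candidates = [path for path in last_processed if path not in locked]
    if not candidates:
        return None
    return min(candidates, key=lambda path: last_processed[path])
```

The shuffle needs no shared state at all, while the second strategy gives fairer coverage of the DAG files at the cost of one extra lookup per pick.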

Elements considered and rejected or deferred

This section is primarily for completeness and contains elements considered in the technical approach and rejected during the high level design process.

  • Starting up and restarting schedulers when they fail will be left out of Airflow and will be handled by the managing infrastructure. Therefore the following elements are now discarded: 
    • This includes a configuration change to specify the number of schedulers to be run. 
    • The running schedulers will register themselves within the shared PostgreSQL (or other) database and regularly heartbeat their own availability. When this falls below that number, a new scheduler will be started. 
    • Each scheduler will be fully 'active', including the ability to 'launch' a complete new scheduler if the active count in the database is below the configured value.
  • Scheduler locking DAG runs using an additional field in the database and dealing with lock expiration as below. 
    • If a scheduler dies while working on a DAG, the lock would be expired by one of the schedulers as they look for the next DAG to work on. It is probably best to use the same approach of '2n + 1' for lock expiration, similar to the task heartbeat. A cross-check for the same is with the scheduler registry (and heartbeat) discussed above. It is arguable that only one of these is needed and not both, but that can be nailed down during the detailed design and implementation.

21 Comments

  1. Hi, this proposal is awesome. 

    I'd like to add some comments on it: 

    1. The link to DagModel's scheduler_lock in (1) is https://github.com/apache/airflow/blob/45d24e79eab98589b1b0509e920811cbf778048b/airflow/models/__init__.py#L2791
    2. According to the three principles of HA in https://en.wikipedia.org/wiki/High_availability#Principles, the second principle is `Reliable crossover`. Because all schedulers are running the same source code, I am wondering if the current proposal is reliable enough. Or what kind of failure could such a structure deal with? I guess it can handle any issues from DAGs, but cannot handle edge cases in the scheduler itself. 
    3. The 1) 4) 5) are all about the scheduler lock. Could you put them together and propose a potential lock mechanism and how to handle edge cases, such as what happens if a scheduler hangs for a long while and holds the lock, preventing the task from being rerun?
    4. As for performance, the scheduler relies on the lock in the DB. It seems the performance also depends on how fast the DB processes transactions. 


    1. Thanks Chen Tong

      Regarding your 2nd point: the schedulers are going to run on different machines or pods.

      Regarding your 3rd point: Yes I'm working on it.

      Regarding your 4th point: Fully agree. DB is one of the potential bottlenecks we may have when we try to scale up our Airflow clusters. We need to see & test how big the impact from multi-schedulers will be on DB.

  2. I personally would prefer a leader election and the leader handing out follower counts & order for hashing on the dag filepaths instead of having each scheduler checking the lock on each dag filepath encountered.
    Consider there's F dag files and S schedulers. With the current proposal it sounds like there will be F*S lock checks on the DB which increases with both F or S. While I think one reason I prefer each scheduler being told to work only on F/S dag files, it would limit those DB checks to approximately grow with F, with edge cases for a scheduler leaving, possibly when joining. A second reason is that, as far as I can see, the scheduler_lock is in a table where the primary key is something you only know AFTER parsing the dag file anyway, meaning you're not saving the parsing time by scaling up S. Oh I see that's covered in 5b.

    1. >>> "I personally would prefer a leader election and the leader handing out follower counts & order for hashing on the dag filepaths instead of having each scheduler checking the lock on each dag filepath encountered."

      Earlier I was planning in this way. But I think: 1. handling leader-selection would make it unnecessarily complicated; 2. assigning a set of dag filepaths to each scheduler may cause more edge cases (especially when I'm trying to add more schedulers or reduce my number of schedulers without downtime).

      1. I agree with Xiaodong DENG on that.

        I think Leader selection might be complex to get right and maintain and it is quite not necessary because we already have the single synchronisation point that we can use (the database).

        It is less scalable solution in general (of course) but it is way simpler and good enough for the Airflow use case. It is much simpler and robust to use DB locking in the current architecture.

        Leader selection might make more sense when (if) we get rid of the central database and use some other - more distributed and scalable - state storage. But I think for pretty much all deployments of Airflow, database is not the limiting factor when it comes to scalability. Main reason is that scanning/parsing of DAGs and processing tasks in Airflow is few orders of magnitude slower than database operations that are needed.

      2. I would think we need both.

        Raft libraries are out there, they can coordinate with the existing DB on their own table, you could roll your own coordinating on the DB too, but it'd probably be buggy which is why I agree that you should initially think "no thanks" but, when you consider the algorithm is well enough documented and implemented in several available options, you might consider first what you're giving up.

        By its nature it handles adding and removing schedulers. You just renumber the followers when adding or removing schedulers, and they wait for the leader to do that and signal the end of a pass over the DAG_FOLDER, which would be easy to sync with elections.
        mod(hash('$DAG_FOLDER/groupA/dag1.py'), scheduler_count) == my_scheduler_number seems to better distribute the work to the schedulers instead of the DB. I still recommend the DB model's lock be checked, and we index on the file etc, I just think that I'd rather have the load on the DB regarding that check not go up with a factor of the number of schedulers needing to check locks.

        1. I discussed it with my colleague Kamil Bregula and he is also in favour of some consensus algorithms. I am not against it, but I think one important aspect here that we should consider is deployment complexity and operations cost.

          I think that whatever solution we come up with, we should keep in mind that someone will have to configure and run the whole deployment. One of the important aspects is the security of such a deployment and the authentication of communicating parties. Not sure how Raft works in detail, and whether it involves direct communication between various schedulers (but I imagine it does by its nature, as there is no single central place). So if we are talking about a Celery deployment for example, there must be some way for schedulers to authenticate each other so that we know it's not a rogue "node". That should involve key generation, mTLS (mutual TLS); in general it might become a pain to configure and maintain.

          In the simple solution that we have now with single database, it's just a matter of configuring connection to the database with user/password (and each scheduler will have this single user/password). In case of distributed consensus, I believe it will be quite a bit more complex to setup. Since we want to have something working also with Celery etc. that might become fairly complex task as a deployment exercise in real "production" case.

          This is where the power of things like Kubernetes + Istio comes in: there, all such problems are already addressed by the platform itself, not only distributing workload and scalability but also mutual authentication of connecting parties, so we can focus just on the consensus algorithm (smile). Maybe some day this will become the default/only platform for Airflow (but by then a consensus algorithm will likely be part of the platform already as well, and then we will just "use it").

          1. We do not need Raft or any special leader election. Raft is for distributed consensus. Doing leader election is pretty trivial in a non distributed fashion. (first person to lock the row wins, and use heartbeat). In fact, the current proposal is effectively choosing a leader on each queued item by acquiring a lock. 

      3. We do not need a distributed leader election, we just need to do it on 1 host (MySQL). This is as simple as first to acquire a lock on the row wins.

  3. Are all the schedulers co-deployed on the same boxes or on different boxes? I assume the schedulers will be deployed on different boxes, as otherwise we wouldn't call it HA.  I don't see any explanation of why we want to avoid leader elections. I think this AIP needs a diagram for the architecture it tries to propose, as many questions are still unanswered. E.g. 1. different schedulers for different dagBag locations?; 2. different schedulers for the same dagbag but deployed on different boxes?; 3. different schedulers for the same dagbag on the same box?


    The motivation needs more wording to explain what it means by "down". Does it mean the scheduler process gets killed, or that the scheduler box is down? The first case is pretty common, as most deployments run the scheduler for a fixed time interval, after which it gets re-initiated by supervisor/runit.


    And how does AIP work with Deprecated- AIP-5 DagFetcher if the DAGBag location could be any arbitrary filesystem?

    1. Hi Tao Feng, may I know what you mean by "box" in this scenario? What I meant here is to have the schedulers to run on different machine or pod (in K8S scenario) so we have HA.

      For other questions, please allow me more time to get back to you.

    2. > is the scheduler box down

      Such down is out of scheduler's control. Right? e.g. the scheduler is running in a pod(box in K8S), the pod could be recreated by K8S. 


      I am also curious what "down" issues we are addressing here.


      1. I don't think we can assume K8S/supervisor/runit can help us take care of everything.

        Let's say we are using a few VMs to deploy the Airflow cluster. I have my webserver, scheduler_1, scheduler_2, and workers etc. running on different VMs (sharing the same DAGs). It's always possible that one single VM may go down, or lose its network connection, or stop working due to some random reason and not recover automatically.

        Please let me know if this clarifies? Always open to discussion (smile)

  4. I recommend using temporal chaining in the database to achieve optimistic locking (concurrency control) and extensive auditability. In my own experience, a well-designed temporal chaining ORM will ensure atomic operations without placing any locks on the database.

    I have seen massively horizontal worker systems implement functionality very similar to Airflow's using only temporal chaining and optimistic locking:

    Description below (source):

    Optimistic concurrency control

    Optimistic concurrency control (or “optimistic locking”) is usually implemented as an application-side method for handling concurrency, often by object relational mapping tools like Hibernate.

    In this scheme, all tables have a version column or last-updated timestamp, and all updates have an extra WHERE clause entry that checks to make sure the version column hasn’t changed since the row was read. The application checks to see if any rows were affected by the UPDATE and if none were affected, treats it as an error and aborts the transaction.

    For this demo you need to add a new column:

    ALTER TABLE accounts ADD COLUMN version integer NOT NULL DEFAULT 1;
    

    Then the example above becomes:

    Session 1: BEGIN;
    Session 2: BEGIN;
    Session 1: SELECT balance, version FROM accounts WHERE user_id = 1; (returns 1, 300)
    Session 2: SELECT version, balance FROM accounts WHERE user_id = 1; (also returns 1, 300)
    Session 1: COMMIT;
    Session 2: COMMIT;
    Session 1: BEGIN;
    Session 2: BEGIN;
    Session 1: UPDATE accounts SET balance = 200, version = 2 WHERE user_id = 1 AND version = 1; (300 – 100 = 200. Succeeds, reporting 1 row changed.)
    Session 2: UPDATE accounts SET balance = 200, version = 2 WHERE user_id = 1 AND version = 1; (300 – 100 = 200). Blocks on session 1’s lock.
    Session 1: COMMIT;
    Session 2: (UPDATE returns, matching zero rows because it sees version=2 in the WHERE clause)
    Session 2: ROLLBACK; because of error detected

    Because it’s fiddly to code, optimistic concurrency control is usually used via an ORM or query building tool.

    Unlike SERIALIZABLE isolation, it works even in autocommit mode or if the statements are in separate transactions. For this reason it’s often a good choice for web applications that might have very long user “think time” pauses or where clients might just vanish mid-session, as it doesn’t need long-running transactions that can cause performance problems.

    Optimistic concurrency control can co-exist with traditional locking based approaches if you use triggers to enforce the optimistic concurrency locking rules; like this. Nonetheless it’s generally used instead of other explicit concurrency control methods.

    1. Optimistic concurrency control might be a good idea. The problem is that it is not how things are currently implemented.

  5. The probability calculation assumes the parse times are uniform, which they are not. 

      1. I left those calculations over from the previous AIP before we edited it, but in practice I don't think it even matters with the approach I am planning on taking, although I haven't explicitly stated so in the AIP as I need to see how it behaves on mysql, but the plan is to use SELECT ... FOR UPDATE NOWAIT (or SKIP LOCKED) – that is if 1. useful enough versions of mysql support it, and 2. if it actually works as documented https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-reads.html

  6. Ash Berlin-Taylor Kaxil Naik

    What is the current state of this AIP? If you are planning to do any other work, could I ask you to create **one main issue**, which will allow us to track the progress of work. When we're done, we can close the ticket. All other AIPs have a ticket like this, and I think we should stick to it.