Status
Motivation
What problem does it solve? / Why is it needed?
With Airflow 3, task execution was separated from the core Airflow services and lost direct access to the Airflow metadata database for good. The diagram below loosely represents the current Airflow architecture (with K8sExecutors), where the task execution pods update their status via API calls. To make these API calls, the execution pods need ingress access to Airflow's API-server as well as an authentication token, which is generated by the Scheduler while creating the task payload.
This change brings a big improvement to Airflow's overall security posture and makes it safe from accidental DB changes. However, there are still other services running in Airflow's control plane where user-written code can access the metadata DB. Such access has to be intentional, because unintentional access is blocked programmatically, but this still poses a bigger risk to setups that are multi-tenant in nature, where the core services are managed by a central team that has no control over users' code. Complete safety can be ensured only if there is no chance that any user-written code can access the metadata DB, intentionally or unintentionally.
The diagram below shows an ideal state where all user-written code interacts with core services via APIs only. The APIs are broken down into different web services (optionally) for overall safety and scalability.
As there are too many places where user code can obtain direct DB access, we have to break this AIP down into 3 phases:
Phase 1: Making the DAG Processor compatible with APIs
In this phase we will create the required APIs (listed below) and keep a flag to let users decide whether they want to use the existing DB-based approach or the API-based approach while running the DAG processor. In this phase we will keep Callbacks as part of the DAG processor only, though they would also need to work over the APIs (if the flag is true). So, in a nutshell, the DAG processor and Callbacks would remain tightly coupled.
Phase 2: Take Callback processing out of the DAG processor
In this phase, we will make the API path the only way for the DAG processor to work, and provide an option for users to run the Callback processor separately via a feature flag. We would also create a CLI command to run the Callback processor in standalone mode.
Phase 2.5: Remove the option to run the Callback processor as part of the DAG processor.
Phase 3: Make the Triggerer also work via APIs only
In the final phase, we would create the required APIs and expose them so the Triggerer won't need direct DB access for its workings.
We are also going to need an option for users to run all these APIs as separate services (aka Web, Execution, Internal) or mix and match, i.e. a user can choose to run some of these APIs as part of the same service and run others separately. For a large enterprise Airflow cluster, running them separately makes more sense, as each service can be deployed individually, can have network-policy separation, can run on its own machine type, etc., while for smaller Airflow setups this would be too much overhead. So I believe there should be enough options to serve both needs.
Considerations
What change do you propose to make?
Phase 1
From the very beginning, Airflow has relied on the DAG parsing loop continuously re-parsing and maintaining the state of DAGs in an Airflow deployment. This simple approach has served Airflow for a long time, but it has one problem: it gives read/write access to Airflow's metadata to a process which is running user code (DAGs).
The DAG processor used to be part of the Airflow scheduler, but it was recently carved out as a separate CLI command. However, the code below runs as part of the DAG processor and writes the serialized DAGs directly into the DB. This requires access to DB credentials wherever the dag-processor is running, which undermines Airflow's overall security posture and some of the benefits of the task execution isolation released as part of Airflow 3.
With Airflow 3, a safeguard was implemented which prevents DAGs from accessing the underlying metadata DB, but it can be bypassed with some hacks. Also, in an environment where the Airflow management team is different from the DAG authoring teams, this poses a security risk.
As part of AIP-85 Extendable DAG parsing controls, there is separate work going on to make the DAG processor much more extendable, but that solves a different problem altogether, and whatever we do here should be compatible with AIP-85 as well.
I can think of exposing a new API namespace (say, internal), which can be used by the DAG processors to save the changes they find while processing the DAGs:
- GET /internal_api/dag_processor/dags?dir=path_to_dag_dir
- POST /internal_api/dag_processor/update
- GET /internal_api/callbacks?dir=path_to_dag_dir
- DELETE /internal_api/callback?id=?
The first API will provide the list of DAGs along with other info, like when each was last parsed, so the DAG processor can decide whether to parse it again or not. The second API will be used to write the serialized DAG information to the Airflow DB. The third (GET) API will be used to fetch the list of callbacks associated with all the DAGs found under the DAG folder, and finally the fourth API will delete the callbacks which have been processed successfully. The first three APIs are idempotent and can safely be retried, but the fourth one needs more handling and safeguarding, because a retry can cause a callback to run multiple times.
Also, these APIs can be made to work at the bundle level rather than the folder level, but I don't have any strong opinions as of now. If we go ahead with the bundle level, then basically each customer's DAG folder should be classified as a different DAG bundle. We also need to figure out a long-lived token-based authentication mechanism, as these processes need a way to authenticate themselves with the API server.
Phase 2
Callbacks are primarily of two types: DagCallbackRequests and TaskCallbackRequests. These callbacks get stored in the DB in a table named "callback_request", which stores the callback data in a JSON column and also polymorphically stores whether it is a Dag callback or a Task callback.
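As a sketch of that storage shape, the following uses an in-memory SQLite table. Only the table name comes from the text above; the column names and the example payload are assumptions for illustration:

```python
import json
import sqlite3

# illustrative schema for the callback_request table described above
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE callback_request (
        id            INTEGER PRIMARY KEY,
        created_at    TEXT DEFAULT CURRENT_TIMESTAMP,
        callback_type TEXT NOT NULL,  -- 'DagCallbackRequest' or 'TaskCallbackRequest'
        callback_data TEXT NOT NULL   -- the callback payload as JSON text
    )
""")

# store a Dag-level callback the way the DB-backed path would
payload = {"dag_id": "example_dag", "is_failure_callback": True, "msg": "task failed"}
conn.execute(
    "INSERT INTO callback_request (callback_type, callback_data) VALUES (?, ?)",
    ("DagCallbackRequest", json.dumps(payload)),
)

# fetch it back, dispatching on the polymorphic type column
row = conn.execute(
    "SELECT callback_type, callback_data FROM callback_request"
).fetchone()
```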
Below is a GenAI-generated walkthrough of how the callbacks get generated and stored in the DB.
And this other GenAI-generated graph shows where these callbacks are generated from. These callbacks can be generated within Scheduler or Triggerer code, which uses a direct DB connection to save them, OR in the execution flow, where they get transmitted as TI events to the execution APIs and stored in the DB [as per my understanding, needs confirmation].
Later, these callbacks are fetched in the DAG processor code, executed, and deleted. I believe there is also a small chance that a callback doesn't get executed due to some infra issue (say, a node restart), i.e. the DAG processor kept it in its in-memory queue for processing and deleted the DB entry, but was shut down before actually executing it [citation needed].
So, to decouple the Callback processor from the core services, we need to make sure that storing and fetching callbacks can happen over the APIs we already mentioned in Phase 1. Users would have to make sure that their control-plane and execution-plane services can reach these APIs.
- GET /internal_api/callbacks?dir=path_to_dag_dir
- DELETE /internal_api/callback?id=?
As a next step, we would carve the Callback processor out as a separate CLI command, something of this sort:
airflow callback-processor
This would create and run a standalone callback processor, which fetches the list of callbacks from the internal API server, executes them, and deletes them (via the delete API).
Phase 3
In the final phase, we would target the Triggerer service so that it stops using direct DB operations and goes through the API route instead. As of now, the Triggerer makes multiple DB calls for its working, which are listed below (possibly not a complete list). To make the Triggerer free from DB access, we would have to create API equivalents for all these methods (some APIs might exist already):
- bulk_fetch()
- fetch_trigger_ids_with_non_task_associations()
- clean_unused()
- submit_event()
- submit_failure()
- ids_for_triggerer()
- assign_unassigned()
- get_sorted_triggers()
- handle_event_submit() (both versions)
- _submit_callback_if_necessary()
- _push_xcoms_if_necessary()
Path to implementation
The implementation for Phase 1 would be rather simple and will sit behind a flag, which controls whether to use the current approach of directly updating DAGs in the DB or to send them over the API.
1. Call the GET API to fetch the list of DAGs and associated metadata
At the start of the DAG processor loop, it will call the GET API, providing the path/dir it is supposed to process, and fetch a list of DAGs and all the associated metadata. This metadata would contain basic info along with the last-parsed time, so the DAG processor can decide whether to process the same file in this loop or not.
2. Iterate over files and collect DAGs
The DAG processor now iterates over the files the way it does today, collects the DAGs, and serializes them into JSON. It will also run any callbacks it finds while processing the DAGs.
3. Call the POST API to save the serialized DAGs
At last, it makes an API call to the Airflow web service and updates the DAG JSON in the DB, so it can be used by the Scheduler and web server for the usual purposes.
4. GET the list of callbacks, execute & delete them
Iterate over all the registered callbacks fetched from the DB and execute them. Also, keep calling the DELETE callback API to clear processed callbacks from the Airflow DB.
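The four steps above can be sketched as a single processing-loop iteration. All the `api_*` callables stand in for the proposed internal API calls, and every name, key, and payload shape here is an illustrative assumption:

```python
import json


def process_dag_dir(dag_dir, api_get_dags, parse_file, api_post_dags,
                    api_get_callbacks, execute_callback, api_delete_callback):
    """One illustrative iteration of the API-backed DAG processor loop."""
    # 1. fetch known DAGs with their metadata (e.g. last-parsed time)
    known = api_get_dags(dag_dir)

    # 2. re-parse only the files the metadata marks as stale
    serialized = {}
    for file_path, meta in known.items():
        if meta.get("needs_reparse", True):
            serialized[file_path] = parse_file(file_path)

    # 3. push the freshly serialized DAGs back over the API
    if serialized:
        api_post_dags(dag_dir, json.dumps(serialized))

    # 4. fetch pending callbacks, execute each, then delete it via the API
    for cb in api_get_callbacks(dag_dir):
        execute_callback(cb)
        api_delete_callback(cb["id"])
    return serialized
```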
Authentication
However, we have to figure out how the Dag-processor will authenticate itself to the web server. During task execution, the Scheduler shares pre-created short-lived JWT tokens when launching the job, thereby allowing access to the API endpoints. For the Dag-processor, we still have to figure this out. I believe there are 2 possible ways -
- Sharing the JWT secret with the Dag-processor, so it can generate JWT tokens by itself and hit the API endpoints. The dag-processor can create short-lived tokens and refresh them before expiry. This approach is easy, but carries a big security risk: sharing the JWT secret with the execution side. I mean, we are assuming that the Dag-processor would run closer to the execution plane rather than the control plane, so someone could use this secret to submit bad data, or submit DAGs that effectively DDoS other genuine DAGs. However, this is kind of possible today as well, where one can write a Python file that generates millions of DAGs running every second.
- Deriving the Dag-processor from workloads, along the lines of task workloads. What this means is that the Scheduler launches the Dag-processor as some kind of system job whenever it sees the need, e.g. the DAGs are not fresh anymore, or a new dag_bundle got added. When the Scheduler launches the Dag-processor, it can provide a JWT token, the same way it does for task execution, and things would work in a similar fashion. This approach is good in terms of security, but may need a big rewrite to achieve.
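To illustrate the first option (the Dag-processor minting its own short-lived tokens from a shared secret), here is a minimal, stdlib-only HS256 sketch with a refresh-before-expiry check. This is purely illustrative: Airflow's actual token generation lives in its API auth layer and is not reproduced here.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWT uses base64url without padding
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def mint_token(secret: str, subject: str, ttl_seconds: int = 300) -> str:
    """Build a short-lived HS256 JWT from the shared secret."""
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"sub": subject, "iat": int(time.time()),
              "exp": int(time.time()) + ttl_seconds}
    signing_input = (_b64url(json.dumps(header).encode()) + "." +
                     _b64url(json.dumps(claims).encode()))
    sig = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return signing_input + "." + _b64url(sig)


def needs_refresh(token: str, leeway_seconds: int = 60) -> bool:
    """True once the token is within `leeway_seconds` of its expiry."""
    claims_b64 = token.split(".")[1]
    claims_b64 += "=" * (-len(claims_b64) % 4)  # restore stripped padding
    claims = json.loads(base64.urlsafe_b64decode(claims_b64))
    return claims["exp"] - time.time() < leeway_seconds
```

The dag-processor would call `needs_refresh` before each batch of API calls and re-mint when needed, which is the "create short-lived tokens and refresh them before expiry" flow described in the first option.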
API Versioning
As we are separating the Dag-processor out of the Scheduler and it will be calling APIs instead, we have to make sure that the client (dag-processor) and the server (api-server) never break the contract and remain fully compatible. I think we can make use of versioned paths in the APIs, i.e. we can start with the /api/internal/v1/ namespace and bump the version whenever there is a breaking change, so old clients can still submit payloads to the previously versioned URL. However, if the client upgrades before the server, it might still break the contract. Also, if there are DB schema changes, different API versions won't be able to work together.
Packaging of DAG-Processor
As the Dag-processor would move from the control plane to the execution plane, and would use the Airflow Task SDK for its working, it makes sense to move the Dag-processor out of Airflow-core into the Task SDK package itself, or into a separate package altogether.
Implementation details for Phase 2 and 3 can be discussed later on.
Performance
There could be issues while sending or receiving large payloads, so we may have to implement pagination when receiving and/or sending the data. The actual limitations will surface once we start testing the feature and hit the limits. Also, there would be some extra network load due to fetching and deleting callbacks. Another potential bottleneck could be deserializing large JSON payloads on the API-server side, as Python isn't very fast at doing that. So one optimization we can make is that the API server doesn't deserialize or examine the payload it receives, and blindly stores it in the DB. The payload would be a string (compressed and encoded JSON), which would be converted into JSON only when it's needed later on.
I.e. the DAG processor reads the DAG code, converts the resulting JSON into a plain string by compressing and encoding it, and sends it to the internal API to store. The API just saves it in the DB, without doing any processing or deserializing. Later, when the Scheduler or web server needs it, they deserialize on the fly.
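The `compress_json` / `decompress_json` helpers used in the experiment below are not shown in this document; a plausible stdlib implementation (zlib compression plus base64 encoding, so the result travels as a plain string) could be:

```python
import base64
import zlib


def compress_json(json_text: str) -> str:
    # compress the JSON text and base64-encode it so it can be sent
    # and stored as a plain ASCII string
    return base64.b64encode(zlib.compress(json_text.encode("utf-8"))).decode("ascii")


def decompress_json(compressed: str) -> str:
    # reverse the encoding and compression to recover the original JSON text
    return zlib.decompress(base64.b64decode(compressed)).decode("utf-8")
```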
To experiment with the above code, I took a dump of 10k DAGs' serialized JSON data (completely random) and ran it to check the results: it compressed 44 MB of JSON text into just 3.5 MB. Of course, modern databases already have optimizations and compression enabled when storing a JSON column, but this would help us on the API-server side. However, it would take away the ability to directly query JSON fields in the DB.
>>> f = open('/Users/sumitm/Downloads/json_dump')
>>> json_data = f.read()
>>> sys.getsizeof(json_data)
45800102
>>> compressed_data = compress_json(json_data)
>>> sys.getsizeof(compressed_data)
3606773
>>> reconstructed_json = decompress_json(compressed_data)
>>> sys.getsizeof(reconstructed_json)
45800102
>>> reconstructed_json == json_data
True
Compatibility / Which users are affected by the change?
Ideally there would be no change visible to users, assuming they aren't connecting to the Airflow DB from within their DAGs. DAG import errors will be thrown for DAGs which connect to the DB, but that should not be an issue, because Airflow 3 already has a check in place which does exactly the same, i.e. throws an error if a DAG tries to connect to the DB. However, with this change, we won't just rely on a piece of code; instead we isolate the DAG processor in an environment which doesn't even have the DB credentials.
Maintainability
The APIs need to be updated if there are any changes in the way DAGs get stored in the DB. The APIs and the calling code would need to be changed accordingly.
Execution dependencies
Nothing I can think of.
Are there any downsides to this change?
The only possible downside is adding a dependency on a healthy web server while serializing the DAGs. However, with Airflow 3, task execution also relies on the same dependency, so it should be fine: if execution is blocked because the web server is down, there is no point in parsing the DAGs either.
How are users affected by the change? (e.g. DB upgrade required?)
The changes should be compatible with the next Airflow release, and apart from changing a flag, users won't need to do anything extra. If we choose to change the way we save serialized DAG data in the DB, we might end up creating another column for that.
What is the level of migration effort (manual and automated) needed for the users to adapt to the breaking changes? (especially in context of Airflow 3)
Once we are done with all the phases, users will need to pay more attention to their Airflow infra setup: how they want to deploy the different API servers, and where they want to place the DAG processor and Callback processor, will matter.
What defines this AIP as "done"?
This AIP is done once all three phases are complete and the DAG processor, Callback processor, and Triggerer service no longer need direct DB access to run.