Status

State: Accepted
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/0gokmfdjwr531t4wxfdb6qhdrrq4sxcy
Vote Result Thread: https://lists.apache.org/thread/hcdb8tb2l09kjho5mtw4gy3mk31g84ox
Progress Tracking (PR/GitHub Project/Issue Label):
Created: 2024-06-14
Version Released:
Authors:

Motivation

Historically, Airflow’s task execution context has been oriented around local execution within a relatively trusted networking cluster. 

This includes:

  • the interaction between the Executor and the process of launching a task on Airflow Workers, 
  • the interaction between the Workers and the Airflow meta-database for connection and environment information as part of initial task startup, 
  • the interaction between the Airflow Workers and the rest of Airflow for heartbeat information, and so on.

This has been accomplished by colocating all of the Airflow task execution code with the user task code in the same container and process.

For Airflow users operating at scale, i.e. those supporting multiple data teams, this has posed many operational challenges: 

  • Dependency conflicts for administrators supporting data teams using different versions of providers, libraries, or python packages
  • Security challenges in running customer-defined code (task code within the DAGs) for multiple customers within the same operating environment and under the same service accounts
  • Scalability of Airflow, since one of the core Airflow scalability limitations has been the number of concurrent database connections supported by the underlying database instance. To alleviate this problem, the Airflow community has consistently recommended the use of PgBouncer for connection pooling as part of an Airflow deployment.
  • Operational issues caused by unintentional reliance on internal Airflow constructs within the DAG/Task code, which only show up, unexpectedly, during Airflow production operations, often coinciding with (but not limited to) upgrades and migrations. 
  • Operational management based on the above for Airflow platform teams at scale, because different data teams naturally operate at different velocities. Attempting to support these different teams with a common Airflow environment is unnecessarily challenging.

The internal API (AIP-44), which reduces the need for interaction between the Airflow Workers and the meta-database, is a big and necessary step forward. However, it doesn’t fully address the above challenges. The proposal below builds on the internal API proposal and goes significantly further, to not only address the challenges above but also enable the following key use cases:

  1. Ensure that this interface reduces the interaction between the code running within the Task and the rest of Airflow. This is to address unintended ripple effects from core Airflow changes, which have caused numerous Airflow upgrade issues because Task (i.e. DAG) code relied on Core Airflow abstractions. This has been a common problem pointed out by numerous Airflow users, including early adopters. This proposal would enable Airflow users to upgrade Airflow system components (Scheduler, et al.) without impacting DAG user code.
  2. Enable quick, performant execution of tasks on local, trusted networks, without requiring the Airflow workers / tasks to connect to the Airflow database to obtain all the information required for task startup.
  3. Enable remote execution of Airflow tasks across network boundaries, by establishing a clean interface for Airflow workers on remote networks to be able to connect back to a central Airflow service to access all information needed for task execution. This is foundational work for remote execution.
  4. Enable a clean language agnostic interface for task execution, with support for multiple language bindings, so that Airflow tasks can be written in languages beyond Python. For now, we are only proposing allowing tasks in other languages, DAGs themselves must still be python based.

Considerations

What change do you propose to make?

At a high level we propose the following changes:

  • To completely disable direct database access from all tasks and workers, an HTTP-based API being the only option
  • To build/extend on AIP-44 (internal API) but start from the ground up with a custom-designed API that does exactly (and only) what we want, and does not need to worry about code-level compatibility
  • Define a "Task SDK" that provides a greatly reduced, slimmed-down interface/API of what code Tasks can call
  • To improve the security of task execution by optionally requiring all Variables and Connections a task uses to be "pre-declared" in the DAG code (with a mode that allows tasks to pull connections with access control policies)

One of the key properties to maintain with this change is that no extra components outside of Airflow and its metadata DB should be needed to run Airflow locally.

Disable Direct DB access

To evolve our security posture we should remove direct DB access from all "user" code entirely (user code defined as DAG files, but not necessarily plugins). This was one of the modes in AIP-44, but in Airflow 3 direct DB access from workers will not be allowed at all.

This will be a breaking change for some users, especially those with custom operators, but it is a net benefit to the Airflow project and worth the pain.

By having all "DB" traffic go through an API server it also means the number of open connections to the DB server will be greatly reduced, especially for large deployments.

Triggerers and triggers will also not be allowed to have direct DB access as Triggers are user code.

Build a new API for task communication from the ground up

With Airflow 3.0 we have more options to introduce breaking changes, so we should design a new purpose-built API.

This API will need to have endpoints for all of the items in the "Airflow Task SDK".

Additionally it should be written using technology such that we can support Websockets or some other "instant" push for low-latency task execution. In practice this means some Async web framework (FastAPI is a likely contender, but not required.)

We are purposefully not defining the "transport encoding" here (JSON vs gRPC vs msgpack etc.) – that can be decided at implementation time as it does not have a material impact on the form of this proposal, and could possibly be handled by HTTP content negotiation (as one of the goals of this AIP is to allow tasks to be written in languages other than Python).

This API should be strongly versioned using CalVer, and we propose an approach such as https://docs.cadwyn.dev/ to allow us to easily support many versions in parallel (tl;dr of this project: CalVer + request "migrations" let you handle API clients on multiple versions without maintaining parallel implementations – even for what might otherwise be a breaking change – as long as the change from one version to the next does not throw away information). Each change to this API, no matter how small, will result in a new CalVer version.
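
As a purely illustrative sketch – assuming FastAPI (which is only a "likely contender") and a made-up endpoint path, payload, and CalVer request header – one possible shape of such an endpoint could be:

from fastapi import FastAPI, Header
from pydantic import BaseModel

app = FastAPI(title="Airflow Task Execution API (illustrative sketch only)")

class TIHeartbeat(BaseModel):
    state: str       # e.g. "running"
    hostname: str

@app.post("/task-instances/{ti_id}/heartbeat")
async def heartbeat(
    ti_id: str,
    body: TIHeartbeat,
    airflow_api_version: str = Header("2024-06-14"),  # hypothetical CalVer request header
):
    # A Cadwyn-style layer would "migrate" the request from airflow_api_version up to the
    # newest internal schema here, so only one "head" version needs real handler code.
    return {"ok": True, "version": airflow_api_version}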

This API will provide the means of communication to enable AIP-69 (Remote Workers) but does not currently address the exact means by which those workers pick up/get given new tasks.

API security and Strong per-task-try identity

One of the key concepts here that allows us to improve the security is to give each task try a strong and verifiable identity that the API server can use to make authorization decisions.

Our proposal here is a JWT token signed by the API server (specifically using one of the elliptic curve methods defined in RFC 7518 §3 as they produce smaller tokens, but that is an implementation detail) that is handed to the task through the executor interface when the task is enqueued. This does mean that access to read the "message bus" would allow you to steal one of these tokens and impersonate a task but I don't see any other option.

We should also consider how we could prevent replay attacks. For instance, perhaps we include a "sequence" number somewhere in the request/response flow so that a new client using an existing token is detectable and blockable, and perhaps we should only allow a token to be used from a single IP address (i.e. bind it to the first IP we see a request from).
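
A minimal sketch of what issuing and verifying such a token could look like, assuming PyJWT with an ES256 key pair. The claim names, audience, lifetime, and the "seq" anti-replay field are illustrative only, not decisions of this AIP:

import datetime

import jwt  # PyJWT
from cryptography.hazmat.primitives.asymmetric import ec

# Key pair held by the API server; tasks only ever see the signed token.
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()

def issue_task_token(dag_id: str, task_id: str, run_id: str, try_number: int, map_index: int) -> str:
    now = datetime.datetime.now(datetime.timezone.utc)
    claims = {
        "sub": f"{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}",  # per-task-try identity
        "aud": "urn:airflow:task-execution-api",
        "iat": now,
        "exp": now + datetime.timedelta(hours=8),
        "seq": 0,  # possible anti-replay sequence number, per the discussion above
    }
    return jwt.encode(claims, private_key, algorithm="ES256")

def verify_task_token(token: str) -> dict:
    # The API server verifies the signature, expiry and audience before authorizing any request.
    return jwt.decode(token, public_key, algorithms=["ES256"], audience="urn:airflow:task-execution-api")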

Define an "Airflow Task SDK"

The Airflow Task SDK will include interfaces for:

  • Version introspection so that workers can tell what version/features the API server supports
  • Access to needed Airflow Connections, Variables, and XCom values
  • Report heartbeat
  • Record logs
  • Report metrics (such as "current progress through input etc" ← new feature! Consider this a wish list idea) and OpenLineage events
  • Report data about task expansion (values, lengths, names etc.) for downstream tasks
  • Read and write XCom
  • Read and write Triggers

This list is non-exhaustive and, thanks to the proposed CalVer approach, new endpoints can be added and used by up-to-date workers.
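
To make the list above concrete, here is a very rough sketch of what the client surface behind the SDK could look like; every name below is a placeholder, not a proposed API:

from __future__ import annotations

from typing import Any, Protocol

class TaskExecutionClient(Protocol):
    """Illustrative only: the calls a running task (or the SDK on its behalf) can make."""

    def server_version(self) -> str: ...                              # version/feature introspection
    def get_connection(self, conn_id: str) -> dict[str, Any]: ...
    def get_variable(self, key: str) -> str: ...
    def heartbeat(self) -> None: ...
    def emit_metric(self, name: str, value: float) -> None: ...       # e.g. "current progress through input"
    def report_expansion(self, lengths: dict[str, int]) -> None: ...  # task-expansion data for downstream tasks
    def xcom_pull(self, task_id: str, key: str) -> Any: ...
    def xcom_push(self, key: str, value: Any) -> None: ...
    def defer(self, trigger_kwargs: dict[str, Any]) -> None: ...      # read/write Triggers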

Access to the Connections and Variables will either read the "injected"/pre-declared variables, or ask the API server on demand depending on the mode. See the next two sections for more on that.

This will take the form of a new Python distribution and a different set of classes that DAGs and providers are expected to import.

This Task SDK will be a parallel/different set of Python modules (initially; very soon other languages too) that Tasks import, distinct from airflow.models etc. This also means the DAG parser will need tweaking so that the parsing code uses the same SDK that execution time does.
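
Purely as an illustration of what DAG/Task code could look like once it imports from this new distribution rather than airflow.models – the module path, decorator names, and connection pre-declaration below are placeholders, not decided here:

from airflow.sdk import DAG, task  # hypothetical import path for the Task SDK

with DAG(dag_id="example_task_sdk_dag") as dag:

    @task(connections=["my_source_db"])  # hypothetical pre-declaration of needed connections
    def extract():
        # Connection/Variable/XCom access goes through the Task SDK (and so through the API
        # server, or the pre-injected context) -- never via a direct metadata DB session.
        ...

    extract()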

One thing to be careful about in designing this is the handling of task logs. We really don't want to require every single log line for every single task execution to go over the API server, as that would be hard to make performant, and it is not really Airflow's core competency, so we should leave it to other tools where we can. We do still need the ability to view logs of in-progress tasks though. In practice this might look something like the API server sending a pre-signed S3 (etc.) URL of where to upload the logs to, plus some mechanism to get the in-progress logs. We are not proposing any one solution at this stage in the AIP, but we know the goals and will make suitable changes to the Logging providers at implementation time to support this as best as possible.
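
A rough sketch of the pre-signed URL idea, assuming the API server hands back an upload URL; the endpoint name and response shape are hypothetical:

import requests

def upload_task_logs(api_base: str, token: str, ti_id: str, log_path: str) -> None:
    # Ask the API server where to put the logs; it answers with a pre-signed S3/GCS/Azure URL.
    resp = requests.post(
        f"{api_base}/task-instances/{ti_id}/logs/request-upload",  # hypothetical endpoint
        headers={"Authorization": f"Bearer {token}"},
        timeout=10,
    )
    resp.raise_for_status()
    upload_url = resp.json()["upload_url"]

    with open(log_path, "rb") as fh:
        # The log bytes go straight to blob storage and never pass through the API server.
        requests.put(upload_url, data=fh, timeout=60).raise_for_status()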

Similarly for reading or writing XCom: if we can avoid sending large data via the API server, and instead make requests directly to cloud blob storage, we should. We will ensure that the common.io (and the possibly proposed common.dataset) provider has the features we need so that large blobs don't go via the API server.

Out of scope of this AIP for now is the handling of custom callbacks – callbacks need to run where the DAG files are, i.e. through the existing Dag Processor mechanism. (Handling of Notifiers, i.e. Slack or Email, could be optimized to run in the scheduler, but that is not required, nor part of this AIP. It fits more with AIP-69.)

Extend Executor interface

Currently BaseExecutor defines three "entrypoints": queue_task_instance() (used by backfill only?), queue_command() (used by the scheduler only), and send_callback(). All of these will be removed and replaced with a single new interface: queue_activity(). The proposed (but not finalized) interface would be this:

def queue_activity(
    self,
    *,  # Keyword-only arguments, to allow this to change and evolve more easily over time without breaking changes
    kind: ActivityType, # For now just "ExecuteTask", "RunCallback", but allows for "ParseDAGFile" etc. in future
    # Contains TI for "ExecuteTask", so executor can get ti.queue out of this.
    context: dict[str, Any],
):
    ...

The key difference here is that we no longer pass "commands to run" around between the scheduler and worker, but instead better-typed parameters. For example, in the case of the CeleryExecutor this would have a payload of something like:

{
  "kind": "ExecuteTask",
  "token": "eyJhb....",
  "id": [ "dag_id", "task_id", "run_id", "attempt", "map_index"], # Or better, a single id, but we want these details to be passed along regardless
  "context": {    
        "connections": ["list", "of", "pre-injected", "connections"],
        "...",
    }
}

Connections may or may not be populated in this context, and there are many more fields in "context" that are not shown. We need to consider whether this method is secure for all executors, and whether there is a maximum (practical) length for each executor. For example, in the KubernetesExecutor this context could be passed into the pod by creating a Kubernetes Secret object.

The exact format of this context dict is up to the implementation of each Executor and serves only as an illustrative example for this AIP.

Dag Parsing Changes

Parsing DAG files involves running user code so this activity needs to have the same security properties as executing a task. To enable this the following changes will be needed:

  • Each DAG file parsing process will need a strong identity which identifies the source file being parsed. (This is needed as it is not unheard of to access Variables at file parse time.)
  • It will need to parse the file using the same Task SDK that execution time will use.
  • The internal API needs an endpoint to submit a serialized DAG representation – see the sketch after this list. (Note: this API is not designed to be called by end users, but only by Airflow's own internal components at this time.)
  • Callbacks will still need to be run in the parsing process, and will need to be "passed down" to the parsing process somehow (Implementation time decision. Either when the process is launched, or over API response once the serialized DAG is submitted?)
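
As a hedged illustration of the serialized-DAG submission endpoint mentioned above – the endpoint path, payload shape, and file-scoped identity token are all placeholders, not part of this proposal:

import requests

def submit_serialized_dags(api_base: str, parse_token: str, source_file: str, dags: list[dict]) -> list[dict]:
    resp = requests.post(
        f"{api_base}/dag-files/serialized-dags",  # hypothetical internal-only endpoint
        headers={"Authorization": f"Bearer {parse_token}"},  # identity scoped to the parsed source file
        json={"source_file": source_file, "dags": dags},
        timeout=30,
    )
    resp.raise_for_status()
    # Per the open question above, callbacks to run could be returned in this response.
    return resp.json().get("callbacks", [])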

Some of these items may be implemented as part of AIP-67.

Connection/Variable security models

A.k.a.: do we push/inject secrets into tasks, or allow them to request them?

To be expanded on. Quick points:

  • Inject/pre-declare "this task looks at connections with ID X, Y and Z" and only allow those – this is one mode, but it might be too big of a change to require for almost every single DAG out there
  • Allow a deployment-level config setting to request other connections at runtime (default = false, can be set to true to ease migration?). Maybe this could all be handled inside OPA instead?
  • Embed OPA (Open Policy Agent) via something like https://github.com/matrix-org/rust-opa-wasm (and create Python bindings using PyO3). This allows users to define customizable checks for which DAGs/tasks/DAG files (at parsing time!) can read or write which Variables and Connections, as well as read and write access to XCom at runtime. See the sketch after this list.
  • When combined with remote workers (AIP-69) and an external secrets backend, we likely want the ability to access secrets from a secret store local to the worker (or its network). For example, pulling a connection from a Vault instance inside the local network where the worker is running, such that the Airflow scheduler and API server etc. do not have access to it. We will need to think more about how to enable this (as, conversely, you might want all secret access to be handled via the Airflow API server and for workers to have no access to the secret store). We want to support both of these modes via configuration by the Deployment Manager.
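
To illustrate the OPA idea from the list above: a policy decision could be queried roughly as below. With the embedded rust-opa-wasm approach this would be an in-process call rather than HTTP, but the decision shape is similar. The policy package path and input fields are hypothetical:

import requests

def may_read_connection(opa_url: str, dag_id: str, task_id: str, conn_id: str) -> bool:
    # Query an OPA decision: "may this task read this connection?"
    resp = requests.post(
        f"{opa_url}/v1/data/airflow/connections/allow",  # hypothetical policy package path
        json={"input": {"dag_id": dag_id, "task_id": task_id, "conn_id": conn_id}},
        timeout=5,
    )
    resp.raise_for_status()
    return bool(resp.json().get("result", False))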

There are also security considerations around how we would safely inject connections into a task – we probably don't want to send connection data along with the Task execution workload, so it might end up being API-time enforcement of which connections a task can access either way.

Initially we are not proposing a UI or nice way of handling permissions policies – that can be added later (it could be added in time for 3.0, or it could follow up in a 3.x release)

This section is closely related to the Task SDK, but not required to be part of this AIP, so an option if this point is under debate is to move this to a separate AIP. This OPA based approach could entirely replace the need for AIP-67.

Which users are affected by the change?

Any custom operators that access the database directly will need re-writing.

There will likely be a lot of "random"/unexpected access to existing Airflow internals inside custom operators and PythonOperator code.

Other considerations?

Having a different SDK that operators and triggers need to import is quite a sizeable change, but it is the only way to truly decouple provider code from "Airflow Core" (i.e. the scheduler and API servers). The official Airflow community providers will all be updated as necessary with the help of the community to use this new SDK.

Interaction with AIP-69 (Remote Worker)

This AIP provides a clean and secure foundation on which to build the remote worker feature.

Interaction with AIP-66 (DAG Fetcher/Versioned DAG execution.)

Everything in this AIP happens on the worker, after the right version of the DAG is already in place there so it can be executed. Some future thought will need to be given to how to support working with tasks in compiled languages.

What defines this AIP as "done"?

The new API exists; a Python Task SDK exists that can run "the same*" DAGs as on v2.x (*with minimal to no DAG file-level changes); and a POC exists of tasks in one other language (TypeScript or Go).


5 Comments

  1. We should remove the "a.k.a. Task Context" because it doesn't really make sense. What this is about is a task execution interface, and task context is part of that, but just a part of it. It is also somewhat of a loaded term in Airflow.

    1. I believe we need some Task Context, as this is the idempotent input of the execute() method. However, if it is a breaking change anyway, all data should be modeled in a way that the context can be transferred as a simple serialized structure, and not like today's model objects, which contain a DB session object.

  2. Thanks for the AIP! I really, really like this after seeing how complex and spaghetti-like the work of AIP-44 is. It will be a breaking change, but a huge step towards cleaning up existing task execution and DB transactions.

    Looking forward to some API proposals (high level at least) to get a feeling for how the API might look at the code level. Requirements and targets are already 90% OK for me, with just some detailed comments to clarify... THANKS!

  3. No mention of triggerer in this doc.  Do we need to worry about that here?  Do we allow the "component" to have access even if tasks (or task-likes) don't?  E.g. I can imagine a world where celery or some other runner has access to db but the tasks don't.  That's how trigger is now.  Well at least they don't need access right now though they could truncate all the tables if they wanted (smile)