Status


State: In Progress / Under Development.

Replaced by AIP-72 for Airflow 3. What we have in main can ship in 2.10 as "experimental", with a stated intention to change it for Airflow 3.
Discussion Thread: https://lists.apache.org/thread/nsmo339m618kjzsdkwq83z8omrt08zh3
Voting Thread: https://lists.apache.org/thread/dybm8mkxpllghg820swb9pcdbs3hsto8
PR: https://github.com/apache/airflow/pull/27892
Created

Authors: Jarek Potiuk, Mateusz Henc



Motivation

This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted" ones, which allows security boundaries to be put between them. "Untrusted" components could then be executed in DB Isolation mode, which disables direct database access and makes it possible only through the Airflow Database API.

The Airflow Database API is a new, independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to the DB.

Note: Access control to Airflow Database API & Airflow Database rows is out of scope for this AIP, but it was kept in mind, making the implementation feasible.

Proposal

Internal API architecture

Airflow Internal API is logically a new API, independent of the existing user-facing API embedded into the Airflow Webserver:

  • They have different purposes: user-facing vs. internal
  • They have different access models: end-user credentials (roles) vs. task-based access
  • They are different styles of API: REST vs. RPC-like (HTTP-based)
  • The REST API is user-facing and API-first (the code reflects the API specification), whereas the Internal API should be code-first (the API reflects the methods to be called)

How we approach the implementation is still to be decided at the POC stage, but we want to seamlessly integrate authentication between the two APIs. It should be possible to authenticate with both APIs using the same mechanism, and it should be possible to expose both APIs from the same web server.

The REST API and Internal API could be exposed together in a single process (webserver), but optionally they could be exposed separately in different processes. The Internal/REST API separation can be introduced in two ways:

  • Internal APIs exposed together with REST API via Webserver with RBAC access control
    • We will have a separate “Internal API Role” and “Internal API user” within the access model that we use from FAB. You can only authenticate as this user with the “Internal” authentication method, which is based on tokens.
    • The authorization of Workers should be based on the validity of JWT tokens generated by the Scheduler. Dag Processor and Triggerer will have a secret that will be available only to them, not to the user code they run (see the section “Securing access to authorization token in DagProcessor and Triggerer” below). The user code in the Worker should not have access to any long-term secret; it should only use a JWT token with limited-time validity.
    • This “Internal API Role” will have only the permissions that allow it to execute the Internal APIs (RPC-style) needed to provide user code with the current functionality (Connections, Variables, etc.) in a transparent way, plus selected (possibly even all) other API methods from the existing REST API that should be exposed to the Workers, Dag Processor and Triggerer. We will make an inventory and decide on the “default” permissions for REST APIs during the implementation. This will be done using the same RBAC controls that we use in the current REST API implementation.
  • Physical separation of the components exposing both APIs:
    • The “Internal API” optionally can also be exposed by a component separate from the current webserver.
    • We also propose to implement a separate, optional “airflow internal-api” component that could expose only the APIs needed by Workers, DagProcessor and Triggerer. This component can be run in a separate “security zone”, which allows separating access to the “Internal API” on a physical/networking level.

The Internal API calls are “RPC”-style APIs rather than CRUD APIs, so they do not map onto REST semantics. The Internal endpoints do not need to be published in the API specification, so we plan to generate the APIs from the code (decorated functions). As the next step (implementing the POC) we will determine whether we should generate the endpoints from the code via completely custom code, or whether we can leverage existing libraries and tools (for example FastAPI).

A later phase of the “Umbrella” AIP-1 implementation (not implemented in this AIP) is to implement task-based authorization (each task should have access to a subset of operations depending on dag_id, task_id, origin, etc.). While this AIP does not address it, it makes it possible.

In the future, the JWT token can be used to carry additional (signed) payload which will be used to selectively authorize the caller to specific entities (connections, variables, specific tasks or dags only etc.)
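As an illustration of a limited-time token that carries a signed payload, the sketch below builds and checks an HS256 token in the JWT layout using only the Python standard library. The function names, the claim names (`dag_id`, `task_id`) and the choice of a shared-secret HMAC signature are assumptions for the example; the AIP does not prescribe them, and a real deployment would more likely use an established JWT library.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def issue_task_token(secret: bytes, dag_id: str, task_id: str,
                     ttl_seconds: int, now: Optional[float] = None) -> str:
    """Create a short-lived token whose claims name a single task."""
    issued = int(time.time() if now is None else now)
    header = {"alg": "HS256", "typ": "JWT"}
    claims = {"dag_id": dag_id, "task_id": task_id, "exp": issued + ttl_seconds}
    signing_input = (_b64url(json.dumps(header).encode()) + "."
                     + _b64url(json.dumps(claims).encode()))
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return signing_input + "." + _b64url(signature)


def verify_task_token(secret: bytes, token: str, now: Optional[float] = None) -> dict:
    """Return the claims if the signature is valid and the token has not expired."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature)):
        raise PermissionError("bad signature")
    claims = json.loads(_b64url_decode(signing_input.split(".")[1]))
    current = time.time() if now is None else now
    if current >= claims["exp"]:
        raise PermissionError("token expired")
    return claims
```

Because the claims are covered by the signature, the server could later use fields such as `dag_id` to restrict what the caller may access without any extra lookup.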

No access to Metadata DB for “User code” in DagProcessor and Triggerer

Currently, the SQLAlchemy models of Airflow are also available to the top-level user code in DAGs and to callbacks. Essentially, users can access Airflow Variables, Connections and other DB models while the DAG code is parsed or when callbacks are executed. This is, however, considered a bad practice for top-level code; using DB access in callbacks (TODO discuss it) is also not a recommended practice. It interferes with the parsing process and it can be replaced by other mechanisms.

Similarly, any DB access in user code in the Triggerer is a very bad practice, because the Airflow database is not available for asyncio operations. Asyncio SQLAlchemy queries are Beta functionality and only work with the experimental asynchronous Postgres driver, so for all practical purposes any user-provided Trigger code should not access the metadata DB.

The proposal for the DB Isolation mode is that the Top-Level Python code and callbacks executed by the DAGProcessor are not allowed to access Airflow Database at all. 

This could be achieved by not exposing an SQLAlchemy session to the process that executes DAG Parsing and callbacks. While this does not provide complete isolation for malicious users (DAG Parsing/Callbacks will be executed as sub-processes of DAGProcessor and full sandboxing of the parsing process is out of the scope for now), this could be a handy mechanism to improve performance and enforce best practices for DAG Top-Level Process parsing and callbacks.

This also means that Community operators should not use any DB operations in their constructors. This is similarly considered bad practice, and enforcing it should be a great side effect of introducing the DB Isolation mode. As part of the implementation, all operators that currently use DB operations in this way will be identified and fixed.

Securing access to authorization token in DagProcessor and Triggerer

Another (security-related) consequence of “no access” for the user code run by DagProcessor and Triggerer is that this user code should have no access to the authentication information for the Internal API. Both components will need a long-living token that allows them to authenticate, and at a later stage be authorized, to run selected Internal API methods, and this token should not be made available to user code. By default, in both cases, the user code is executed in a separate process, and the token held in memory might be removed while forking or not passed when the processes are spawned. However, the token (or at least a token that allows the DagProcessor and Triggerer to retrieve the actual token to use) must be stored locally on the server where DagProcessor and Triggerer run, to handle cases like restarts without manual intervention. That puts special requirements on the parsing and event loop processes started by DagProcessor and Triggerer respectively. The following options should be implemented (TODO: discuss this):

  • The configuration file where the token (or the configuration of the long-living token used to retrieve it) is stored should not be “world readable”; it should be readable only by the user running DagProcessor/Triggerer. In the most “strict” mode, we can perform the check at startup and refuse to start Airflow if the configuration file has the wrong permissions.
  • Parsing/event loop processes might be started as “no-user” processes. The consequence is that DagProcessor and Triggerer should be run in impersonation mode, where the user running Airflow has “sudo” capability. This provides pretty good isolation, but it requires careful configuration of the deployment in order to fully secure the authentication/authorization information.
  • The parsing/event loop processes might be run as containers (there are separate discussions about it related to AIP-46 Runtime isolation for airflow tasks and dag parsing), where the configuration of Airflow will not be mapped into the container. That provides a greater level of isolation and security, and should only be considered in deployments where a Docker engine is available.
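The first option (refusing to start when the configuration file is readable by others) could look roughly like the sketch below. It assumes a POSIX system; the function names and the exact policy (owner must be the current user, no group or world permission bits at all) are assumptions for illustration and not part of the proposal.

```python
import os
import stat


def config_is_private(path: str) -> bool:
    """True when the file is owned by the current user and closed to everyone else."""
    info = os.stat(path)
    owned_by_us = info.st_uid == os.getuid()
    # Any read/write/execute bit for group or others counts as a violation.
    open_to_others = bool(info.st_mode & (stat.S_IRWXG | stat.S_IRWXO))
    return owned_by_us and not open_to_others


def ensure_private_config(path: str) -> None:
    """Strict mode: abort startup instead of running with a leaky config file."""
    if not config_is_private(path):
        raise SystemExit(f"Refusing to start: {path} is accessible by other users")
```

A component running in "strict" mode would call `ensure_private_config` on its configuration file before reading the token from it.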

Test harness

As part of the implementation, we want to implement a test harness tapping into the existing CI/test environment of Apache Airflow, where the above assumptions can be automatically verified for all the community provided code:


We will run the whole CI test suite in “DB Isolation Mode”, where the unit tests will get a modified SQLAlchemy Session that will catch incorrect uses of the database:

  • Using disallowed DB operations in DagProcessor
  • Using any DB access in any of the example DAG’s top-level code
  • Using any DB Access in __init__() methods of any of the Community Operators
  • Using disallowed models in the Community Hooks
  • Using disallowed DB operations in "execute()" calls of the operators
  • Testing scenarios in which the transaction succeeds but connection problems prevent the "success" from being propagated to the client
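A "modified session" for such a harness can be sketched as an object that fails on any use, so that a test can instantiate operators with it and collect the offenders. The classes below (`GuardSession`, `DbAccessForbidden`, the two toy operators and `find_violations`) are hypothetical stand-ins written for this example; real operators receive their session differently.

```python
from typing import Callable, Dict, List


class DbAccessForbidden(RuntimeError):
    """Raised when isolated code touches the database session."""


class GuardSession:
    """Stand-in for a SQLAlchemy session that fails on any attribute use."""

    def __init__(self, component: str) -> None:
        self._component = component

    def __getattr__(self, name: str):
        # Only called for attributes that are not found normally,
        # i.e. every session method such as query(), add() or commit().
        raise DbAccessForbidden(
            f"{self._component} used session.{name} in DB isolation mode"
        )


class CleanOperator:
    def __init__(self, session) -> None:
        self.task_id = "clean"  # no DB access in the constructor


class LeakyOperator:
    def __init__(self, session) -> None:
        self.rows = session.query("Variable")  # DB access in the constructor


def find_violations(factories: List[Callable], component: str) -> Dict[str, str]:
    """Instantiate each class with a guard session and record the failures."""
    violations = {}
    for factory in factories:
        try:
            factory(GuardSession(component))
        except DbAccessForbidden as err:
            violations[factory.__name__] = str(err)
    return violations
```

Running `find_violations([CleanOperator, LeakyOperator], "operator __init__")` reports only `LeakyOperator`, which is the kind of list the harness would turn into test failures.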

Internal API inventory

The Internal API serves three components: Dag Processor, Workers, Triggerer. This table shows an inventory of impacted methods, their applicability and our approach for the methods.

Note that asynchronous, user-provided code in the Triggerer should, almost by definition, not perform any DB calls synchronously (and this is the only possibility to access Airflow Code). Doing so is considered bad practice, and we believe that not allowing Airflow DB access (and not providing an equivalent API) from the user-provided code run by the Triggerer might improve the performance and characteristics of the Triggerer. Any attempt to use SQLAlchemy or direct DB access from the user-provided code should raise an exception (to be verified during the POC).

User code method inventory

Those methods are available to user code that runs in Workers as part of task execution. We can easily utilize the existing REST API to retrieve those. Additionally, users will have access to a pre-configured Python Client (installed as an Airflow dependency) that will enable Operator developers and DAG writers to access all the exposed REST APIs of Airflow in a straightforward way. Note that users will also need access to the config, which is not DB access; however, access to the config should also go through the REST API. This makes it possible to secure access to Secret Backends: no user code will have access to the secret used to authenticate with Secret Backends.

Case

Proposed approach 

Connection

Only READ-ONLY BaseHook.get_connection() will be implemented to transparently read a Connection from the database or from Secrets. Secret access will also be done using the Internal API Server. Currently, retrieving a connection performs a database roundtrip from all components. Replacing it with a REST API call will make it slightly slower, but it will also leverage the SQLAlchemy connection pool of the Internal API Server. Connection retrieval is an infrequent operation in general.

Variable

READ-WRITE Variable.get/set/update/clear methods will be implemented transparently using APIs. Similar performance considerations apply as in the case of Connection.

XCom

READ-WRITE: all of the operations (get*/clear*) will be implemented to transparently use the Internal API. Custom XCom Backend access should, however, be done directly from the “calling entity”, and we should provide a mechanism for passing temporary credentials from the XCom Backends so that the workers can store/retrieve XCom data directly without going through the API server. This can be implemented, for example, using pre-signed URLs or other mechanisms where temporary credentials to access the data are passed from the server to the worker.

Other REST APIs

The tasks running in workers will also have access to the Airflow Python Client that will use the “Task” JWT token for authentication and be configured to talk to the Airflow Webserver automatically. This will provide easy access to the functionality exposed by Airflow via API to Operator developers and DAG writers, who will have standardized access to Airflow via the client.

Non-DB code that should also be isolated (because of Secret Backends)

Config

READ-ONLY access to retrieve configuration. Secret Backend access will only be done on the Internal API server. Option: until selective authorization is implemented, we might allow direct access from Workers/DagProcessor for scalability reasons, even if full security might not be achievable. In this case secret credentials might leak to user code. Also, in the most “strict” deployment (see the discussion above about user code access to the authentication token), the configuration file should not be made available to the user code directly.

Internal API calls

Those methods are not necessarily going to be mapped 1-1 into the actual API calls. It is possible that several methods might be joined together for efficiency (with some possible duplications resulting if methods are used in several places). It is important however that each of the methods constitutes a complete transaction (or several transactions), because going through APIs will make it extremely difficult to maintain transactional integrity across multiple methods.

Dag Processor Airflow Code

Function/Method

Proposed approach 

manage_slas

This method should be exposed as a separate Internal API call (in DB isolation mode). It is essentially a single, independent transaction which also covers sending notifications (note: in case we implement custom notification engines, their code will be executed in the context of the Internal API server).

update_import_errors

This is also a single-session method that can be exposed and executed fully by the Internal API server.

execute_callbacks

This method should stay within the DAGProcessor  but it should use Internal API to retrieve appropriate models (which will be API objects rather than SQLAlchemy objects): DagRun, TaskInstances, Template_context - via current Public API (extensions needed). We might need to optimize/update some of the APIs to get missing information and achieve best performance by batching retrieval of only those objects that are needed.

deactivate_missing_dags

This should be a separate, new Internal API method call that should perform a single transaction. Note that currently no commit is performed for that method, but making this a separate transaction should have no adverse effect.

update_import_errors

This should be a separate, new Internal API method call that should perform a single transaction. As above, this is currently done in the same transaction, but there should be no adverse effects from making it a separate transaction.

save_dag_to_db

This is a new, “heaviest” method that should be exposed by the internal API to the DAG processor. We should be able to serialize all the information stored currently in the DagBag and send it to the Internal API server. The method should be roughly equivalent to: sync_to_db and pickling dags if set. This should be done in a single transaction as a single API call.

Triggerer Airflow Code

Case

Proposed approach 

Trigger Model

Similarly to the user code case, the Trigger class should be updated to transparently handle calls to its methods: bulk_fetch, clean_used, assign_unassigned, submit_event, submit_failure, ids_for_triggerer. All those methods are currently single-transaction calls and can easily be mapped into API calls with serialized information. Performance-wise, this exchanges a DB call for an HTTP call, which yields some unavoidable performance penalty. However, only submit_failure and submit_event will have a visible impact due to their “non-batch” nature. If this turns out to be a problem, we might consider a mechanism where we accumulate those events and send them in batch calls.
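The batching idea mentioned for submit_event and submit_failure can be sketched as a small accumulator that sends one call per group of events. The `EventBatcher` class, its `max_size` parameter and the `send_batch` callable are hypothetical names for this example; a real implementation would probably also flush on a timer so that events are not held back indefinitely.

```python
from typing import Callable, List


class EventBatcher:
    """Collect trigger events and send them to the API in batches."""

    def __init__(self, send_batch: Callable[[List[dict]], None], max_size: int = 50) -> None:
        self._send_batch = send_batch
        self._max_size = max_size
        self._pending: List[dict] = []

    def submit(self, event: dict) -> None:
        """Queue one event; flush automatically once the batch is full."""
        self._pending.append(event)
        if len(self._pending) >= self._max_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is queued as a single batch call."""
        if not self._pending:
            return
        batch, self._pending = self._pending, []
        self._send_batch(batch)
```

With `max_size=2`, three submitted events result in one batch of two being sent immediately and one event waiting for the next `flush()`.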


Worker Airflow Code

Case

Proposed approach 

refresh_taskinstance_from_db

Single transaction retrieving data from the DB; part of the Internal API.

update_task_instance

There are a few places where tasks are saved to the DB in a separate transaction after task values (host/job_id) are updated. We will implement a new API endpoint for it.

defer_task

We should implement an internal API call that would create a trigger entry and update the task state. This method is a single-transaction method so it should have limited performance impact - Internal API can utilise connection pooling much better than Workers.

get_template_context

We should implement a separate Internal API call that retrieves context for the task. All fields in context should be serializable (or we can make them so) and performance of this method should not be heavily impacted as we exchange a DB call with an API call from the point of view of Worker and Internal API implementation can utilize connection pooling much better.

handle_reschedule

We should implement a separate, Internal API call that implements adding task reschedule via DB. This is a single transaction so it should be easy to replace it with a single API call.

handle_failure

Similarly as above, we should implement a single API call as this method is always followed by a commit.

add_log

The message should add a log model entry as a separate call. Similar considerations as above.

update_state

Updates task instance state. This should also be a separate method in the API. There is rather complex logic in the method and it requires loading a partial subset of the tasks, which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode. One watch-out: there is an option to execute callbacks while running this function. Rather than executing them in this case, we should return a list of callbacks and execute them in the context of the worker. They will be executed after the transaction is complete, so there is a small risk that callbacks would be lost; however, the risk is rather minimal. Option: we could consider storing the callbacks in the database instead and delegating their execution to DagProcessor.

run_mini_scheduler_on_child_tasks

This should be a separate method in the API. There is rather complex logic in the method and it requires loading a partial subset of the tasks, which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode.
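The callback handling described for update_state (the server returns the callbacks, the worker runs them after the transaction) can be sketched as follows. The `UpdateStateResult` type, both functions and the callback names are assumptions made for the example and do not mirror the actual Airflow signatures.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class UpdateStateResult:
    """What the (hypothetical) API call returns once its transaction is committed."""
    state: str
    callbacks_to_run: List[str] = field(default_factory=list)


def update_state_on_server(dag_run_failed: bool) -> UpdateStateResult:
    # Server side: update the state in one transaction and only *name* the
    # callbacks that are due, instead of executing user code on the server.
    if dag_run_failed:
        return UpdateStateResult("failed", ["on_failure_callback"])
    return UpdateStateResult("success", ["on_success_callback"])


def run_callbacks_in_worker(result: UpdateStateResult,
                            registry: Dict[str, Callable[[], None]]) -> List[str]:
    """Worker side: execute the returned callbacks after the API call has finished."""
    executed = []
    for name in result.callbacks_to_run:
        callback = registry.get(name)
        if callback is not None:
            callback()
            executed.append(name)
    return executed
```

If the worker dies between the API response and the callback execution, the callbacks are lost; this is the small risk the text refers to, and storing them in the database would close that gap.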

Implementation notes: Internal API component

Airflow Internal API provides methods only for "untrusted" components (Worker/Dag Processor/Triggerer), as Scheduler will use only direct DB access.

While the APIs could be exposed also via webserver, we propose to add a new, optional, stateless (thus horizontally scalable) component that can be started using a new command line command: airflow internal-api.

The code is located in the airflow.api_internal package, with all endpoint methods available via the /api_internal/ endpoint.

As mentioned above, the “Internal API” is not “API-first” but “code-first”. The details of how we map particular methods into endpoints will be worked out at the POC that follows the AIP, but we will approach the implementation in a way that minimizes impact on the existing code (at most slight refactoring, and splitting existing methods into several or joining them). All the methods exposed via the API will have to be adapted to make their parameters and return values serializable. In some cases we will have to convert objects that are big or not serializable into the representation needed by the method (for example, the DagBag passed in a few methods above and in the example below should be turned into a list of ids and parameters needed by the methods). All the methods involved will have to be converted into “functions” so that no instance objects are accessed.

Another important factor is future maintainability. In the implementation we will avoid code duplication by implementing decorators that will either use the locally implemented DB access or delegate automatically to a corresponding API call. The current “@provide_session” decorator will provide an “Assert” session in Workers or DagProcessor in the DB Isolation mode, so any attempt to access the DB from those will raise an exception.
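A decorator of the kind described above might look like the following sketch. The `Settings` class, the `internal_api_call` name, the `fake_rpc_call` transport and the keyword-only calling convention are all assumptions chosen to keep the example self-contained; they illustrate the dispatch only and are not the implementation from the PR.

```python
import functools
import json
from typing import Any, Callable


class Settings:
    """Hypothetical stand-in for a global settings module."""
    database_access_isolation = False


settings = Settings()
sent_requests = []


def fake_rpc_call(method_name: str, params_json: str) -> Any:
    """Pretend transport: records the request instead of sending it."""
    sent_requests.append((method_name, json.loads(params_json)))
    return 0


def internal_api_call(func: Callable[..., Any]) -> Callable[..., Any]:
    """Run ``func`` locally, or forward its (serializable) kwargs over RPC."""

    @functools.wraps(func)
    def wrapper(**kwargs: Any) -> Any:
        if settings.database_access_isolation:
            return fake_rpc_call(func.__name__, json.dumps(kwargs))
        return func(**kwargs)

    return wrapper


@internal_api_call
def deactivate_missing_dags(*, file_path: str, dag_ids: list) -> int:
    # Placeholder for the direct-DB implementation (one transaction).
    return len(dag_ids)
```

The decorated function keeps a single body; only the global flag decides whether that body runs in the calling process or whether its arguments are serialized and sent to the Internal API.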

In order to achieve maintainability, we want to make sure that all the communication for those methods, both in DB isolation mode and in "DB" mode, is executed using RPC (Remote Procedure Calls). Even in "no isolation" mode, the methods from the inventory above will always be executed via an "internal RPC server". Modern RPC solutions provide a way to run a local server for RPC connections, which gives a much more performant version of the RPC methods (without authentication, security and communication overhead) while using the same code and mechanisms to execute them.

A short survey of the available RPC implementations suggests the following popular and proven implementations we could use:

  • XMLRPC: part of Python standard library 
  • gRPC: Google-originated, Apache 2.0 licensed implementation (based on binary Protocol Buffers)
  • Apache Thrift: Facebook-originated (now Apache) RPC using the binary Thrift protocol

Of these protocols, XMLRPC is quite simple and has quite a big (XML-induced) overhead for serializing/deserializing/communication, whereas both gRPC and Thrift use efficient, binary, extensible protocols that introduce very low overhead on serializing/deserializing and communication. While we are not making the final decision yet, the first step of the project will be to analyze, implement a POC and perform the necessary benchmarking to decide which of the two implementations (gRPC or Apache Thrift) to use. We are also reaching out to the Apache Beam team, who have experience with both protocols and can help us with the analysis they have done on the capabilities of those protocols in the past.

Both implementations are similar in the sense that we need to define a description of the RPC methods we are going to have (in either Protobuf or Thrift) and a compiler will generate the Python client and server code. Both implementations have full support for HTTPS communication, including various (extensible) authentication options, and both can communicate via TCP or Unix sockets for local use. Apache Thrift also has a "shared-memory" mode, where the communication between "client" and "server" can be done over shared memory.

During the initial POC we asked other Apache projects (Apache Beam) about their experience and we chose gRPC. However, we reserve the possibility of swapping out gRPC for a pure JSON/OpenAPI-based approach. The POC will continue while we try out and test the concerns raised during the discussion: the multi-threading vs. multiprocessing approach, load balancing and the capabilities for monitoring such requests (see https://lists.apache.org/thread/1pw2ohdyjtw1nj7ckyofq70sv8onxqdd). The "Internal API" abstraction will allow swapping them out, and we might even decide to keep both options if we find they fare better in different scenarios.

The example below shows how we could define the "process_file" method with a gRPC protobuf definition (this is a small part of the final definition needed and does not necessarily compile, but it should give an idea). You can also see the POC PR implementing gRPC method calls used for performance testing.

The example is updated with the "clearer" approach proposed by Ash Berlin-Taylor: add an extra level of "Internal Client" abstraction and base the decision on global settings rather than on an object's method (see https://lists.apache.org/thread/4wmftmjfqhwc4h6r87tn636lkttj1hzo).

Original method:
    @provide_session
    def process_file(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
        session: Session = None,
    ) -> Tuple[int, int]:
        self.log.info("Processing file %s for tasks to queue", file_path)

        try:
            dagbag = DagBag(file_path, include_examples=False, \
                            include_smart_sensor=False)
        except Exception:
            self.log.exception("Failed at reloading the DAG file %s", \
                              file_path)
            Stats.incr('dag_file_refresh_error', 1, 1)
            return 0, 0

        self._deactivate_missing_dags(session, dagbag, file_path)
        ...

    @provide_session
    def _deactivate_missing_dags(self, session, dagbag, file_path):
        ...
        self.log.info(...)
        ...



gRPC Proto definition:
syntax = "proto3";

package airflow.internal_api;

message TaskInstanceKey {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  int32 try_number = 4;
  int32 map_index = 5;
}

message SimpleTaskInstance {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  optional float start_date = 4;
  optional float end_date = 5;
  int32 try_number = 6;
  int32 map_index = 7;
  string state = 8;
  bytes executor_config = 9;
  string pool = 10;
  string queue = 11;
  TaskInstanceKey key = 12;
  optional string run_as_user = 13;
  optional int32 priority_weight = 14;
}

message TaskCallbackRequest {
  string full_filepath = 1;
  SimpleTaskInstance task_instance = 2;
  optional bool is_failure_callback = 3;
  optional string message = 4;
}

message DagCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  string run_id = 3;
  optional bool is_failure_callback = 4;
  optional string message = 5;
}

message SlaCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  optional string message = 3;
}

message Callback {
  oneof callback_type {
      TaskCallbackRequest task_request = 1;
      DagCallbackRequest dag_request = 2;
      SlaCallbackRequest sla_request = 3;
   }
}

message FileProcessorRequest{
  string path = 1;
  repeated Callback callbacks = 2;
  bool pickle_dags = 3;
}

message FileProcessorResponse {
  int64 dagsFound = 1;
  int64 errorsFound = 2;
}

service FileProcessorService {
  rpc processFile (FileProcessorRequest) returns (FileProcessorResponse) {}
} 




New method with optional gRPC path:
    def process_file_grpc(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
    ) -> Tuple[int, int]:
        request = internal_api_pb2.FileProcessorRequest(path=file_path, pickle_dags=pickle_dags)
        for callback_request in callback_requests:
            request.callbacks.append(callback_request.to_protobuf())
        res = self.stub.processFile(request)
        return res.dagsFound, res.errorsFound

    def process_file(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
    ) -> Tuple[int, int]:
        if settings.DATABASE_ACCESS_ISOLATION:
            return InternalAPIClient.dagbag_process_file(
                file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
            )
        return self.process_file_db(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )





Performance impact


The performance impact is visible (as expected). It is quite acceptable for a distributed environment (in exchange for security), but it is likely significant enough to stay with the original idea of having Airflow work in two modes: a) secure, with RPC APIs, b) standard, with direct DB calls.

On "localhost" with a local DB, serializing and transporting the messages between two different processes introduced up to 10% overhead. I saw the time to run 500 scans of all our example DAG folders increase from ~290s to ~320s pretty consistently, enough of a difference to exclude volatility. I tested it in Docker on both ARM and AMD. It seems that in some cases this can be partially offset by slightly increased parallelism on multi-processing machines run on "bare" metal. I consistently got just a 2-3% increase on my Linux machine without Docker with the same test harness, but we cannot rely on such configurations. I think a Docker-based installation, without any special assumptions about how networking/localhost is implemented, is what we should treat as the baseline.
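For reference, the localhost figures quoted above (roughly 290s versus 320s for 500 scans) work out as follows. The helper names are made up for this example, and the inputs are the approximate values from the text, so the results are approximate as well.

```python
def relative_overhead(baseline_seconds: float, measured_seconds: float) -> float:
    """Extra time as a fraction of the baseline."""
    return (measured_seconds - baseline_seconds) / baseline_seconds


def extra_ms_per_run(baseline_seconds: float, measured_seconds: float, runs: int) -> float:
    """Average extra time per run, in milliseconds."""
    return (measured_seconds - baseline_seconds) * 1000.0 / runs


overhead = relative_overhead(290.0, 320.0)
per_scan_ms = extra_ms_per_run(290.0, 320.0, 500)
print(f"{overhead:.1%}")   # 10.3%
print(per_scan_ms)         # 60.0
```

In other words, the RPC path added about 60 ms to each scan of the example DAG folders in this setup.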

The same experiment repeated with 50 messages of much bigger size (300 Callbacks passed per message) showed a 30% performance drop. This is significant, but I believe most of our messages will be much smaller. The messages I chose were also very special: for Callbacks we do rather sophisticated serialization, because we have to account for Kubernetes objects that are potentially not serializable. So the test involved 300 messages that not only had to be serialized "on the wire", but some parts of them also had to be Airflow-JSON serialized, and the whole "executor config" had to be pickled/unpickled (for 100 such messages out of 300). We also have some conditional processing there (there is a union of three types of callbacks that has to be properly deserialized), and those messages could be repeated. This does not happen in most other cases, and we can likely optimize it away in the future, but I wanted to see the "worst" scenario.

For the remote client (still with a local DB for the Internal API server) I got between 30% and 80% slow-down, but this was on my WiFi network. I am sure it will be much better on a "wired" network; also, when a remote DB gets into the picture, the overall percentage overhead will be much smaller. We knew this would be a "slower" solution, but this is the price for someone who wants security and isolation.

No tests were performed with SSL, but this will be a modest increase. The "impact" of 10% is IMHO enough to decide that we cannot go the "Internal API, remote-only" way: the path where we continue using direct DB access should stay, and the POC implementation shows how to do it.

Implementation and maintainability


In the PR you cabn see  implementation details and how it will impact our code in general. The proposal is rather elegant and easy to maintain, and likely we can improve it with decorators later, the proposal focuses more on explicitness and showing the mechanisms involved.

Seems that it is easy to implement and it does seem to have some good "easy maintainability" properties.

It boils down to few rules:

  • For all the data and "Messages" we send, we have a nice "Internal API" defined in .proto files, with protobuf objects generated from them.

The "API specification" nicely defines the structures we are going to send over the network. It is very standard and has really good modern support. For one, we automatically (with pre-commit) generate MyPy type stubs for the generated classes, which makes it super easy both to implement the mapping and to verify its correctness automatically. MyPy nicely catches all kinds of mistakes you can make, and autocomplete for the proto-generated classes works like a charm. During the POC implementation, the mistake detection provided by MyPy made the work way faster (and avoided a lot of errors) than it would have been without it.

  • All Airflow objects that we send over the wire should get from_protobuf/to_protobuf (or fromJSON/toJSON) methods (these could likely be separated from the object, especially if we decide to support more than one transport protocol).

Those are usually very simple (just passing string/int/boolean fields to the constructor), and the structures we pass are rather small (see the PR). A few more complex cases (with more complex serialization) were also implemented as part of the POC, and the result is easy, readable and pretty well integrated into our code. It introduces a little duplication here and there, but those objects change rarely (only when we implement big features like Dynamic Task Mapping) and MyPy should guard us against any mishaps there (now that our code is mostly typed).

  • We make all "DB Aware" objects also "Serialization aware". The number of changes in the actual code/logic is rather small and should be easy to maintain.

There are basically two changes to implement for any method in the above inventory:

    1) For any of the objects that are used for database operations (in the POC PR this is DagFileProcessor), we use a global configuration flag to decide how to run it. When remote communication is used, we pass the serialized objects via the channel used for communication.
    2) The initial method has to be renamed, and the original method has to be replaced with a small snippet that makes either a direct-DB or a remote call (see above).
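
The two changes above can be sketched in a self-contained way. This is only an illustration under stated assumptions, not the code from the PR: the `settings` object, the `FakeChannel` transport, the `Processor` class and the `CallbackInfo` message are all hypothetical stand-ins, and JSON is used instead of protobuf to keep the example free of dependencies.

```python
import json
from dataclasses import asdict, dataclass


class Settings:
    """Hypothetical stand-in for the global configuration flag."""

    database_access_isolation = False


settings = Settings()


@dataclass
class CallbackInfo:
    """Hypothetical message; real messages would be generated from .proto files."""

    dag_id: str
    run_id: str
    is_failure: bool

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, payload: str) -> "CallbackInfo":
        return cls(**json.loads(payload))


class FakeChannel:
    """Hypothetical transport: deserializes the payload and runs the DB-side method."""

    def call(self, method_name: str, payload: str) -> str:
        request = CallbackInfo.from_json(payload)
        return getattr(Processor(), f"{method_name}_db")(request)


class Processor:
    channel = FakeChannel()

    def process_callback(self, info: CallbackInfo) -> str:
        # Small snippet replacing the original method: direct-DB or remote call.
        if settings.database_access_isolation:
            return self.channel.call("process_callback", info.to_json())
        return self.process_callback_db(info)

    def process_callback_db(self, info: CallbackInfo) -> str:
        # Renamed original method; this is where the database would be accessed.
        return f"processed {info.dag_id}/{info.run_id} failure={info.is_failure}"
```

Callers keep using `process_callback` in both modes; only the flag decides whether the serialized request travels through the channel.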

Deployment options

Option 1: No isolation (today):

[core] database_access_isolation=False (default)

All components have direct access to the Airflow database as they do right now. No change/impact on Airflow execution.



Option 2: Internal API as part of Web server

This mode requires setting [core] database_access_isolation=True and [core] database_api_host=<db_api_url>, as well as passing the additional parameter --with-internal-api to the airflow webserver command.

In this mode the Internal API is executed as part of the Airflow API embedded into the web server, which saves resources in small installations.



Option 3: Internal API as external component

This mode requires setting [core] database_access_isolation=True and [core] database_api_host=<db_api_url>, as well as starting an additional component with `airflow internal-api`.

In this mode the Internal API is executed as a standalone component, using the same code as the Web Server API but exposing only some of the endpoints (those related to the Internal API). This mode allows scaling it independently (e.g. on Kubernetes using HPA).



Alternative approach: DBSession class extending sqlalchemy.Session

To make the transition transparent for users, I proposed implementing the changes at the lowest possible level: the SQLAlchemy Session.

Sessions are created in settings and are then used by the create_session method, which in turn is used to make queries to the database with SQLAlchemy, e.g.:

Code Block
languagepy
(
  session.query(DagRun)
  .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
  .first()
)


To support DBIsolation mode without any change in the user code, the Session object must be extended. This is done by providing a subclass, DBSession.
It supports both "standard" and "DBIsolation" modes by overriding the Session class methods. An example for query (just a code snippet):

Code Block
languagepy
class DBSession(Session):

  def __init__(
      self,
      # Session parameters
      bind=None,
      autoflush=True,
      autocommit=False,
      expire_on_commit=True,
      info=None,
      # New parameter
      db_isolation_mode=False):

    super().__init__(
      bind=bind,
      autoflush=autoflush,
      expire_on_commit=expire_on_commit,
      autocommit=autocommit,
      info=info)

    self.db_isolation_mode = db_isolation_mode
    self.db_api_client = None
    if db_isolation_mode:
      # Client for Airflow Database API
      self.db_api_client = DbApiClient()

  def query(self, *entities, **kwargs):
    if not self.db_isolation_mode:
      return super().query(*entities, **kwargs)
    return DbApiClientQuery(entities=entities, db_api_client=self.db_api_client)


All other public methods must also be overridden in DBSession and either call proper super class methods or provide "db_isolation_mode" implementation - even if it's just raising an exception.

If db_isolation_mode is False, DBSession works exactly the same way as Session.

If db_isolation_mode is True, query returns a new object type, DbApiClientQuery, which sends requests to the Airflow Database API instead of the SQL database:

Code Block
languagepy
from enum import Enum

from airflow.models import DagRun


class DbApiClientQuery:

  def __init__(self, entities, db_api_client):
    self._entities = entities
    self._db_api_client = db_api_client
    self._filters = []

  def filter(self, *criterion):
    # Only simple "column == value" conditions are supported.
    for c in criterion:
      column_id = c.left.description
      value = c.right.effective_value
      self._filters.append((column_id, value))
    return self

  def all(self):
    return self._call_api_query_method(DbQueryMode.ALL)

  def first(self):
    return self._call_api_query_method(DbQueryMode.FIRST)

  def one_or_none(self):
    return self._call_api_query_method(DbQueryMode.ONE_OR_NONE)

  def _call_api_query_method(self, query_mode):
    # Collected filters become keyword arguments of the client method.
    kwargs = dict(self._filters)
    method = self._get_query_method_name()
    return getattr(self._db_api_client, method)(query_mode=query_mode, **kwargs)

  def _get_query_method_name(self):
    # Entities are model classes, so compare against the class itself.
    if self._entities[0] is DagRun:
      return 'get_dag_runs'
    raise NotImplementedError()


class DbQueryMode(Enum):
    ALL = 'all'
    FIRST = 'first'
    ONE_OR_NONE = 'one_or_none'


Note: This assumes the filter contains only simple conditions combined with the AND operator.

DbApiClient must have a get_dag_runs method, which calls the proper Airflow Database API method with the provided optional parameters (as query strings).
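
As a rough sketch of what such a client method could look like: the endpoint path `/dag_runs`, the constructor arguments and the injectable `send` hook below are assumptions made for illustration and are not taken from the PR. The sketch only shows how optional filters could be turned into a query string.

```python
from urllib.parse import urlencode


class DbApiClient:
    """Hypothetical client; the endpoint path and the `send` hook are assumptions."""

    def __init__(self, base_url, send=None):
        self._base_url = base_url.rstrip("/")
        # `send` would perform the HTTP GET; it is injectable so that the
        # sketch runs without a server. The default just echoes the URL.
        self._send = send or (lambda url: {"url": url})

    def get_dag_runs(self, query_mode, **filters):
        # Accept either a plain string or an Enum member such as DbQueryMode.FIRST.
        mode = getattr(query_mode, "value", query_mode)
        params = {"query_mode": mode}
        # Filters that were not provided (None) are left out of the query string.
        params.update({k: v for k, v in filters.items() if v is not None})
        return self._send(f"{self._base_url}/dag_runs?{urlencode(params)}")
```

With this shape, DbApiClientQuery can pass the collected filters straight through as keyword arguments.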

DB Data mutations 

Mutations are implemented similarly to query, by overriding "delete" or "update" and mapping the request to the proper Airflow Database API method.

For "add", the session sends the serialized object to the Airflow Database API, where it is deserialized and added to the DB.
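
The "add" path could look roughly like the sketch below. Everything in it is hypothetical: `RecordingClient` stands in for DbApiClient, `IsolatedSession` shows only the isolation-mode branch of a session, `Note` is a toy entity, and `vars()` plus JSON replaces whatever serialization the real objects would provide.

```python
import json


class Note:
    """Toy entity used only for this illustration."""

    def __init__(self, dag_id, text):
        self.dag_id = dag_id
        self.text = text


class RecordingClient:
    """Hypothetical stand-in for DbApiClient that records what would be sent."""

    def __init__(self):
        self.sent = []

    def add(self, entity_name, payload):
        self.sent.append((entity_name, payload))


class IsolatedSession:
    """Hypothetical, simplified session showing only the isolation-mode add path."""

    def __init__(self, db_api_client):
        self.db_api_client = db_api_client

    def add(self, instance):
        # Serialize public attributes; a real implementation would rely on the
        # object's own serialization methods instead of vars().
        public = {k: v for k, v in vars(instance).items() if not k.startswith("_")}
        self.db_api_client.add(type(instance).__name__, json.dumps(public, sort_keys=True))


def deserialize(entity_cls, payload):
    # Server side: rebuild the object before adding it to the real DB session.
    return entity_cls(**json.loads(payload))
```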

Complex statements

Not all statements can be covered with the approach described above, e.g. in processor.py there is:

Code Block
languagepy
qry = (
  session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
  .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
  .filter(TI.dag_id == dag.dag_id)
  .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
  .filter(TI.task_id.in_(dag.task_ids))
  .group_by(TI.task_id)
  .subquery('sq')
)

max_tis: List[TI] = (
  session.query(TI)
  .filter(
      TI.dag_id == dag.dag_id,
      TI.task_id == qry.c.task_id,
      TI.execution_date == qry.c.max_ti,
  )
  .all()
)

Instead of covering all cases in the DBSession filter method, the DBSession class has a dedicated method for this case.

The code in processor.py simply calls this method, and its implementation covers both cases: DBDirect mode (using SQLAlchemy and a direct database call) and DBIsolation mode, which calls a dedicated Airflow Database API endpoint (which in turn calls the same DBSession method, but in DBDirect mode).

e.g.


Code Block
languagepy
class DBSession(Session):
  ...
  def get_max_tis(self, dag_id, task_ids):
    if self.db_isolation_mode:
      # Delegate to the dedicated Airflow Database API endpoint.
      return self.db_api_client.get_max_tis(dag_id, task_ids)
    # DBDirect mode: run the original query against the database.
    qry = (
      self.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
      .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
      .filter(TI.dag_id == dag_id)
      .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
      .filter(TI.task_id.in_(task_ids))
      .group_by(TI.task_id)
      .subquery('sq')
    )
    return (
      self.query(TI)
      .filter(
        TI.dag_id == dag_id,
        TI.task_id == qry.c.task_id,
        TI.execution_date == qry.c.max_ti,
      )
      .all()
    )




Configuration

Together with the Airflow Database API, the following configuration settings are introduced:

  • [core] database_access_isolation

Whether DBIsolation mode is enabled and untrusted components should use the Airflow Database API to access the Airflow database.

  • [core] database_api_host

URL of the Airflow Database API. Used only if [core] database_access_isolation is True.
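
To illustrate how the two settings interact, here is a small sketch that parses them with Python's standard configparser. It does not use Airflow's own configuration code; the function name `resolve_db_access`, the sample host and the returned tuples are assumptions made for this example.

```python
from configparser import ConfigParser

# Sample configuration; the host value is a placeholder.
SAMPLE_CFG = """
[core]
database_access_isolation = True
database_api_host = http://internal-api.example.com:8080
"""


def resolve_db_access(cfg_text):
    """Return ("direct", None) or ("isolated", <api url>) for the given config text."""
    parser = ConfigParser()
    parser.read_string(cfg_text)
    isolated = parser.getboolean("core", "database_access_isolation", fallback=False)
    if not isolated:
        # database_api_host is ignored when isolation is off (the default).
        return ("direct", None)
    host = parser.get("core", "database_api_host", fallback=None)
    if not host:
        raise ValueError("database_api_host must be set when isolation is enabled")
    return ("isolated", host)
```

Failing early when the host is missing is a design choice of this sketch; the proposal itself does not specify the behaviour for that case.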

Dag processor, worker and triggerer

All tests for DagProcessor, Workers (including Operators) and Triggerer are executed in both modes, "DBDirect" and "DBIsolation". The code for the Airflow Database API, the DbApiClient and DBSession is extended until all tests pass.
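
Running one test in both modes can be sketched with the standard unittest module. The `FakeSettings` class and the `load_dag_ids` function are hypothetical placeholders for real Airflow code; the point is only that the same assertion is checked once per mode.

```python
import unittest


class FakeSettings:
    """Hypothetical stand-in for the configuration flag."""

    database_access_isolation = False


def load_dag_ids(settings):
    """Hypothetical function under test; both branches must return the same data."""
    if settings.database_access_isolation:
        # Would go through the Airflow Database API.
        return sorted(["b", "a"])
    # Would query the database directly.
    return sorted(["a", "b"])


class BothModesTest(unittest.TestCase):
    def test_load_dag_ids(self):
        for isolation in (False, True):
            # subTest reports each mode separately if one of them fails.
            with self.subTest(database_access_isolation=isolation):
                settings = FakeSettings()
                settings.database_access_isolation = isolation
                self.assertEqual(load_dag_ids(settings), ["a", "b"])
```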

Why is it needed?

This change allows Workers, DagProcessors and the Triggerer to work in untrusted mode, without direct access to the database.

Which users are affected by the change? 

Only those that set [core]database_access_isolation=True

How are users affected by the change? (e.g. DB upgrade required?)

What defines this AIP as "done"?

It is possible to start the Airflow Database API from the Airflow CLI, and it is easily extensible with new methods. Workers, DagProcessors and the Triggerer can work in DBIsolation mode, without direct access to the DB.