Status

StateIn Progress / Under Development.

Replaced by AIP-72 for AF 3, and we can ship what we have in main for 2.10 as "experimental" and stated to change for AF 3
Discussion Threadhttps://lists.apache.org/thread/nsmo339m618kjzsdkwq83z8omrt08zh3
Voting Threadhttps://lists.apache.org/thread/dybm8mkxpllghg820swb9pcdbs3hsto8
PRhttps://github.com/apache/airflow/pull/27892
Created

$action.dateFormatter.formatGivenString("yyyy-MM-dd", $content.getCreationDate())

AuthorsJarek Potiuk, Mateusz Henc

Motivation

This is part of AIP-1, which aims to run Airflow in multi-tenant way. The way to achieve that is splitting the Airflow components into "trusted" and "untrusted" , which allows to put security boundaries between them. "Untrusted" components could then executed in DBIsolation mode, which disables direct Database access, making it possible only through Airflow Database API.

Airflow Database APi is a new independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to DB.

Note: Access control to Airflow Database API & Airflow Database rows is out of scope for this AIP, but it was kept in mind, making the implementation feasible.

Proposal

Internal API architecture

Airflow Internal API is logically a new API, independent of existing User-facing API embedded into Airflow Webserver:

  • They have different purposes: user-faced vs internal
  • They have different access model - end user credentials(roles) vs task-based access
  • They are different styles of APIS: REST vs. RPC-like (HTTP Based) 
  • The REST API is user facing and it is API-first (code reflects the API specification) where in case of Internal API the API should be code-first (the API reflects methods to be called)

The way how we approach implementation is still to be decided at the POC stage but we want to seamlessly integrate authentication between the two APIs. It should be possible to authenticate with both APIs using the same mechanism, and it should be possible to expose both APIs by the same web server .

The REST API and Internal API could be exposed together in a single process (webserver) but optionally they could be exposed separately in different processes. The Internal/REST API  separation can be introduced in two ways:

  • Internal APIs exposed together with REST API via Webserver with RBAC access control
    • We will have a separate “Internal API Role”, and “Internal API user” within the access model  that we use from FAB. You can only authenticate with the user, using the “Internal” authentication method: based on tokens 
    • The authorization of Workers should be based on validity of JWT tokens  generated  when Scheduler.  Dag Processor and Triggerer will have a secret that will be available only to them, not to the user code they run (see below the section on “Securing  access to authorization token in Dag Processor and Triggerer).The user code in the  Worker should not have access to any long term secret - they should only use limited-time valid JWT Token. 
    • This “Internal API Role” will have only permissions that will allow it to execute the Internal APIs (RPC-styled) that are needed to provide user-code with current functionality (Connections/Variables etc.) in a transparent way. Selected (possibly even all) other API methods from existing REST API that should be exposed to the Workers, Dag Processor and Triggerer. We will make an inventory and make a decision on  a “default” permissions for REST APIs during the implementation. This will be done using the same RBAC controls that we use in current REST API implementation.
  •   Physical separation of the components exposing both APIs:
    • The “Internal API” optionally can also be exposed by a component separate from the current webserver.
    • We also propose to implement a separate, optional “airflow internal-api” component, that could expose only the APIs needed by Workers, DAGProcessor and Triggerer. This component can be run in a separate “security zone”, allowing to separate access to the “Internal API” on a physical/networking level 

The Internal API calls are “RPC”-style APIs rather than CRUD APIS so they do not map into the REST  semantics. The Internal endpoints do not need to be published in the API specification so we plan to generate the APIs based on the code (decorated functions). As the next step (implementing POC) we will determine if we should generate the endpoints from the code via completely custom code, or whether we will be able to leverage existing libraries and tools (for example Fast API).. 

Later phase of “Umbrella” AIP-1 implementation (not implemented in this AIP) is to implement task-based authorization (each task should have access to a subset of operations depending on dag_id, task id, origin, etc.). While this AIP does not address it, it makes it possible. 

In the future, the JWT token can be used to carry additional (signed) payload which will be used to selectively authorize the caller to specific entities (connections, variables, specific tasks or dags only etc.)

No access to Metadata DB for “User code” in DagProcessor and Triggere

Currently, SQLAlchemy Models of Airflow are available also for the top-level user code from DAGs and callbacks. Essentially users can access Airflow Variables, Connections and other DB models while the DAG code is parsed or when callbacks are executed. This is however considered a bad practice for Top Level code, also using DB access for callbacks (TODO discuss it) is not recommended practice. It interferes with the parsing process and it can be replaced by other mechanisms. 

Similarly any DB acces in User Code in Triggerer is a very bad practice, because the database of Airflow is not available for async.io operations. Async.io SQLAlchemy queries are Beta functionality and only work with the asynchronous Postgres, experimental driver, so for all practical purposes any Trigger user-provided code should not access the metadata DB.

The proposal for the DB Isolation mode is that the Top-Level Python code and callbacks executed by the DAGProcessor are not allowed to access Airflow Database at all. 

This could be achieved by not exposing an SQLAlchemy session to the process that executes DAG Parsing and callbacks. While this does not provide complete isolation for malicious users (DAG Parsing/Callbacks will be executed as sub-processes of DAGProcessor and full sandboxing of the parsing process is out of the scope for now), this could be a handy mechanism to improve performance and enforce best practices for DAG Top-Level Process parsing and callbacks.

This also means that all the Community operators should not  use any DB operations in their constructors. This is similarly considered as bad practice, and enforcing that should be great side effect of introducing the DB Isolation mode. As part of the implementation, all such operators that are currently using DB operations will be identified and fixed.

Securing access to authorization token in DagProcessor and Triggerer

Another (security related) consequence of “no-access” for the user code by DagProcessor and Triggerer is that the user code run by DagProcessor and Triggerer should have no access to the authentication information to Internal API. Both components will have to have a long-living token that will allow them to authenticate and later stage authorize to run selected internal API methods, and this token should not be made available to user code. By default - in both cases - executing of the user code happens in a separate process and the token saved in memory might be removed while forking or not passed when the processes are spawned, however the token (or at least token that allows the DagProcessor and Triggerer to retrieve the actual token to use)  must be stored locally on the server where DagProcessor and Triggerer are run - to handle cases like restarts without manual intervention. That puts special requirements on the parsing and event loop processes started by DagProcessor and Triggerer respectively. The following options should be implemented (TODO: discuss this):

  • The configuration file where the token (or configuration of long-living token  to retrieve it) should not be “world readable” and it should be readable only by the user running DagProcessor/Trigger. In the most “strict” mode, we can perform the check at the startup and refuse Airflow to start if the configuration file has wrong permissions
  • Parsing/Event loop processes might be started as “no-user” processes . The consequence of this is that DagProcessor and Triggerer should be run in impersonation mode, where the user running Airflow should have “sudo” capability. This provides pretty good isolation, however requires careful configuration of the deployment in order to fully secure the authentication/authorization information.
  • The Parsing/Even loop processes might be run as containers (there are separate discussions about it related to AIP-46 Runtime isolation for airflow tasks and dag parsing where the configuration of Airflow will not be mapped into the container. That provides greater isolation and security level and should only be considered in certain deployments where Docker engine is available

Test harness

As part of the implementation, we want to implement a test harness tapping into the existing CI/test environment of Apache Airflow, where the above assumptions can be automatically verified for all the community provided code:


We will run all CI Test Suite in “DB Isolation Mode” - where the unit tests will get a modified SQLAlchemy Session that will catch incorrect uses of the Database:

  • Using disallowed DB operations in DagProcessor
  • Using any DB access in any of the example DAG’s top-level code
  • Using any DB Access in __init__() methods of any of the Community Operators
  • Using disallowed models in the Community Hooks
  • Using disallowed DB operations in "execute()" calls of the operators
  • Testing scenarios in which transaction succeeds but the connection problems cause the "success" is not propagated to client

Internal API inventory

The Internal API serves three components: Dag Processor, Workers, Triggerer. This table shows an inventory of impacted methods, their applicability and our approach for the methods.

Note that Triggerer, asynchronous, user provided code, almost by definition should not perform any DB calls synchronously (and this is the only possibility to access Airflow Code) . This is considered as bad practice and we believe not allowing Airflow DB access (and not providing the equivalent API)  from the user provided code by the triggerer might improve the performance and characteristics of Triggerer. Any attempt to use an SQLAlchemy/ direct DB access by the user-provided code should raise an exception - to be verified during POC.

User code method inventory

Those methods are available to User code that runs in Workers as part of task execution. We can easily utilize existing REST API to retrieve those. Additionally users will have access to pre-configured Python Client (installed as Airflow Dependency) that will enable Operator developers and DAG writers to access all the exposed REST APIs of airflow in a straightforward way. Note that users will also have to access config which is not a DB access, however access to Config should also be done via REST API - this allows to secure access to Secret Backends - no user code will have access to the secret which is used to authenticate with Secret Backends.

Case

Proposed approach 

Connection

Only READ-ONLY BaseHook.get_connection() will be implemented to transparently read Connection from Database or from Secrets. Secret access will also be done using the Internal API Server. Currently the connection performs a database roundtrip from all components. Replacing it with REST  API call will make it slightly slower but it will also leverage SQLAlchemy Connection Pool by the Internal API Server. Connection retrieval is infrequent operation in general

Variable

READ-WRITE Variable.get/set/update/clear  methods will be implemented transparently using APIs. Similar performance consideration as in case of Connection.

XCom

READ-WRITE all of the operation (get*/clear*) will be implemented to transparently use Internal API, Custom XCom Backend access should be done directly from the “calling entity” however and we should provide a mechanism of passing temporary credentials from the XCom Backends so that the workers can store/retrieve XCom data directly without going through the API server. This can be implemented for example using pre-signed URLS or other mechanisms where temporary credentials to access the data are passed from the server to the worker.

Other REST APIs

The tasks running in workers will also have access to the Airflow Python Client that will use the “Task” JWT token for authentication and be configured to talk to the Airflow Webserver automatically. This will provide easy access to the functionality exposed by Airflow via API to Operator developers and DAG writers, who will have standardized access to Airflow via the client.

Non-DB code that should also be isolated (because of Secret Backends)

Config

READ-ONLY access to retrieve configuration Secret Backend access will only be done on the Internal API server. Option: Until selective authorization is implemented we might allow direct access from Workers/DagProcessor for scalability reasons even if full security might not be achievable. In this case secret credentials might leak to user code.  Also, in most “strict” deployment (see above discussion about the access of user code to authentication token), the configuration file should not be made available to the user code directly. 

Internal API calls

Those methods are not necessarily going to be mapped 1-1 into the actual API calls. It is possible that several methods might be joined together for efficiency (with some possible duplications resulting if methods are used in several places). It is important however that each of the methods constitutes a complete transaction (or several transactions), because going through APIs will make it extremely difficult to maintain transactional integrity across multiple methods.

Dag Processor Airflow Code

Function/Method

Proposed approach 

manage_slas

This method should be exposed as a separate Internal API call (in DB isolation mode). It is essentially a single, independent  transaction which also covers sending notifications (note - in case we implement custom notification engines, the code of those will be executed in the context of the Internal API server. .

update_import_errors

This is also a single-session method that can be exposed and executed fully by the Internal API server.

execute_callbacks

This method should stay within the DAGProcessor  but it should use Internal API to retrieve appropriate models (which will be API objects rather than SQLAlchemy objects): DagRun, TaskInstances, Template_context - via current Public API (extensions needed). We might need to optimize/update some of the APIs to get missing information and achieve best performance by batching retrieval of only those objects that are needed.

deactivate_missing_dags

This should be a separate, new Internal API method call that should perform a single transaction. Note that currently commit is not performed for that method, but it should  have no adverse effect to make this separate transaction.

update_import_errors

This should be a separate, new internal API method call that should perform a single transaction. Similarly as above - currently this is done in the same transaction but, there should be no adverse effects of making this separate transaction

save_dag_to_db

This is a new, “heaviest” method that should be exposed by the internal API to the DAG processor. We should be able to serialize all the information stored currently in the DagBag and send it to the Internal API server. The method should be roughly equivalent to: sync_to_db and pickling dags if set. This should be done in a single transaction as a single API call.

Triggerer Airflow Code

Case

Proposed approach 

Trigger Model

Similarly as in case of the User-Code, the Trigger class should be updated to transparently handle calls to it: bulk_fetch, clean_used, assign_unassigned, submit_event, submit_failure, ids_for_triggerer. All those methods are currently single transaction calls and can be easily mapped into API calls with serialized information. Performance wise it exchanges HTTP REST calls with a DB call which yields some unavoidable performance penalty. However only submit_failure and sumbit_event will have visible impact due to their “non-batch” nature. In case this constitutes a problem we might consider a mechanism where we also accumulate and batch those events into batch calls.


Worker Airflow Code

Case

Proposed approach 

refresh_taskinstance_from_db

Single transaction retrieving data from the DB - part of internal API..

update_task_instance

There are a few places where tasks are saved to the db in a separate transaction after task values are updated. (host/job_id). We will implement a new API endpoint for it.

defer_task

We should implement an internal API call that would create a trigger entry and update the task state. This method is a single-transaction method so it should have limited performance impact - Internal API can utilise connection pooling much better than Workers.

get_template_context

We should implement a separate Internal API call that retrieves context for the task. All fields in context should be serializable (or we can make them so) and performance of this method should not be heavily impacted as we exchange a DB call with an API call from the point of view of Worker and Internal API implementation can utilize connection pooling much better.

handle_reschedule

We should implement a separate, Internal API call that implements adding task reschedule via DB. This is a single transaction so it should be easy to replace it with a single API call.

handle_failure

Similarly as above, we should implement a single API call as this method is always followed by a commit.

add_log

The message should add a log model entry as a separate call. Similar considerations as above.

update_state

Updates task instance state. Also separate methods od in the API. There is rather complex logic in the method and it requires to load a partial subset of the tasks which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode. - similar considerations. One watchout as there is an option to execute callbacks while running this function. Rather than executing them in this case we should return a list of callbacks to execute and execute them in the context of the worker. They will be executed after the transaction is complete and there is a small risk that callbacks would be lost in this case, however the risk is rather minimale. Option: We could consider options to store the callbacks in the database instead and delegate execution of those to DagProcessor. 

run_mini_scheduler_on_child_tasks

This should be a separate method in the API. There is rather complex logic in the method and it requires to load a partial subset of the tasks which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode.

Implementation notes: Internal API component

Airflow Internal API provides methods only for "untrusted" components (Worker/Dag Processor/Triggerer), as Scheduler will use only direct DB access.

While the APIs could be exposed also via webserver, we propose to add a new, optional, stateless (thus horizontally scalable) component that can be started using a new command line command: airflow internal-api.

Code is located in airflow.api_internal package, with all endpoint's methods available via in /api_internal/ endpoint. 

As mentioned above the “Internal API” is not an “API-first” but “Code first”. The implementation details on how we map particular methods into endpoints is going to be worked out in detail at the POC that follows the AIP, but we attempt to approach the implementation in the way that will minimize impact on the existing code (at most slight refactoring and splitting existing methods into several or joining them). All the methods exposed via API will have to be adapted to make their parameters and return values serializable (so in some cases we will have to perform conversion between the objects that are big/not serializable into appropriate representation needed by the method (for example DagBag passed in a few methods above and example below should be turned in list of ids and parameters needed by the methods). All the methods involved will have to be converted into “functions” so that no instance objects are accessed.

Another important factor is future maintainability - in the implementation we will avoid code duplication by implementing decorators that will either use the locally implemented DB access or delegated automatically to a corresponding API call. The current “@provide_session” decorator will provide an “Assert” session in Workers or DagProcesor in the DB Isolation mode, so any attempt of accessing the DB from those will raise an exception.

In order to achieve maintainability, we want to make sure that all the communication - both in case of DB isolation mode and in case of "DB" mode for those methods is executed using  RPC (Remote Procedure Calls). Even in case of "no isolation mode", the methods from the inventory above will always be executed via "internal RPC server". Modern RPC solutions provide a way to run a local server for RPC connections, which provide a much more performant version of the RPC methods - without authentication, security, communication overhead., while using the same code and mechanims to execute those.  

The short research of the RPC implementations available  suggest the following popular and proven implementations we could use:

  • XMLRPC: part of Python standard library 
  • gRPC: Google-originated Apache 2.0 licensed implemetnation (based on binary Protocol Buffers)
  • Apache Thrift : Facebook-originated (now Apache) RPC using binary Thrift protocol

Out of the protocols. XMLRPC is quite simple and has quite a big (XML-induced) overhead for serializing/deserializing/communication, where both gRPC and Thrift use efficient, binary, extendable protocols that introduce very low overhead on serializing/deserializing and communication. Whille we are not making the final decision yet, the firs step of the project will be to analyze, implement POC and perform necessary benchmarking to decide which of the two implementation (GRPC or Apache Thrift) to use. We are also reaching out to Apache Beam team who has experience with both protocols and can help us with the analysis they've done on the capabilities of those protocols in the past.

Both implementations are similar in the sense that we need to define description of the RPC methods we are going to have (either Protobuf or Thrift) and compiler will generate the Python client and server code. Both implementation have full support for HTTPS communication including various authentication options (extendable) and both can communicate via TCP/UnixSockets for local implementation. Apache Thrift has also "shared-memory" mode where the communication between "client" and "server" can be done over shared memory. 

During initial POC we performed a query among some experiences of other Apache Projects (ApacheBeam) and we chose gRPC . However we reserve the possibilty of swapping out the gRPC with pure JSON/Open API based approach. The POC will continue, while we are trying out and testing the concerns raised during the discussion - multi-threading vs. multiprocessing approach, loadbalancing and capabilities of monitoring such requests have been raised - see https://lists.apache.org/thread/1pw2ohdyjtw1nj7ckyofq70sv8onxqdd. The "Internal API" abstraction will allow to swap them out and we might even decide to keep both options if we find they fare better in different scenarios.

Below example show how we could like define "process_file" method with GRPC protobuf definition (this is a small part of the final definition needed and not neccessary compiles but should give an idea). You can also see the POC PR implementing gRPC method calls used for performance testing

The example is updated with "clearer" approach proposed by Ash Berlin-Taylor  to add an extra level of "Internal Client" Abstraction and base the decision on global settings rather than object's method (see https://lists.apache.org/thread/4wmftmjfqhwc4h6r87tn636lkttj1hzo)

Original method
    @provide_session
    def process_file(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
        session: Session = None,
) -> Tuple[int, int]:
        self.log.info("Processing file %s for tasks to queue", file_path)

        try:
            dagbag = DagBag(file_path, include_examples=False, \
                            include_smart_sensor=False)
        except Exception:
            self.log.exception("Failed at reloading the DAG file %s", \
                              file_path)
            Stats.incr('dag_file_refresh_error', 1, 1)
            return 0, 0

        self._deactivate_missing_dags(session, dagbag, file_path)
        ...

    @provide_session
    def _deactivate_missing_dags(self, session, dagbag, file_path):
        ...
        self.log(....)
        ...
GRpc Proto definition
syntax = "proto3";

package airflow.internal_api;

message TaskInstanceKey {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  int32 try_number = 4;
  int32 map_index = 5;
}

message SimpleTaskInstance {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  optional float start_date = 4;
  optional float end_date = 5;
  int32 try_number = 6;
  int32 map_index = 7;
  string state = 8;
  bytes executor_config = 9;
  string pool = 10;
  string queue = 11;
  TaskInstanceKey key = 12;
  optional string run_as_user = 13;
  optional int32 priority_weight = 14;
}

message TaskCallbackRequest {
  string full_filepath = 1;
  SimpleTaskInstance task_instance = 2;
  optional bool is_failure_callback = 3;
  optional string message = 4;
}

message DagCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  string run_id = 3;
  optional bool is_failure_callback = 4;
  optional string message = 5;
}

message SlaCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  optional string message = 3;
}

message Callback {
  oneof callback_type {
      TaskCallbackRequest task_request = 1;
      DagCallbackRequest dag_request = 2;
      SlaCallbackRequest sla_request = 3;
   }
}

message FileProcessorRequest{
  string path = 1;
  repeated Callback callbacks = 2;
  bool pickle_dags = 3;
}

message FileProcessorResponse {
  int64 dagsFound = 1;
  int64 errorsFound = 2;
}

service FileProcessorService {
  rpc processFile (FileProcessorRequest) returns (FileProcessorResponse) {}
} 
New method with optional GRPC path
    def process_file_grpc(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
    ) -> Tuple[int, int]:
        request = internal_api_pb2.FileProcessorRequest(path=file_path, pickle_dags=pickle_dags)
        for callback_request in callback_requests:
            request.callbacks.append(callback_request.to_protobuf())
        res = self.stub.processFile(request)
        return res.dagsFound, res.errorsFound

    def process_file(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
    ) -> Tuple[int, int]:
		if settings.DATABASE_ACCESS_ISOLATION:
            return InternalAPIClient.dagbag_process_file(
                file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
            )
        return self.process_file_db(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )

Performance impact


The performance impact is visible (as expected). It's quite acceptable for a distributed environment (in exchange for security), but it's likely significant enough to stay with the original idea of having Airflow work in two modes: a) secure - with RPC apis, b) standard - with direct DB calls.

On "localhost" with a local DB the performance overhead for serializing and transporting the messages between two different processes introduced up to 10% overhead. I saw an increase of time to run 500 scans of all our example folders dags going from ~290 to ~320s pretty consistently. Enough of a difference to exclude volatility. I tested it in Docker on both ARM and AMD. It seems that in some cases that can be partially  offset by slightly increased parallelism on multi-processing machines run on "bare" metal. I got consistently just 2-3% increase on my linux without docker with the same test harness, but we cannot rely on such configurations, I think focusing on Docker-based installation without any special assumptions on how your networking/localhost is implemented is something we should treat as a baseline.

The same experiment repeated with 50 messages but of much bigger size (300 Callbacks passed per message) have shown 30% performance drop. This is significant, but I believe most of our messages will be much smaller. Also the messages I chose were very special - because in case of Callback we are doing rather sophisticated serialization, because we have to account for Kubernetes objects that are potentially not serializable, so the test involved 300 messages that had to be not only "on the wire" serialized but also some parts of them had to be Airflow-JSON serialized and the whole "executor config" had to be picked/unpickled (for 100 such messages out of 300). Also we have some conditional processing there (there is a union of three types of callbacks that has to be properly deserialized). And those messages could be repeated. This is not happening for most other cases. And we can likely optimize it away in the future. but I wanted to see the "worst" scenario.

For the remote client (still local DB for the internal API server) I got between 30% and 80% slow-down, but this was on my WiFi network. I am sure with a "wired" network, it will be much better, also when a remote DB gets into picture the overall percentage overhead will be much smaller. We knew this would be a "slower" solution but this is the price for someone who wants to have security and isolation.

No tests were performed with  SSL, but this will be a modest increase. The "impact" of 10% is IMHO enough to make a decision that we cannot get "Internal but remote-only way" - the path where we will continue using Direct DB access should stay and implementation of POC shows how to do it.

Implementation and maintainability


In the PR you cabn see  implementation details and how it will impact our code in general. The proposal is rather elegant and easy to maintain, and likely we can improve it with decorators later, the proposal focuses more on explicitness and showing the mechanisms involved.

Seems that it is easy to implement and it does seem to have some good "easy maintainability" properties.

It boils down to few rules:

  • For all the data and "Messages" we sent, we have a nice "Internal API" defined in .proto and protobuf objects generated out of that.

The "API specification" nicely defines the structures we are going to send over the network. It's very standard, it has a really good modern support. For one, we automatically (with pre-commit) generate MyPY type stubs for the generated classes and it makes it super easy to both - implement the mapping and automatically verify its correctness. Mypy nicely catches all kinds of mistakes you can make! Autocomplete for the PROTO-generated classes works like a charm. During the POC implementation nice detection of mistakes provided by MyPy guided implementation to be possible (avoiding a lot of errors) way faster than without it.

  • All Airflow objects that we get to send over the wire should get from_protobuf/to_protobuf (or fromJSON/toJSON( methods (this could likely be separated from the object, especially if we decide on supporting more than one transport protocol).

Those are usually very simple (just passing strings/ints/boolean fields to the constructor), and the structures we pass are rather small (see the PR). Also few more complex cases (with more complex serialization) were implemented as part of the POC and it is easy, readable and pretty well integrated into our code. It introduces a little duplication here and there, but those objects change rarely (only when we implement big features like Dynamic Task mapping) and MyPy should guards us against any mishaps there (now that our code is mostly typed).

  • We make all "DB Aware" objects also "Serializaation aware". The number of changes in the actual code/logic is rather small and should be easy to maintain.

There are basically two changes to implement for any method in the above inventory:

    1) For any of the objects that are used for database operations (in the POC PR this is DagFileProcessor) we need to use a global configuration flag to decide how to run it and in case remote communication is used, we pass the serialized objects via the channel that will be used for communication. 
    2) The initial method has to be renamed and original method has to be replaced with small snipped to make a direct-DB or Remote call (see above)

Deployment options

Option 1: No isolation (today):

[core]database_access_isolation=False (default)

All components have direct access to the Airflow database as they do right now. No change/impact on Airflow execution.


Option 2: Internal API as part of Web server

This mode requires setting [core]database_access_isolation=True and [core] database_api_host=<db_api_url> as well as passing additional parameter --with-internal-api to the airflow webserver command.

In this mode Internal API is executed as part of Airflow API embedded into the web server, which allows saving resources in small installations.


Option 3: Internal API as external component

This mode requires setting [core]database_access_isolation=True and [core] database_api_host=<db_api_url> as well as starting additional component with `airflow internal-api`

In this mode Internal API is executed as a standalone component, using the same code as Web Server API but exposing only parts of the endpoints (Internal-API related). This mode allows scaling independently (e.g. on Kubernetes using HPA)


Alternative approach: DbSession class extending sqlalchemy.Session

To make the transition transparent for users, I proposed to implement changes at the lowest possible level - SQLAlchemy Session

Sessions are created in settings and which are then used by create_session method. It is then used to make queries to the Database with SQL Alchemy, e.g.:

session.query(DagRun)
  .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
  .first()


To make it support DbIsolation mode, without any change in the user code -  the Session object must be extended - and it is by providing a subclass DBSession
It supports both "standard" and "DBIsolation" modes, overriding the Session class methods, example for query: (just a code snippet)

class DBSession(Session):

  def __init__(
      self,
      # Session parameters
      bind=None,
      autoflush=True,
      autocommit=False,
      expire_on_commit=True,
      info=None,
     # New parameter
      db_isolation_mode=False):

    super().__init__(
      bind=bind,
      autoflush=autoflush,
      expire_on_commit=expire_on_commit,
      autocommit=autocommit,
      info=info)

    self.db_isolation_mode = db_isolation_mode
    self.db_api_client = None
    if(db_isolation_mode):
      # Client for Airflow Database API
      self.db_api_client = DbApiClient() 

  def query(self, *entities, **kwargs):
    if not self.db_isolation_mode:
      return super().query(*entities, **kwargs)
    return DbApiClientQuery(entities=entities, db_api_client=self.db_api_client)


All other public methods must also be overridden in DBSession and either call proper super class methods or provide "db_isolation_mode" implementation - even if it's just raising an exception.

If db_isolation_mode is False then the DbSession works exactly the same way as the Session.

In db_isolation_mode = True it returns new object type: DbApiClientQuery that sends requests to Airflow Database API instead of SQL database:

class DbApiClientQuery:

  def __init__(self, entities, db_api_client):
    self._entities=entities
    self._db_api_client = db_api_client
    self._filters=[]

  def filter(self, *criterion):
    for c in criterion:
      columnId= c.left.description
      value= c.right.effective_value
      self._filters.append((columnId,value))
    return self

  def all(self):
    return self._call_api_query_method(DbQueryMode.ALL)

  def first(self):
    return self._call_api_query_method(DbQueryMode.FIRST)

  def one_or_none(self):
    return self._call_api_query_method(DbQueryMode.ONE_OR_NONE)

  def _call_api_query_method(self, query_mode):
    kwargs={}
    for (columnId,value) in self._filters:
      kwargs[columnId]=value
    method = self._get_query_method_name()
    return getattr(self.db_api_client, method)(
      query_mode = query_mode,
      **kwargs)

  def _get_query_method_name(self):
    if self._entities[0]==DagRun.__name__:
      return 'get_dag_runs'
    raise NotImplementedError()

class DbQueryMode(Enum):
    ALL = 'all'
    FIRST = 'first'
    ONE_OR_NON ='one_or_none'

Note: It assumes the filter contains only simple conditions with AND operator.

DbApiClient must have get_dag_runs which calls the proper Airflow Database API method with provided optional parameters (as query strings).

DB Data mutations 

Mutations are implemented similarly to query, by overriding "delete" or "update" and mapping the request to the proper Airflow Database API method. 

For "add" it sends the serialized object to Airflow Database API where it is deserialized and added to the DB.

Complex statements

Not all statements can be covered with the approach described above, e.g. In processor.py there is:

qry = (
  session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
  .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
  .filter(TI.dag_id == dag.dag_id)
  .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
  .filter(TI.task_id.in_(dag.task_ids))
  .group_by(TI.task_id)
  .subquery('sq')
)

max_tis: List[TI] = (
  session.query(TI)
  .filter(
      TI.dag_id == dag.dag_id,
      TI.task_id == qry.c.task_id,
      TI.execution_date == qry.c.max_ti,
  )
  .all()
)

Instead of covering all cases in the DbSession filter method, the DbSession class has a dedicated method for this case.

The code in processor.py simply calls this method, and its implementation covers both cases: DBDirect(using SQLALchemy and direct Database call) and DBIsolation mode - calling a dedicated Airflow Database APi endpoint (which calls the same DbSession method, but in DBDirect mode)

e.g.


class DBSession(Session):
  ...
  def get_max_tis(self, dag_id,task_ids):
    if self.db_isolation_mode:
      self.db_api_client.get_max_tis(dag_id, task_ids)
    else:
      qry = (
        session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
        .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
        .filter(TI.dag_id == dag.dag_id)
        .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
        .filter(TI.task_id.in_(dag.task_ids))
        .group_by(TI.task_id)
        .subquery('sq')
    )
    return (
        session.query(TI)
        .filter(
            TI.dag_id == dag.dag_id,
            TI.task_id == qry.c.task_id,
            TI.execution_date == qry.c.max_ti,
        )
        .all()
    )

Configuration

Together with Airflow Database API the following configuration setting are introduced:

  • [core] database_access_isolation

Whether the DBIsolation mode is enabled and untrusted components should use Airflow Database API for accessing Airflow Database

  • [core] database_api_host

Url for Airflow Database API. Used only if [core]database_access_isolation is True

Dag processor, worker and triggerer

All tests for DagProcessor, Workers (including Operators) and triggers are executed in both modes "DBDirect'' and "DBIsolation ''. The code for Airflow Database API, the DbApiClient and DBSession are extended until all tests pass.

Why is it needed?

This change allows Workers, DagProcessors and Trigerrer to work in untrusted mode - without direct access to Database.

Which users are affected by the change? 

Only those that set [core]database_access_isolation=True

How are users affected by the change? (e.g. DB upgrade required?)

What defines this AIP as "done"?

It is possible to start an Airflow Database APi from Airflow CLI and it is easily extensible with new methods. Workers, Dag Processors  and Trigerrer  can work in DbIsolation mode - without direct access to DB.

28 Comments

  1. You haven't explained how the code will look like to support both with and without this proposal.

    1. It's just 'self.is_db_isolation' which is always false if the mode is not enabled

  2. I do not like having the DB API server and the existing Rest API as separate components:

    • The permissions model is nearly granular enough to handle the "service account" (i.e. scheduler issues tokens that only allow access to update a given task.
    • There is a lot of overlap between the APIs in functionality (and we'll need to add more endpoints to the REST API as we expand out the React UI to use the API)  and I do not like the idea of having to maintain three versions of everything: One in the API, one in this new DB API, and a third in the without-db isolation.
    1. I see your point. We had some discussion about whether to define new API or try to integrate it into existing. While we don't have any strong option, the one problem is that existing is embeded into Web Server - so we may need to separate it to be able to scale it independently etc. But maybe it's the right thing to do.

  3. I think we all want to achieve the same, but maybe it needs more explanation and a bit more implementation details. 

    I believe we can (and possibly we should) use the same "connexion_api" endpoints implemented that we have currently and only add a new authentication scheme. Simply run the same endpoints, but plug-in different authentication (based on scheduler-issued tokens and not Base Auth or others). And surely we SHOULD use the already existing API implementation of say "get variable" or "set variable" method. I think there is no doubt about it.

    What the "separate component" is about is not to rewrite new APIs for the things we already have but to run and expose (including authentication)  differently (and allow to run multiple DB API components as well).

    • For one I am quite sure we should not run "full webserver" to handle those API calls. It's not needed . And likely in that separate components we should only expose the APIs that we will need by Workers and DagProcessor.
    • Plus we will have a few APIS that should never "make it" to the API available for regular users - for example equivalent of "_run_mini_scheduler_on_child_tasks" should only be "triggerable" from worker.
    • The API will be usually placed in different security zone anyway - not accessible by "users" usually, so we really do not need to import/instantiate all the unnecessary components for such DB API component


    That really boils down to a new `airlfow db-api` or similar component (and we should be able to run multiple of those if needed - for scalability - those components similarly as webserver will be stateless)


    We can approach it in two ways:

    1) add new APIs that are needed only for the Workers/DagProcessor to the existing OpenAPI specification and simply forbid them from running from other clients

    2) create a new "specification" with some overlap which will reuse  the same implementation of the "common APIs" and expose the "Worker/DagProcessor" APIs additionally 


    Both have pros and cons (and maybe with OpenAPI the second one will not even be possible and we will have to implement or use existing ways of filtering available APIs in connexion. 


    But I think we are virtually talking about the same, and it's just misunderstanding wha "Separate API means"

  4. Also - maybe that was not clear enough (Mateusz Henc  maybe you could also clarify this). That conceptually we are not going to "change" existing implementation of database operations. For example "_run_mini_scheduler_on_child_tasks"  which is done as single DB transaction will be executed inside the DB-API component as a single API call - such API call is a good example of calls that would not "fit" in the current "Open" API - because we will never expose it to "end user" (this is one of the reasons why the set of APIs for "Workers/File Processors" should be different than the one available for "users" (with quite an overlap indeed - for the "primitive" calls like get variables, but not for the "internal" logic calls).


  5. Hi Folks,

    This is a fantastic initiative that will be a great benefit to all enterprise Airflow users.  Would it be possible to extend the comment time until mid-January to allow those who are out for holidays to review and weigh in?

    Thanks!

  6. Great initiative and a good step towards multi-tenant Airflow! I would like to stress that my intention with AIP-1 is to enable full autonomous running of tasks without needing the scheduler at runtime. This means that all details required for the Task to run should be supplied to either the Task when it starts running (e.g. via STDIN/unixsocket) or via the TaskRunner which then communicates with the Task. The Task should only communicate limited states to the TaskRunner (SUCCESS, FAILED) by means of OS exit codes. This enables not only confidentiality but also more availability and integrity. Availability as you would be able to update the scheduler/api without any downtime of the tasks and no required backoff mechanism in the api-client inside the task. Integrity as we now have a trusted process (TaskRunner) that can communicate with the scheduler and an immutable state of the variables and connections (e.g. they can't change at runtime of the task) which can be further supplemented by signing / checksumming the Task to ensure what the scheduler is scheduling is actually running. 

    In other words, there is no need for TWO WAY communication between a Task and the Scheduler. 

    I understand that this AIP is written with backwards compatibility in mind and it is a good first step! I do think that the above with the TaskRunner doing the communication on behalf of the Task does not have to come at the cost of backwards compatibility.

    1. Yep. First step it is. Thanks for kind words Bolke de Bruin   (smile)

      The more I talk to different people the more I see how this is the beginning of the journey rather than even the middle of it. We are barely scratching the surface of what multitenancy will look like in the future.

  7. This would be a great enterprise feature (re MWAA) as it reduces the harm users can possibly do to the meta db from their dags.

    For what it's worth I'm in favour of option 3 (Internal API hosted as it's own component). The hoz scalability and isolation from webserver are both desirable, the last thing we need is the WS being bogged down by being inundated with API requests from environments with large/complex dags/tasks.

  8. Would it be possible to run the internal API on a different port than the REST API (or at least make the internal API port configurable)? This would make it a lot easier to configure firewalls to block internal API access while permitting REST API access. 

  9. An idea: even if the API server is "in" the webserver, re-writing it to send all DB access via the (in process) internal API could make sense from a code cleanly-ness point of view – essentially that removes DB access from everything but the Scheduler and the Internal API (code)

    1. Hmm. Interesting idea indeed. I actually quite like it when you mentioned it.

      This might simplify the approach quite a bit and it's not very far from the proposal. This might be technically quite a bit different - like no decorator, but explicit (internal API) calls. But it does sound interesting.  Let me do some research on some possible API implementations, I am pretty sure some of the possible solutions (GRPC ? )  out there might already have a good solution that you can switch between local and remote APIs - so that we will not have to reinvent the wheel.

  10. Hm. i focused on the "in-process" (smile) not the webserver - so I misunderstood it, but I think It gave me some interesting Idea to explore. 

    But Yeah - I think we could rewrite Webserver eventually to use internal API (probably not in this AIP - but a separate one)

  11. Jarek Potiuk How these internal APIs will look like? Will it be similar api_connexion but without authentication? Also, we have custom plugins that access the database, I guess these will still be valid since custom plugins are deployed in the webserver?

    1. It's updated not Omar Al-Safi  (sorry - I missed your question before) - yeah. Webserver Plugins will continue using the DB directly (and now the doc has all the implementation details and POC). There will be authentication and i am proposing gRPC API.

  12. Cross posting part of my comment from AIP-56.

    • Do we plan to block DB access to all untrusted components ? If we do not do this, we will need to handle authorization at 2 levels: internal API and DB, with problems like out of sync, maintenance burden etc. I understood from this API that the answer might be yes but I would like to get confirmation
    • At least in new code we should probably park the logic of DB or API access within the internal API client OR in some kind of abstracted, centralized data access layer. Making this part of the code as the two examples of this AIP makes the code harder to maintain in my opinion. With an abstract interface for data access, it goes along with SOLID principles and makes the code much more secure an maintainable if we decide to modify protocols, DB access framework etc., because the changes would rely in the Data Access layer and not in the features logic. The less we touch the features logic the less we can break it. cc Ash Berlin-Taylor because you suggested the "if" approach within the code and Jarek Potiuk because I assume it was you to come up with the decorator within the feature code, sorry if I got it wrong. With a data access layer we could go with any of these approaches and/or modify them easily if necessary.
      • Do we plan to block DB access to all untrusted components ? If we do not do this, we will need to handle authorization at 2 levels: internal API and DB, with problems like out of sync, maintenance burden etc. I understood from this API that the answer might be yes but I would like to get confirmation:

        Yes.

        * At least in new code we should probably park the logic of DB or API access within the internal API client OR in some kind of abstracted, centralized data access layer

        Not sure if I understand what you mean. Example would be useful.

        Rewriting whole Airflow to use some kind of centralized data layer would be a huge undertaking. If I understand what you mean - there are a number of places where airflow internally heavily relies on passing and retrieving SQLAlchemy objects. So what we are effectively doing is adding "abstract data layer" where only "untrusted" components will use a subset of methods they use, but the "trusted" ones will continue to use the DB Acccess. This is an approach which seems the most feasible without imposing a huge risk of breaking the behaviour of trusted components.

        In fact the proposal with decorators is going to end up with a complete set of methods that will be "untrusted component" data layer. - i.e. methods that are used by untrusted components that expose the internal API. Unless of course I understand it wrongly. This will eventually become a complete set of the "untrusted components abstract data latyer": https://github.com/apache/airflow/blob/main/airflow/api_internal/endpoints/rpc_api_endpoint.py. Do you think about using yet another abstractions internally for Scheduler as well for example? Do you think scheduler should internally use some other Data Access layer? Why would we need it for scheduler-only methods ?
  13. DB access point: great. Is it already implemented or is there still any backdoor in which the user could get direct access to the DB and skip any authorization check ?

    Not sure if I understand what you mean. Example would be useful. 

    Actually this might just be what Ash Berlin-Taylor suggested, maybe I misunderstood the example:


    def process_file(
        self,
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
    ) -> Tuple[int, int]:
        if settings.DATABASE_ACCESS_ISOLATION:
            return InternalAPIClient.dagbag_process_file(
                file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
            )
        return self.process_file_db(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )

    If this method is not defined in the airflow core code but only an abstract internal layer (or in the internal API client logic but then the above function should directly call the rpc method instead of InternalAPIClient.dagbag_process_file), then it is what I meant and it does not matter if it is implemented through decorator or if/else. Airflow core, untrusted code should only rely on this process file method, without knowing how it is really accessed. I initially understood that such piece of code would be directly part of Airflow core code, which is not the right place in my opinion.

    Also important point is that I do not expect all existing code to use this pattern right away, and for now the decorator can still be useful until the code is gradually refactored (if ever (wink)), but any newly written code should use this new pattern.

    I hope it clarifies the situation, if not, please let me know what is not clear and I will try to do even better (smile)


  14.  if settings.DATABASE_ACCESS_ISOLATION:
            return InternalAPIClient.dagbag_process_file(
                file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
            )
        return self.process_file_db(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )


    This is exactly what @internal_api  decorator does.

    1. Yes but the decorator is applied within the core code of Airflow (which is OK for existing implementation because of the amount of refactoring work involved, but not for new code), which makes the core code either DB or API aware. Ideally we should have an abstract method which hides the data access logic and keep the core code focused on the "what" and not the "how", like "get_user", "update_role" etc.

      1. I am confused. What are you asking about? Why ideally?  What's the difference. I completely don't understand the rationale behind it, but maybe you can explain on another exaemple.

        I think what you are asking for is about something completely diffferent, i.e. changing internally how Airflow architecture, classes and abstraction layers are defined.
        But this is  quite unrelated to AIP-44. And if you have good idea how to do it, I invite you to writing an AIP about it with some good examples and explanation why you think we need to re-archtitect how Airflow internal classes and abstraction layers are implemented. Changing airflow internal architecture is not a gaal of AIP-44 and will never be. Because we consider it too risky, this is why we chose the more "surgical" approach where we chose to rather use what we have than propose big changes in the architecture. But if you think it is a good idea - feel free (just not in this AIP).

        Unless I am mistaken of course.

        1. Concrete example is this:
          https://github.com/apache/airflow/blob/eaaa030ab4dc4d178dab8194fd4ef9450b6b67a5/airflow/dag_processing/processor.py#L416

          In the manage SLAs function I see a mix of code logic (SLA handling) and data retrieval from the database to be able to execute this logic. From what I see the decorator is applied on the whole method, maybe because it was simpler for refactoring (which is understandable) or maybe for performance reason (since there are more than one DB call, less data to transfer back and forth)

          What I would suggested (not necessarily as part of this AIP because it is too late now) is that any new code should follow a pattern which separate the concerns: the code logic should be executed as little as possible in the internal API (in this case), and only the necessary data are retrieved through the internal API.

          For instance

                  qry = (
                      session.query(TI.task_id, func.max(DR.execution_date).label("max_ti"))
                      .join(TI.dag_run)
                      .filter(TI.dag_id == dag.dag_id)
                      .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
                      .filter(TI.task_id.in_(dag.task_ids))
                      .group_by(TI.task_id)
                      .subquery("sq")
                ) ...

          Could be changed to something like (please do not take into account namings)

                 task_instance.get_latest_tis_for_dag(...) or data_access_layer.get_latest_tis_for_dag(...)


          This method would lie for instance in the task_instance module or elsewhere if dealing with multiple objects is not acceptable within a given model.

          It would add more decoupling between code logic and data storage layer making the code lighter and simpler to understand + it gives the freedom to have and change how we want the data access layer code (decorator or not etc.). Such maintenance would be safe because we do not modify code logic but only data retrieval.

          I do think that it has a link with AIP-44, while I know it is too late to change existing requirements (and it is not my aim here). Please let me know if you think it is a good idea nevertheless and what would be the next step in this scenario (amendment to this AIP since it does not impact existing implementation, only improvement for future ones? I can also raise a Github issue).

          Hope it was clear and concrete enough (smile)

          1. there is absolutely no problem in introducing "get_latest_tis_for_dags" method and decorating it with @internal_api  decorator if you think that it will make the code better. Just don't call it the data access layer. Simply refactor the method and decorate it with the decorator (and remove the previous one). this will do exactly what you wanted to get. PRs on that are most welcome. Anyone can contribute to make our code better and such change might be a good idea. I am arguing that having a separate "data_acess_layer" as an abstraction and "name" is not needed. The "internal_api" decorators ARE the data access layer. They are groupped in one place and they are doing what you want to do but the if is inside the decorator.

            1. My point here is that the decorator should only be applied to (new) data access operations, which is why I called it "data access layer". I do not fully agree on your sentence 'The "internal_api" decorators ARE the data access layer.' for two reasons:

              • Here it was applied to a whole method including logic so it goes beyond the simple "data access layer" concept
              • If we apply this decorator on methods including logic which means:
                • We can still see DB related code, which is implementation details not relevant to the method logic (so against one of the SOLID principle)
                • The usage of the decorator itself at that level is leaking implementation details as well (breaking the same SOLID principle). Accessing a method like get_latest_tis_for_dags keeps the code focused on the what and not the how (EDIT: the decorator would be specified in this get_latest_tis_for_dags method for instance).

              The "how" is maintained completely separately and changing its implementation does not affect code logic areas.

              My suggestion is that for any new development (like AIP-56) then we could follow this new pattern which would make the code more readable and maintainable. Existing code refactoring will only be best effort - I am completely fine with that. I would be glad to raise a consensus or vote on this in the mailing thread + make a PR to document the new standard if you think it can be a good idea.

  15. > DB access point: great. Is it already implemented or is there still any backdoor in which the user could get direct access to the DB and skip any authorization check ?

    AIP-44 is being implemented. Once it is implemented, the untrusted components should be able to work without having access to Airlfow metastore  DB credentials, which means that if deployed correctly (without the DB credentials configured) they will have no physical possibiity of connecting to the DB because they will not know how to login to it.

    1. Thanks for the clarification