Status
Motivation
This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is splitting the Airflow components into "trusted" and "untrusted" ones, which allows putting security boundaries between them. "Untrusted" components could then be executed in DBIsolation mode, which disables direct database access, making it possible only through the Airflow Database API.
The Airflow Database API is a new, independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to the DB.
Note: Access control to the Airflow Database API & Airflow Database rows is out of scope for this AIP, but it was kept in mind so that implementing it later remains feasible.
Proposal
Internal API architecture
The Airflow Internal API is logically a new API, independent of the existing user-facing API embedded in the Airflow Webserver:
- They have different purposes: user-facing vs. internal
- They have different access models: end-user credentials (roles) vs. task-based access
- They are different styles of APIs: REST vs. RPC-like (HTTP-based)
- The REST API is user-facing and API-first (the code reflects the API specification), whereas the Internal API should be code-first (the API reflects the methods to be called)
How we approach the implementation is still to be decided at the POC stage, but we want to seamlessly integrate authentication between the two APIs. It should be possible to authenticate with both APIs using the same mechanism, and it should be possible to expose both APIs via the same web server.
The REST API and Internal API could be exposed together in a single process (webserver) but optionally they could be exposed separately in different processes. The Internal/REST API separation can be introduced in two ways:
- Internal APIs exposed together with REST API via Webserver with RBAC access control
- We will have a separate “Internal API Role” and “Internal API user” within the access model that we use from FAB. You can only authenticate as this user using the “Internal” authentication method, based on tokens
- The authorization of Workers should be based on the validity of JWT tokens generated by the Scheduler. Dag Processor and Triggerer will have a secret that will be available only to them, not to the user code they run (see the section “Securing access to authorization token in DagProcessor and Triggerer” below). The user code in the Worker should not have access to any long-term secret - it should only use a limited-time-valid JWT token.
- This “Internal API Role” will have only permissions that allow it to execute the Internal APIs (RPC-style) needed to provide user code with the current functionality (Connections/Variables etc.) in a transparent way. Selected (possibly even all) other API methods from the existing REST API may also be exposed to the Workers, Dag Processor and Triggerer. We will make an inventory and decide on “default” permissions for the REST APIs during the implementation. This will be done using the same RBAC controls that we use in the current REST API implementation.
- Physical separation of the components exposing both APIs:
- The “Internal API” optionally can also be exposed by a component separate from the current webserver.
- We also propose to implement a separate, optional “airflow internal-api” component, that could expose only the APIs needed by Workers, DagProcessor and Triggerer. This component can be run in a separate “security zone”, allowing access to the “Internal API” to be separated on a physical/networking level
The Internal API calls are “RPC”-style APIs rather than CRUD APIs, so they do not map into REST semantics. The Internal endpoints do not need to be published in the API specification, so we plan to generate the APIs based on the code (decorated functions). As the next step (implementing the POC) we will determine whether we should generate the endpoints from the code via completely custom code, or whether we can leverage existing libraries and tools (for example FastAPI).
A later phase of the “umbrella” AIP-1 implementation (not implemented in this AIP) is task-based authorization (each task should have access to a subset of operations depending on dag_id, task_id, origin, etc.). While this AIP does not address it, it makes it possible.
In the future, the JWT token can be used to carry additional (signed) payload which will be used to selectively authorize the caller to specific entities (connections, variables, specific tasks or dags only etc.)
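As an illustration of how such a limited-time, claim-carrying token could be issued and verified, here is a minimal sketch assuming the PyJWT library; the claim names and helper functions are hypothetical and not part of this AIP:

# Hypothetical sketch - assumes the PyJWT library; claim names are illustrative only.
import time
import jwt  # PyJWT

SECRET = "scheduler-only-secret"  # never exposed to user code running in the Worker

def issue_task_token(dag_id: str, task_id: str, validity_seconds: int = 600) -> str:
    # The Scheduler signs a short-lived token scoped to a single task instance.
    payload = {
        "sub": "airflow-task",
        "dag_id": dag_id,
        "task_id": task_id,
        "exp": int(time.time()) + validity_seconds,
    }
    return jwt.encode(payload, SECRET, algorithm="HS256")

def verify_task_token(token: str) -> dict:
    # The Internal API server verifies the signature and expiry before serving the call;
    # the signed claims could later be used for selective (per-dag/per-task) authorization.
    return jwt.decode(token, SECRET, algorithms=["HS256"])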
No access to Metadata DB for “User code” in DagProcessor and Triggerer
Currently, the SQLAlchemy models of Airflow are also available to top-level user code from DAGs and callbacks. Essentially, users can access Airflow Variables, Connections and other DB models while the DAG code is parsed or when callbacks are executed. This is, however, considered bad practice for top-level code, and using DB access from callbacks (TODO: discuss it) is not recommended either. It interferes with the parsing process and it can be replaced by other mechanisms.
Similarly, any DB access in user code in the Triggerer is very bad practice, because the Airflow database is not available for asyncio operations. Asyncio SQLAlchemy queries are beta functionality and only work with the experimental asynchronous Postgres driver, so for all practical purposes Trigger user-provided code should not access the metadata DB.
The proposal for the DB Isolation mode is that the Top-Level Python code and callbacks executed by the DAGProcessor are not allowed to access Airflow Database at all.
This could be achieved by not exposing an SQLAlchemy session to the process that executes DAG Parsing and callbacks. While this does not provide complete isolation for malicious users (DAG Parsing/Callbacks will be executed as sub-processes of DAGProcessor and full sandboxing of the parsing process is out of the scope for now), this could be a handy mechanism to improve performance and enforce best practices for DAG Top-Level Process parsing and callbacks.
This also means that Community operators should not use any DB operations in their constructors. This is similarly considered bad practice, and enforcing it should be a great side effect of introducing the DB Isolation mode. As part of the implementation, all such operators that currently use DB operations will be identified and fixed.
Securing access to authorization token in DagProcessor and Triggerer
Another (security-related) consequence of “no access” for the user code run by DagProcessor and Triggerer is that this user code should have no access to the authentication information for the Internal API. Both components will have to have a long-living token that allows them to authenticate (and, at a later stage, authorize) to run selected Internal API methods, and this token should not be made available to user code. By default - in both cases - execution of the user code happens in a separate process, and the token saved in memory might be removed while forking or not passed when the processes are spawned. However, the token (or at least a token that allows the DagProcessor and Triggerer to retrieve the actual token to use) must be stored locally on the server where DagProcessor and Triggerer run - to handle cases like restarts without manual intervention. That puts special requirements on the parsing and event loop processes started by DagProcessor and Triggerer respectively. The following options should be implemented (TODO: discuss this):
- The configuration file where the token (or the configuration of the long-living token used to retrieve it) is stored should not be “world readable”; it should be readable only by the user running DagProcessor/Triggerer. In the most “strict” mode, we can perform the check at startup and refuse to start Airflow if the configuration file has wrong permissions
- Parsing/Event loop processes might be started as “no-user” processes. The consequence of this is that DagProcessor and Triggerer should be run in impersonation mode, where the user running Airflow should have “sudo” capability. This provides pretty good isolation, however it requires careful configuration of the deployment in order to fully secure the authentication/authorization information.
- The Parsing/Event loop processes might be run as containers (there are separate discussions about it related to AIP-46 Runtime isolation for airflow tasks and dag parsing), where the configuration of Airflow will not be mapped into the container. That provides a greater isolation and security level and should only be considered in deployments where a Docker engine is available
Test harness
As part of the implementation, we want to implement a test harness tapping into the existing CI/test environment of Apache Airflow, where the above assumptions can be automatically verified for all the community provided code:
We will run the whole CI test suite in “DB Isolation Mode”, where the unit tests will get a modified SQLAlchemy Session that will catch incorrect uses of the Database (a sketch of such a guard follows the list below):
- Using disallowed DB operations in DagProcessor
- Using any DB access in any of the example DAG’s top-level code
- Using any DB Access in __init__() methods of any of the Community Operators
- Using disallowed models in the Community Hooks
- Using disallowed DB operations in "execute()" calls of the operators
- Testing scenarios in which the transaction succeeds but connection problems cause the "success" not to be propagated to the client
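A rough sketch of how such a test-time guard could look (the fixture and class names are hypothetical, not the actual CI implementation):

# Sketch of a "DB Isolation Mode" test guard - names are hypothetical, not the real CI code.
import pytest
from unittest import mock

class ForbiddenSession:
    """Stand-in session that fails the test on any attempted DB use."""
    def __getattr__(self, name):
        raise RuntimeError(f"Direct DB access ({name}) is not allowed in DB isolation mode")

@pytest.fixture
def db_isolation_mode():
    # Replace the session factory used by @provide_session with the failing stand-in.
    with mock.patch("airflow.settings.Session", ForbiddenSession):
        yield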
Internal API inventory
The Internal API serves three components: Dag Processor, Workers, Triggerer. This table shows an inventory of impacted methods, their applicability and our approach for the methods.
Note that the asynchronous, user-provided code in the Triggerer almost by definition should not perform any DB calls synchronously (and synchronous calls are the only way to access the Airflow DB). This is considered bad practice, and we believe that not allowing Airflow DB access (and not providing an equivalent API) from the user-provided code run by the Triggerer might improve the performance and characteristics of the Triggerer. Any attempt to use SQLAlchemy/direct DB access from the user-provided code should raise an exception - to be verified during the POC.
User code method inventory
These methods are available to user code that runs in Workers as part of task execution. We can easily utilize the existing REST API to retrieve those. Additionally, users will have access to a pre-configured Python Client (installed as an Airflow dependency) that will enable Operator developers and DAG writers to access all the exposed REST APIs of Airflow in a straightforward way. Note that user code will also have to access config, which is not a DB access; however, access to Config should also be done via the REST API - this allows securing access to Secret Backends, so that no user code has access to the secret used to authenticate with Secret Backends.
Case | Proposed approach |
Connection | Only READ-ONLY BaseHook.get_connection() will be implemented to transparently read the Connection from the Database or from Secrets (see the sketch after this table). Secret access will also be done using the Internal API Server. Currently connection retrieval performs a database roundtrip from all components. Replacing it with a REST API call will make it slightly slower, but it will also leverage the SQLAlchemy connection pool of the Internal API Server. Connection retrieval is an infrequent operation in general |
Variable | READ-WRITE Variable.get/set/update/clear methods will be implemented transparently using APIs. Similar performance consideration as in case of Connection. |
XCom | READ-WRITE: all of the operations (get*/clear*) will be implemented to transparently use the Internal API. Custom XCom Backend access should be done directly from the “calling entity”, however, and we should provide a mechanism of passing temporary credentials for the XCom Backends so that the workers can store/retrieve XCom data directly without going through the API server. This can be implemented for example using pre-signed URLs or other mechanisms where temporary credentials to access the data are passed from the server to the worker. |
Other REST APIs | The tasks running in workers will also have access to the Airflow Python Client that will use the “Task” JWT token for authentication and be configured to talk to the Airflow Webserver automatically. This will provide easy access to the functionality exposed by Airflow via API to Operator developers and DAG writers, who will have standardized access to Airflow via the client. |
Non-DB code that should also be isolated (because of Secret Backends) | |
Config | READ-ONLY access to retrieve configuration. Secret Backend access will only be done on the Internal API server. Option: Until selective authorization is implemented, we might allow direct access from Workers/DagProcessor for scalability reasons, even if full security might not be achievable. In this case secret credentials might leak to user code. Also, in the most “strict” deployment (see the above discussion about user code access to the authentication token), the configuration file should not be made available to the user code directly. |
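For illustration, the transparent switch for BaseHook.get_connection() could look roughly like the sketch below. The Internal API client method name is a hypothetical placeholder following the POC naming (InternalAPIClient, settings.DATABASE_ACCESS_ISOLATION), not the final API:

# Sketch only - the Internal API client method name is a hypothetical placeholder.
from airflow import settings

class BaseHook:
    @classmethod
    def get_connection(cls, conn_id: str):
        if settings.DATABASE_ACCESS_ISOLATION:
            # DB isolation mode: the Internal API server reads the Connection from the
            # DB or the Secret Backend and returns a serialized representation.
            return InternalAPIClient.get_connection(conn_id=conn_id)
        # Direct mode: the current SQLAlchemy/Secrets code path, unchanged.
        return cls._get_connection_from_db_or_secrets(conn_id=conn_id)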
Internal API calls
Those methods are not necessarily going to be mapped 1-1 into the actual API calls. It is possible that several methods might be joined together for efficiency (with some possible duplications resulting if methods are used in several places). It is important however that each of the methods constitutes a complete transaction (or several transactions), because going through APIs will make it extremely difficult to maintain transactional integrity across multiple methods.
Dag Processor Airflow Code
Function/Method | Proposed approach |
manage_slas | This method should be exposed as a separate Internal API call (in DB isolation mode). It is essentially a single, independent transaction which also covers sending notifications (note: in case we implement custom notification engines, the code of those will be executed in the context of the Internal API server). |
update_import_errors | This is also a single-session method that can be exposed and executed fully by the Internal API server. |
execute_callbacks | This method should stay within the DAGProcessor but it should use Internal API to retrieve appropriate models (which will be API objects rather than SQLAlchemy objects): DagRun, TaskInstances, Template_context - via current Public API (extensions needed). We might need to optimize/update some of the APIs to get missing information and achieve best performance by batching retrieval of only those objects that are needed. |
deactivate_missing_dags | This should be a separate, new Internal API method call that should perform a single transaction. Note that currently a commit is not performed for that method, but it should have no adverse effect to make this a separate transaction. |
update_import_errors | This should be a separate, new Internal API method call that should perform a single transaction. Similarly as above - currently this is done in the same transaction, but there should be no adverse effects of making this a separate transaction |
save_dag_to_db | This is a new, “heaviest” method that should be exposed by the internal API to the DAG processor. We should be able to serialize all the information stored currently in the DagBag and send it to the Internal API server. The method should be roughly equivalent to: sync_to_db and pickling dags if set. This should be done in a single transaction as a single API call. |
Triggerer Airflow Code
Case | Proposed approach |
Trigger Model | Similarly to the user code case, the Trigger class should be updated to transparently handle calls to it: bulk_fetch, clean_used, assign_unassigned, submit_event, submit_failure, ids_for_triggerer. All of those methods are currently single-transaction calls and can be easily mapped into API calls with serialized information. Performance-wise, it exchanges a DB call for an HTTP REST call, which yields some unavoidable performance penalty. However, only submit_failure and submit_event will have a visible impact due to their “non-batch” nature. In case this constitutes a problem, we might consider a mechanism where we also accumulate those events into batch calls (see the sketch below). |
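If batching turns out to be needed, it could look roughly like this sketch (the accumulator class and the batched Internal API call are hypothetical, not part of the current proposal):

# Illustrative sketch of batching submit_event calls - no such class exists today.
import threading

class TriggerEventBatcher:
    """Accumulates trigger events and flushes them as a single Internal API call."""

    def __init__(self, flush_size: int = 50):
        self._events = []
        self._lock = threading.Lock()
        self._flush_size = flush_size

    def add_event(self, trigger_id: int, payload: dict) -> None:
        with self._lock:
            self._events.append((trigger_id, payload))
            if len(self._events) >= self._flush_size:
                self._flush_locked()

    def _flush_locked(self) -> None:
        batch, self._events = self._events, []
        # Hypothetical batched Internal API call replacing many submit_event round trips.
        InternalAPIClient.submit_events(batch)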
Worker Airflow Code
Case | Proposed approach |
refresh_taskinstance_from_db | Single transaction retrieving data from the DB - part of the Internal API. |
update_task_instance | There are a few places where tasks are saved to the db in a separate transaction after task values are updated. (host/job_id). We will implement a new API endpoint for it. |
defer_task | We should implement an internal API call that would create a trigger entry and update the task state. This method is a single-transaction method so it should have limited performance impact - Internal API can utilise connection pooling much better than Workers. |
get_template_context | We should implement a separate Internal API call that retrieves context for the task. All fields in context should be serializable (or we can make them so) and performance of this method should not be heavily impacted as we exchange a DB call with an API call from the point of view of Worker and Internal API implementation can utilize connection pooling much better. |
handle_reschedule | We should implement a separate, Internal API call that implements adding task reschedule via DB. This is a single transaction so it should be easy to replace it with a single API call. |
handle_failure | Similarly as above, we should implement a single API call as this method is always followed by a commit. |
add_log | The message should add a log model entry as a separate call. Similar considerations as above. |
update_state | Updates task instance state. Also a separate method in the API. There is rather complex logic in the method and it requires loading a partial subset of the tasks, which might be costly. If we find that this method has performance limitations, we can disable mini-scheduling in DB isolation mode - similar considerations apply. One watch-out: there is an option to execute callbacks while running this function. Rather than executing them in this case, we should return a list of callbacks to execute and execute them in the context of the worker. They will be executed after the transaction is complete and there is a small risk that callbacks would be lost in this case, however the risk is rather minimal. Option: We could consider storing the callbacks in the database instead and delegating their execution to the DagProcessor. |
run_mini_scheduler_on_child_tasks | This should be a separate method in the API. There is rather complex logic in the method and it requires loading a partial subset of the tasks, which might be costly. If we find that this method has performance limitations, we can disable mini-scheduling in DB isolation mode. |
Implementation notes: Internal API component
Airflow Internal API provides methods only for "untrusted" components (Worker/Dag Processor/Triggerer), as Scheduler will use only direct DB access.
While the APIs could be exposed also via webserver, we propose to add a new, optional, stateless (thus horizontally scalable) component that can be started using a new command line command: airflow internal-api.
The code is located in the airflow.api_internal package, with all endpoint methods available via the /api_internal/ endpoint.
As mentioned above, the “Internal API” is not “API-first” but “code-first”. The implementation details of how we map particular methods into endpoints are going to be worked out in detail in the POC that follows the AIP, but we will approach the implementation in a way that minimizes impact on the existing code (at most slight refactoring and splitting existing methods into several or joining them). All the methods exposed via the API will have to be adapted to make their parameters and return values serializable, so in some cases we will have to convert objects that are big/not serializable into an appropriate representation needed by the method (for example, the DagBag passed in a few methods above and in the example below should be turned into a list of ids and the parameters needed by the methods). All the methods involved will have to be converted into “functions” so that no instance objects are accessed.
Another important factor is future maintainability - in the implementation we will avoid code duplication by implementing decorators that will either use the locally implemented DB access or delegate automatically to a corresponding API call. The current “@provide_session” decorator will provide an “Assert” session in Workers or DagProcessor in the DB Isolation mode, so any attempt at accessing the DB from those will raise an exception.
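A minimal sketch of what such a delegation decorator could look like (illustrative only; the decorator and client method names are placeholders following the POC spirit, not the final implementation):

# Illustrative sketch of the delegation decorator - not the final implementation.
import functools
from airflow import settings

def internal_api_call(func):
    """Run the wrapped DB method locally, or delegate it to the Internal API server
    when DB isolation mode is enabled."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if settings.DATABASE_ACCESS_ISOLATION:
            # Serialize the arguments and execute the method on the Internal API server
            # (InternalAPIClient.call is a hypothetical placeholder).
            return InternalAPIClient.call(method=func.__qualname__, args=args, kwargs=kwargs)
        return func(*args, **kwargs)
    return wrapper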
In order to achieve maintainability, we want to make sure that all the communication - both in the DB isolation mode and in the "DB" mode - for those methods is executed using RPC (Remote Procedure Calls). Even in "no isolation" mode, the methods from the inventory above will always be executed via the "internal RPC server". Modern RPC solutions provide a way to run a local server for RPC connections, which provides a much more performant version of the RPC methods - without authentication, security or communication overhead - while using the same code and mechanisms to execute those.
A short survey of the available RPC implementations suggests the following popular and proven options we could use:
- XMLRPC: part of Python standard library
- gRPC: Google-originated, Apache 2.0 licensed implementation (based on binary Protocol Buffers)
- Apache Thrift: Facebook-originated (now Apache) RPC using the binary Thrift protocol
Out of these protocols, XMLRPC is quite simple but has quite a big (XML-induced) overhead for serializing/deserializing/communication, whereas both gRPC and Thrift use efficient, binary, extensible protocols that introduce very low overhead for serialization/deserialization and communication. While we are not making the final decision yet, the first step of the project will be to analyze, implement a POC and perform the necessary benchmarking to decide which of the two implementations (gRPC or Apache Thrift) to use. We are also reaching out to the Apache Beam team, who have experience with both protocols and can help us with the analysis they've done on the capabilities of those protocols in the past.
Both implementations are similar in the sense that we need to define a description of the RPC methods we are going to have (either Protobuf or Thrift), and the compiler will generate the Python client and server code. Both implementations have full support for HTTPS communication, including various (extensible) authentication options, and both can communicate via TCP/Unix sockets for a local implementation. Apache Thrift also has a "shared-memory" mode, where the communication between "client" and "server" can be done over shared memory.
During the initial POC we queried other Apache projects for their experience (Apache Beam) and we chose gRPC. However, we reserve the possibility of swapping out gRPC for a pure JSON/OpenAPI-based approach. The POC will continue while we try out and test the concerns raised during the discussion - multi-threading vs. multiprocessing approach, load balancing, and the capabilities of monitoring such requests - see https://lists.apache.org/thread/1pw2ohdyjtw1nj7ckyofq70sv8onxqdd. The "Internal API" abstraction will allow us to swap them out, and we might even decide to keep both options if we find they fare better in different scenarios.
The example below shows how we could define the "process_file" method with a gRPC protobuf definition (this is a small part of the final definition needed and does not necessarily compile, but it should give an idea). You can also see the POC PR implementing gRPC method calls used for performance testing.
The example is updated with the "clearer" approach proposed by Ash Berlin-Taylor to add an extra level of "Internal Client" abstraction and base the decision on global settings rather than an object's method (see https://lists.apache.org/thread/4wmftmjfqhwc4h6r87tn636lkttj1hzo)
@provide_session
def process_file(
    self,
    file_path: str,
    callback_requests: List[CallbackRequest],
    pickle_dags: bool = False,
    session: Session = None,
) -> Tuple[int, int]:
    self.log.info("Processing file %s for tasks to queue", file_path)
    try:
        dagbag = DagBag(file_path, include_examples=False, include_smart_sensor=False)
    except Exception:
        self.log.exception("Failed at reloading the DAG file %s", file_path)
        Stats.incr('dag_file_refresh_error', 1, 1)
        return 0, 0
    self._deactivate_missing_dags(session, dagbag, file_path)
    ...

@provide_session
def _deactivate_missing_dags(self, session, dagbag, file_path):
    ...
    self.log(....)
    ...
syntax = "proto3";
package airflow.internal_api;

message TaskInstanceKey {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  int32 try_number = 4;
  int32 map_index = 5;
}

message SimpleTaskInstance {
  string dag_id = 1;
  string task_id = 2;
  string run_id = 3;
  optional float start_date = 4;
  optional float end_date = 5;
  int32 try_number = 6;
  int32 map_index = 7;
  string state = 8;
  bytes executor_config = 9;
  string pool = 10;
  string queue = 11;
  TaskInstanceKey key = 12;
  optional string run_as_user = 13;
  optional int32 priority_weight = 14;
}

message TaskCallbackRequest {
  string full_filepath = 1;
  SimpleTaskInstance task_instance = 2;
  optional bool is_failure_callback = 3;
  optional string message = 4;
}

message DagCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  string run_id = 3;
  optional bool is_failure_callback = 4;
  optional string message = 5;
}

message SlaCallbackRequest {
  string full_filepath = 1;
  string dag_id = 2;
  optional string message = 3;
}

message Callback {
  oneof callback_type {
    TaskCallbackRequest task_request = 1;
    DagCallbackRequest dag_request = 2;
    SlaCallbackRequest sla_request = 3;
  }
}

message FileProcessorRequest {
  string path = 1;
  repeated Callback callbacks = 2;
  bool pickle_dags = 3;
}

message FileProcessorResponse {
  int64 dagsFound = 1;
  int64 errorsFound = 2;
}

service FileProcessorService {
  rpc processFile (FileProcessorRequest) returns (FileProcessorResponse) {}
}
def process_file_grpc(
    self,
    file_path: str,
    callback_requests: List[CallbackRequest],
    pickle_dags: bool = False,
) -> Tuple[int, int]:
    request = internal_api_pb2.FileProcessorRequest(path=file_path, pickle_dags=pickle_dags)
    for callback_request in callback_requests:
        request.callbacks.append(callback_request.to_protobuf())
    res = self.stub.processFile(request)
    return res.dagsFound, res.errorsFound

def process_file(
    self,
    file_path: str,
    callback_requests: List[CallbackRequest],
    pickle_dags: bool = False,
) -> Tuple[int, int]:
    if settings.DATABASE_ACCESS_ISOLATION:
        return InternalAPIClient.dagbag_process_file(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )
    return self.process_file_db(
        file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
    )
Performance impact
The performance impact is visible (as expected). It's quite acceptable for a distributed environment (in exchange for security), but it's likely significant enough to stay with the original idea of having Airflow work in two modes: a) secure - with RPC APIs, b) standard - with direct DB calls.
On "localhost" with a local DB, serializing and transporting the messages between two different processes introduced up to 10% overhead. I saw the time to run 500 scans of all our example folder DAGs going from ~290s to ~320s pretty consistently - enough of a difference to exclude volatility. I tested it in Docker on both ARM and AMD. It seems that in some cases this can be partially offset by slightly increased parallelism on multi-processing machines run on "bare" metal: I got a consistent increase of just 2-3% on my Linux machine without Docker with the same test harness. But we cannot rely on such configurations; I think focusing on a Docker-based installation, without any special assumptions on how your networking/localhost is implemented, is something we should treat as the baseline.
The same experiment repeated with 50 messages, but of much bigger size (300 Callbacks passed per message), showed a 30% performance drop. This is significant, but I believe most of our messages will be much smaller. Also, the messages I chose were very special: in the case of Callbacks we do rather sophisticated serialization, because we have to account for Kubernetes objects that are potentially not serializable. So the test involved 300 messages that had to be not only serialized "on the wire", but some parts of them also had to be Airflow-JSON serialized, and the whole "executor config" had to be pickled/unpickled (for 100 such messages out of 300). Also, we have some conditional processing there (there is a union of three types of callbacks that has to be properly deserialized), and those messages could be repeated. This is not happening for most other cases, and we can likely optimize it away in the future, but I wanted to see the "worst" scenario.
For the remote client (still a local DB for the Internal API server) I got between 30% and 80% slow-down, but this was on my WiFi network. I am sure it will be much better with a "wired" network, and when a remote DB gets into the picture the overall percentage overhead will be much smaller. We knew this would be a "slower" solution, but this is the price for someone who wants to have security and isolation.
No tests were performed with SSL, but this should add only a modest increase. The "impact" of 10% is IMHO enough to decide against an "Internal but remote-only" approach - the path where we continue using direct DB access should stay, and the POC implementation shows how to do it.
Implementation and maintainability
In the PR you can see the implementation details and how it will impact our code in general. The proposal is rather elegant and easy to maintain, and likely we can improve it with decorators later; for now the proposal focuses more on explicitness and showing the mechanisms involved.
It seems easy to implement and it appears to have good "easy maintainability" properties.
It boils down to a few rules:
- For all the data and "Messages" we send, we have a nice "Internal API" defined in .proto and protobuf objects generated out of that.
The "API specification" nicely defines the structures we are going to send over the network. It's very standard and has really good modern tooling support. For one, we automatically (with pre-commit) generate MyPy type stubs for the generated classes, which makes it super easy both to implement the mapping and to automatically verify its correctness. MyPy nicely catches all kinds of mistakes you can make, and autocomplete for the proto-generated classes works like a charm. During the POC, the mistake detection provided by MyPy made the implementation possible way faster (avoiding a lot of errors) than without it.
- All Airflow objects that we send over the wire should get from_protobuf/to_protobuf (or fromJSON/toJSON) methods (this could likely be separated from the object, especially if we decide on supporting more than one transport protocol).
Those are usually very simple (just passing strings/ints/boolean fields to the constructor), and the structures we pass are rather small (see the PR). Also, a few more complex cases (with more complex serialization) were implemented as part of the POC, and the result is easy, readable and pretty well integrated into our code. It introduces a little duplication here and there, but those objects change rarely (only when we implement big features like Dynamic Task Mapping) and MyPy should guard us against any mishaps there (now that our code is mostly typed).
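For illustration, such mapping methods could look roughly like the sketch below, written against the SlaCallbackRequest message from the proto definition above; the import paths are assumptions and the real POC code may differ:

# Sketch of to_protobuf/from_protobuf mapping for SlaCallbackRequest - illustrative only.
from airflow.callbacks.callback_requests import SlaCallbackRequest  # assumed import path
from airflow.api_internal import internal_api_pb2  # assumed module generated from the .proto

def sla_callback_to_protobuf(request: SlaCallbackRequest) -> internal_api_pb2.SlaCallbackRequest:
    # Plain field-by-field copy; MyPy stubs generated for the proto classes catch mismatches.
    return internal_api_pb2.SlaCallbackRequest(
        full_filepath=request.full_filepath,
        dag_id=request.dag_id,
        message=request.msg,
    )

def sla_callback_from_protobuf(proto: internal_api_pb2.SlaCallbackRequest) -> SlaCallbackRequest:
    return SlaCallbackRequest(
        full_filepath=proto.full_filepath,
        dag_id=proto.dag_id,
        msg=proto.message or None,
    )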
- We make all "DB-aware" objects also "serialization-aware". The number of changes in the actual code/logic is rather small and should be easy to maintain.
There are basically two changes to implement for any method in the above inventory:
1) For any of the objects that are used for database operations (in the POC PR this is DagFileProcessor) we need to use a global configuration flag to decide how to run it and in case remote communication is used, we pass the serialized objects via the channel that will be used for communication.
2) The initial method has to be renamed and the original method has to be replaced with a small snippet that makes a direct-DB or remote call (see above)
Deployment options
Option 1: No isolation (today):
[core]database_access_isolation=False (default)
All components have direct access to the Airflow database as they do right now. No change/impact on Airflow execution.
Option 2: Internal API as part of Web server
This mode requires setting [core]database_access_isolation=True and [core]database_api_host=<db_api_url>, as well as passing the additional parameter --with-internal-api to the airflow webserver command.
In this mode Internal API is executed as part of Airflow API embedded into the web server, which allows saving resources in small installations.
Option 3: Internal API as external component
This mode requires setting [core]database_access_isolation=True and [core]database_api_host=<db_api_url>, as well as starting an additional component with `airflow internal-api`
In this mode the Internal API is executed as a standalone component, using the same code as the Web Server API but exposing only a subset of the endpoints (the Internal-API related ones). This mode allows scaling it independently (e.g. on Kubernetes using HPA)
Configuration
Together with the Airflow Database API, the following configuration settings are introduced:
[core] database_access_isolation
Whether the DBIsolation mode is enabled and untrusted components should use the Airflow Database API for accessing the Airflow Database
[core] database_api_host
URL of the Airflow Database API. Used only if [core]database_access_isolation is True
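For illustration, a component could resolve these proposed settings roughly as follows (a minimal sketch using the standard airflow.configuration helpers; the option names are the ones proposed above and do not exist yet):

# Sketch of reading the proposed settings via the standard Airflow configuration helpers.
from airflow.configuration import conf

db_isolation = conf.getboolean("core", "database_access_isolation", fallback=False)
if db_isolation:
    # Only consulted in DB isolation mode.
    database_api_host = conf.get("core", "database_api_host")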
Dag processor, worker and triggerer
All tests for DagProcessor, Workers (including Operators) and Triggerer are executed in both modes: "DBDirect" and "DBIsolation". The code for the Airflow Database API, the DbApiClient and DBSession are extended until all tests pass.
Why is it needed?
This change allows Workers, DagProcessors and Triggerer to work in untrusted mode - without direct access to the Database.
Which users are affected by the change?
Only those that set [core]database_access_isolation=True
How are users affected by the change? (e.g. DB upgrade required?)
What defines this AIP as "done"?
It is possible to start the Airflow Database API from the Airflow CLI and it is easily extensible with new methods. Workers, Dag Processors and Triggerer can work in DBIsolation mode - without direct access to the DB.
28 Comments
Ash Berlin-Taylor
You haven't explained how the code will look to support both with and without this proposal.
Mateusz Henc
It's just 'self.is_db_isolation' which is always false if the mode is not enabled
Ash Berlin-Taylor
I do not like having the DB API server and the existing Rest API as separate components:
Mateusz Henc
I see your point. We had some discussion about whether to define a new API or try to integrate it into the existing one. While we don't have a strong opinion, one problem is that the existing API is embedded into the Web Server - so we may need to separate it to be able to scale it independently etc. But maybe it's the right thing to do.
Jarek Potiuk
I think we all want to achieve the same, but maybe it needs more explanation and a bit more implementation details.
I believe we can (and possibly should) use the same "connexion_api" endpoints that we have currently implemented and only add a new authentication scheme. Simply run the same endpoints, but plug in different authentication (based on scheduler-issued tokens and not Basic Auth or others). And surely we SHOULD use the already existing API implementation of, say, the "get variable" or "set variable" method. I think there is no doubt about it.
What the "separate component" is about is not rewriting new APIs for the things we already have, but running and exposing them (including authentication) differently (and allowing multiple DB API components to be run as well).
That really boils down to a new `airflow db-api` or similar component (and we should be able to run multiple of those if needed - for scalability - those components, similarly to the webserver, will be stateless)
We can approach it in two ways:
1) add new APIs that are needed only for the Workers/DagProcessor to the existing OpenAPI specification and simply forbid them from running from other clients
2) create a new "specification" with some overlap which will reuse the same implementation of the "common APIs" and expose the "Worker/DagProcessor" APIs additionally
Both have pros and cons (and maybe with OpenAPI the second one will not even be possible and we will have to implement or use existing ways of filtering available APIs in connexion).
But I think we are virtually talking about the same thing, and it's just a misunderstanding of what "separate API" means
Jarek Potiuk
Also - maybe that was not clear enough (Mateusz Henc maybe you could also clarify this): conceptually we are not going to "change" the existing implementation of database operations. For example, "_run_mini_scheduler_on_child_tasks", which is done as a single DB transaction, will be executed inside the DB-API component as a single API call - such an API call is a good example of calls that would not "fit" in the current "Open" API, because we will never expose it to the "end user" (this is one of the reasons why the set of APIs for "Workers/File Processors" should be different from the one available for "users", with quite an overlap indeed - for the "primitive" calls like get variables, but not for the "internal" logic calls).
John Jackson
Hi Folks,
This is a fantastic initiative that will be a great benefit to all enterprise Airflow users. Would it be possible to extend the comment time until mid-January to allow those who are out for holidays to review and weigh in?
Thanks!
Bolke de Bruin
Great initiative and a good step towards multi-tenant Airflow! I would like to stress that my intention with AIP-1 is to enable full autonomous running of tasks without needing the scheduler at runtime. This means that all details required for the Task to run should be supplied to either the Task when it starts running (e.g. via STDIN/unixsocket) or via the TaskRunner which then communicates with the Task. The Task should only communicate limited states to the TaskRunner (SUCCESS, FAILED) by means of OS exit codes. This enables not only confidentiality but also more availability and integrity. Availability as you would be able to update the scheduler/api without any downtime of the tasks and no required backoff mechanism in the api-client inside the task. Integrity as we now have a trusted process (TaskRunner) that can communicate with the scheduler and an immutable state of the variables and connections (e.g. they can't change at runtime of the task) which can be further supplemented by signing / checksumming the Task to ensure what the scheduler is scheduling is actually running.
In other words, there is no need for TWO WAY communication between a Task and the Scheduler.
I understand that this AIP is written with backwards compatibility in mind and it is a good first step! I do think that the above with the TaskRunner doing the communication on behalf of the Task does not have to come at the cost of backwards compatibility.
Jarek Potiuk
Yep. First step it is. Thanks for kind words Bolke de Bruin
The more I talk to different people the more I see how this is the beginning of the journey rather than even the middle of it. We are barely scratching the surface of what multitenancy will look like in the future.
Niko Oliveira
This would be a great enterprise feature (re MWAA) as it reduces the harm users can possibly do to the meta db from their dags.
For what it's worth I'm in favour of option 3 (Internal API hosted as its own component). The horizontal scalability and isolation from the webserver are both desirable; the last thing we need is the webserver being bogged down by being inundated with API requests from environments with large/complex dags/tasks.
Sam Wheating
Would it be possible to run the internal API on a different port than the REST API (or at least make the internal API port configurable)? This would make it a lot easier to configure firewalls to block internal API access while permitting REST API access.
Jarek Potiuk
Yes. Of course
Ash Berlin-Taylor
An idea: even if the API server is "in" the webserver, rewriting it to send all DB access via the (in-process) internal API could make sense from a code cleanliness point of view - essentially that removes DB access from everything but the Scheduler and the Internal API (code)
Jarek Potiuk
Hmm. Interesting idea indeed. I actually quite like it when you mentioned it.
This might simplify the approach quite a bit and it's not very far from the proposal. This might be technically quite a bit different - like no decorator, but explicit (internal API) calls. But it does sound interesting. Let me do some research on possible API implementations; I am pretty sure some of the possible solutions (gRPC?) out there might already have a good way to switch between local and remote APIs - so that we will not have to reinvent the wheel.
Jarek Potiuk
Hm. I focused on the "in-process" part, not the webserver - so I misunderstood it, but I think it gave me some interesting idea to explore.
But Yeah - I think we could rewrite Webserver eventually to use internal API (probably not in this AIP - but a separate one)
Omar Al-Safi
Jarek Potiuk How will these internal APIs look? Will they be similar to api_connexion but without authentication? Also, we have custom plugins that access the database - I guess these will still be valid since custom plugins are deployed in the webserver?
Jarek Potiuk
It's updated now, Omar Al-Safi (sorry - I missed your question before) - yeah. Webserver Plugins will continue using the DB directly (and now the doc has all the implementation details and POC). There will be authentication and I am proposing a gRPC API.
Philippe Lanoe
Cross posting part of my comment from AIP-56.
Jarek Potiuk
Yes.
* At least in new code we should probably park the logic of DB or API access within the internal API client OR in some kind of abstracted, centralized data access layer
Not sure if I understand what you mean. Example would be useful.
Rewriting the whole of Airflow to use some kind of centralized data layer would be a huge undertaking. If I understand what you mean - there are a number of places where Airflow internally heavily relies on passing and retrieving SQLAlchemy objects. So what we are effectively doing is adding an "abstract data layer" where only "untrusted" components will use a subset of the methods they use, while the "trusted" ones will continue to use DB access. This approach seems the most feasible without imposing a huge risk of breaking the behaviour of trusted components.
In fact the proposal with decorators is going to end up with a complete set of methods that will be the "untrusted component" data layer - i.e. the methods used by untrusted components that are exposed by the internal API. Unless of course I understand it wrongly. This will eventually become a complete set of the "untrusted components abstract data layer": https://github.com/apache/airflow/blob/main/airflow/api_internal/endpoints/rpc_api_endpoint.py. Do you think about using yet another abstraction internally, for the Scheduler as well, for example? Do you think the Scheduler should internally use some other data access layer? Why would we need it for scheduler-only methods?
Philippe Lanoe
DB access point: great. Is it already implemented or is there still any backdoor in which the user could get direct access to the DB and skip any authorization check ?
Actually this might just be what Ash Berlin-Taylor suggested, maybe I misunderstood the example:
def process_file(
    self,
    file_path: str,
    callback_requests: List[CallbackRequest],
    pickle_dags: bool = False,
) -> Tuple[int, int]:
    if settings.DATABASE_ACCESS_ISOLATION:
        return InternalAPIClient.dagbag_process_file(
            file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
        )
    return self.process_file_db(
        file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
    )
If this method is not defined in the Airflow core code but only in an abstract internal layer (or in the internal API client logic, but then the above function should directly call the RPC method instead of InternalAPIClient.dagbag_process_file), then it is what I meant, and it does not matter if it is implemented through a decorator or an if/else. Airflow core, untrusted code should only rely on this process_file method, without knowing how it is really accessed. I initially understood that such a piece of code would be directly part of the Airflow core code, which is not the right place in my opinion. Also, an important point is that I do not expect all existing code to use this pattern right away; for now the decorator can still be useful until the code is gradually refactored (if ever), but any newly written code should use this new pattern.
I hope it clarifies the situation, if not, please let me know what is not clear and I will try to do even better
Jarek Potiuk
if settings.DATABASE_ACCESS_ISOLATION:
    return InternalAPIClient.dagbag_process_file(
        file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
    )
return self.process_file_db(
    file_path=file_path, callback_requests=callback_requests, pickle_dags=pickle_dags
)
This is exactly what the @internal_api decorator does.
Philippe Lanoe
Yes, but the decorator is applied within the core code of Airflow (which is OK for the existing implementation because of the amount of refactoring work involved, but not for new code), which makes the core code either DB- or API-aware. Ideally we should have an abstract method which hides the data access logic and keeps the core code focused on the "what" and not the "how", like "get_user", "update_role" etc.
Jarek Potiuk
I am confused. What are you asking about? Why "ideally"? What's the difference? I completely don't understand the rationale behind it, but maybe you can explain with another example.
I think what you are asking for is about something completely different, i.e. changing how Airflow's internal architecture, classes and abstraction layers are defined.
But this is quite unrelated to AIP-44. And if you have a good idea how to do it, I invite you to write an AIP about it with some good examples and an explanation of why you think we need to re-architect how Airflow's internal classes and abstraction layers are implemented. Changing Airflow's internal architecture is not a goal of AIP-44 and will never be, because we consider it too risky; this is why we chose the more "surgical" approach where we use what we have rather than propose big changes in the architecture. But if you think it is a good idea - feel free (just not in this AIP).
Unless I am mistaken of course.
Philippe Lanoe
Concrete example is this:
https://github.com/apache/airflow/blob/eaaa030ab4dc4d178dab8194fd4ef9450b6b67a5/airflow/dag_processing/processor.py#L416
In the manage_slas function I see a mix of code logic (SLA handling) and data retrieval from the database needed to execute this logic. From what I see the decorator is applied to the whole method, maybe because it was simpler for refactoring (which is understandable) or maybe for performance reasons (since there is more than one DB call, less data to transfer back and forth)
What I would suggest (not necessarily as part of this AIP because it is too late now) is that any new code should follow a pattern which separates the concerns: as little code logic as possible should be executed in the internal API (in this case), and only the necessary data should be retrieved through the internal API.
For instance
Could be changed to something like (please do not take into account namings)
This method would lie for instance in the task_instance module or elsewhere if dealing with multiple objects is not acceptable within a given model.
It would add more decoupling between code logic and the data storage layer, making the code lighter and simpler to understand + it gives the freedom to have and change the data access layer code however we want (decorator or not etc.). Such maintenance would be safe because we do not modify code logic but only data retrieval.
I do think that it has a link with AIP-44, while I know it is too late to change existing requirements (and that is not my aim here). Please let me know if you think it is a good idea nevertheless and what would be the next step in this scenario (an amendment to this AIP, since it does not impact existing implementation, only an improvement for future ones? I can also raise a GitHub issue).
Hope it was clear and concrete enough
Jarek Potiuk
There is absolutely no problem in introducing a "get_latest_tis_for_dags" method and decorating it with the @internal_api decorator if you think it will make the code better. Just don't call it the data access layer. Simply refactor the method and decorate it with the decorator (and remove the previous one); this will do exactly what you wanted to get. PRs on that are most welcome. Anyone can contribute to make our code better, and such a change might be a good idea. I am arguing that having a separate "data_access_layer" as an abstraction and "name" is not needed. The "internal_api" decorators ARE the data access layer. They are grouped in one place and they are doing what you want to do, but the "if" is inside the decorator.
Philippe Lanoe
My point here is that the decorator should only be applied to (new) data access operations, which is why I called it the "data access layer". I do not fully agree with your sentence 'The "internal_api" decorators ARE the data access layer.' for two reasons:
The "how" is maintained completely separately and changing its implementation does not affect code logic areas.
My suggestion is that for any new development (like AIP-56) we could follow this new pattern, which would make the code more readable and maintainable. Existing code refactoring will only be best-effort - I am completely fine with that. I would be glad to raise a consensus or vote on this in the mailing thread + make a PR to document the new standard if you think it can be a good idea.
Jarek Potiuk
> DB access point: great. Is it already implemented or is there still any backdoor in which the user could get direct access to the DB and skip any authorization check ?
AIP-44 is being implemented. Once it is implemented, the untrusted components should be able to work without having access to the Airflow metastore DB credentials, which means that if deployed correctly (without the DB credentials configured) they will have no physical possibility of connecting to the DB, because they will not know how to log in to it.
Philippe Lanoe
Thanks for the clarification