Status
It's quite difficult to define "Done" criteria for "improve security". However, ideas from this AIP and later discussions have been turned into smaller AIPs:
- AIP-43 DAG Processor separation (Completed)
- AIP-44 Airflow Internal API
- AIP-56 Extensible user management (Completed)
- AIP-67 Multi-team deployment of Airflow components
They will possibly be followed by more. We consider this AIP-1 an "Umbrella" AIP for those AIPs, and we will consider it done when we feel that security has been improved "enough".
Meetings
Meeting #2: Nov 26, 2021
The meeting notes are available here.
The meeting recording is available here.
Meeting #3: Jan 19, 2022
The meeting notes are available here.
The meeting recording is available here.
Meeting #4: Feb 16, 2022
The meeting notes are available here.
The meeting recording is available here.
The chat transcript is available here.
Motivation
Airflow runs arbitrary code across different workers. Currently, every task has full access to the Airflow database, including connection details such as usernames, passwords, etc. This makes it quite hard to deploy Airflow in environments that are multi-tenant or semi-multi-tenant. In addition, there is no mechanism in place that ensures that what the scheduler thinks it is scheduling is also the thing that runs on the worker. This creates the additional operational risk of running an out-of-date task that does something other than expected, aside from the security risk of a malicious task.
Challenges
DAGs can be generated by DAG factories, essentially creating a kind of sub-DSL in which DAGs are defined. From Airflow’s point of view this creates a challenge, as DAGs are therefore able to pull in arbitrary dependencies.
Envisioned workflow
DAG submission
The DAG directory should be fully managed by Airflow. DAGs should be submitted by authorized users, and ownership should be determined by the submitting user. Group ownership determines whether other users can overwrite the DAG. DAGs can be submitted either as a single .py file or as a zip containing all dependencies that are required beyond system dependencies.
DAGs are submitted to a REST endpoint. The API server then verifies the submitted DAG and places it in the DAG directory (/var/lib/airflow/dags). It also calculates the SHA512 for the DAG definition and its tasks. It does this on every update.
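A minimal sketch of the submission-side hashing, assuming a hypothetical store_submitted_dag helper on the API server; only the directory path comes from the proposal above, everything else is illustrative:

```python
import hashlib
from pathlib import Path

DAG_DIR = Path("/var/lib/airflow/dags")  # managed DAG directory from the proposal

def store_submitted_dag(filename: str, content: bytes) -> str:
    """Persist a submitted DAG file and return its SHA512 (illustrative only).

    A real implementation would first verify the DAG (parse it, check the
    submitting user's ownership and permissions) before accepting it, and
    would also hash the individual task definitions.
    """
    digest = hashlib.sha512(content).hexdigest()
    (DAG_DIR / filename).write_bytes(content)
    return digest
```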
DAG distribution
In case of distributed workers, DAG distribution happens via the REST API. The worker checks whether a DAG is available in the local cache (/var/lib/cache/airflow/dags) and verifies the SHA512 hash. If the DAG is unavailable or the hash differs, the worker asks the API for the version with the specified hash. It then stores the DAG in the local cache.
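A sketch of the worker-side cache check described above; fetch_from_api stands in for the REST call and is purely hypothetical:

```python
import hashlib
from pathlib import Path

CACHE_DIR = Path("/var/lib/cache/airflow/dags")  # local worker cache from the proposal

def ensure_dag_cached(dag_file: str, expected_sha512: str, fetch_from_api) -> Path:
    """Return the cached DAG file, re-fetching it when missing or when the hash differs."""
    path = CACHE_DIR / dag_file
    if path.exists() and hashlib.sha512(path.read_bytes()).hexdigest() == expected_sha512:
        return path
    # Cache miss or stale copy: ask the API for the version with this hash.
    content = fetch_from_api(dag_file, expected_sha512)
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
    return path
```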
Task Launching
Scheduler
When it has determined that a particular task is ready for launch, the scheduler checks which connections are required for the task and verifies the user that owns the task against those connections. If these match, the scheduler serializes the connection details into a metadata blob. It also recalculates the SHA512 for the task and verifies it against the SHA512 that was calculated at submission time; if these don’t match, a security exception is raised. The SHA512 of the task is combined with the connection information into metadata, which is serialized and sent to the executor together with the task_instance information.
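A sketch of the scheduler-side check and metadata blob; the function and exception names are made up for illustration:

```python
import hashlib
import json

class TaskIntegrityError(Exception):
    """Raised when the task no longer matches the hash recorded at submission time."""

def build_task_metadata(task_source: bytes, submitted_sha512: str, connections: dict) -> bytes:
    """Re-verify the task hash and bundle it with the connection details (illustrative only)."""
    current_sha512 = hashlib.sha512(task_source).hexdigest()
    if current_sha512 != submitted_sha512:
        raise TaskIntegrityError("task definition changed since submission")
    metadata = {"task_sha512": current_sha512, "connections": connections}
    # The serialized blob is sent to the executor together with the task_instance.
    return json.dumps(metadata).encode()
```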
Worker
The worker receives the metadata and calculates the hash of the task and verifies this against the metadata it has received. If these match it continues otherwise it will raise a security exception.
The worker pipes (os.pipe()) the metadata over a unique file descriptor to the task.
Task
The task parses the metadata and obtains its connection information from the metadata or, for backwards compatibility, from the environment.
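A sketch of the worker-to-task handover over os.pipe(); run_task is a placeholder for the actual task execution, and the fork-based layout is only illustrative:

```python
import json
import os

def launch_task_with_metadata(metadata: dict, run_task) -> None:
    """Fork a task process and pass it the metadata over a dedicated pipe (illustrative only)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: the task process. It only sees the read end of the pipe.
        os.close(write_fd)
        with os.fdopen(read_fd, "rb") as reader:
            received = json.loads(reader.read())
        run_task(received)  # connections etc. come from the metadata, not the DB
        os._exit(0)
    else:
        # Parent: the worker. Write the metadata and wait for the task to finish.
        os.close(read_fd)
        with os.fdopen(write_fd, "wb") as writer:
            writer.write(json.dumps(metadata).encode())
        os.waitpid(pid, 0)
```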
12 Comments
Dan Davydov
I've been thinking about this more, and it is extremely tricky to do with proper security. I think the hardest problem is that DAGs being parsed on the Scheduler/Worker have full access to everything the Scheduler/Worker has access to; e.g. they could send arbitrary tasks to the Celery message queue for workers to pick up, access all of the secrets on the Scheduler/Workers, etc. Using Unix delegation (sudo su) doesn't solve this problem, because DAGs are Python code: when they are eventually passed back to the Scheduler and the Scheduler calls something like dag.dag_id, a malicious user could have set dag.dag_id to an evil callback.
The solution would be something like preventing the Scheduler from ever parsing DAGs, and instead having some other service that does this, serializes them, and sends them back (which is very difficult and might not be backwards compatible with e.g. things like Python callbacks and Jinja templates). The workers themselves would probably have to delegate to some other service to actually run the tasks as well, for the same reason (e.g. see the dag_id attack above).
Bolke de Bruin
I don’t think it is an unsolvable problem. The worker definitely does not need the same access as the scheduler; in fact it requires very little access. Direct database access is not required, as states can be set via the REST API and/or exit codes. Even sharing airflow.cfg is not required. Just an endpoint and an authentication mechanism are sufficient. Specific details can be shared by the worker with the task using os.pipe(), which creates a secure channel between the two processes.
The scheduler already has process separation and uses an abstraction of a DAG. What remains is to remove access to any of the resources that the scheduler process maintains before forking and then dropping privileges. Overall this would require the airflow scheduler to start as root, drop privileges to the airflow user, prepare for parsing, and then drop to the airflow_parse user (this requires becoming root again and then dropping, hence starting the scheduler as root initially). This way the parsed DAG does not have access to the scheduler's resources and cannot obtain them either.
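For illustration, a minimal privilege-drop sketch along the lines described above; a real implementation would need to juggle real/effective/saved UIDs to switch between the airflow and airflow_parse users, whereas this only shows a one-way drop:

```python
import os
import pwd

def drop_privileges(username: str) -> None:
    """Permanently drop from root to an unprivileged user (illustrative only)."""
    user = pwd.getpwnam(username)
    os.setgroups([])        # drop supplementary groups first
    os.setgid(user.pw_gid)  # group before user, while we still have the privilege to do so
    os.setuid(user.pw_uid)  # irreversible: DAG-parsing code can no longer regain root
```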
Dan Davydov
Ah, that's a good point that the SimpleDAGs the Scheduler uses are essentially already serialized DAGs. I'm not 100% sure that the restricted multiprocessing return types guarantee that the SimpleDAG can't contain callables, but that sounds correct. Having a separate DAG parsing service might not be a horrible idea either, to avoid having to run as root, and it could potentially provide easier ways of scaling DAG parsing and decoupling scheduling from parsing. The service can run locally on the scheduler in the default setup.
Worker processes would need to ensure that DAGs couldn't e.g. modify other DAGs (which would be another way to get at the secrets).
In theory DB access shouldn't even be that scary for Workers/Webservers, but it would require that the Airflow setup not rely on Connections/Variables in order to be secure. There are some DB locks in the worker process that would be a bit of work to preserve if we move to REST.
XCom would also need to be reworked, but that's for the best anyways.
Taylor Edmiston
I'm interested in this particularly for solving the problem of protecting against outdated DAGs, i.e. guaranteeing that they are prevented from running. I think solving the DAG Distribution and Synchronization Problem would be useful in and of itself, whether in the context of this bigger AIP or by itself.
Tao Feng
Any follow-up on this topic?
Bolke de Bruin
I have been experimenting with some of it. It needs dedicated focus time; it would probably help if we split it into a couple of tasks.
Tao Feng
thanks Bolke de Bruin, let me know if I could help.
Bolke de Bruin
I'll see if I can do some of that task splitting so it becomes actionable. Maybe you could pick some of them up or help improve the descriptions?
Ash Berlin-Taylor
What are our thoughts on pickling the task/DAG? I know that pickle has a potential security problem if you don't trust the source, but I don't think that particular caveat applies here? (Any worries about code execution apply equally to the parser or the existing execution model running code anyway.)
I've never experimented with pickling in Airflow, so I don't know how well it does or doesn't work.
Dan Davydov
So I did an audit of the work required, and in general here are some points to note:
One point is to make the Scheduler do DAG parsing via service calls against a new service rather than the existing IPC solution involving SimpleDagBag. Webservers should also do parsing via service calls, or be locked down such that there is no privileged information on them.
There needs to be a bit more investigation done for webservers because it’s possible webservers need non-serializable state from the parsing process (the one that provides a SimpleDagBag) which could be a blocker (particularly the /rendered endpoint - ti.render_templates() in views.py, but there might be more)
To have proper security (vs some kinds of heuristics which work most of the time) would also require adding some kind of serialization to Airflow, such that you can map service accounts to serialized DAGs (which the Scheduler would then fetch and forward to workers via the executor). This would likely involve an additional table in the DB and a binary store for the serialized DAGs (this can be the DB by default, but for any Airflow use case of significant size this binary store should be pluggable via an interface). The table that maps permissions to DAGs might look something like this:
some_dag_id → (some_dag_location_of_serialized_object + some_dags_service_account_to_run_as)
The scheduler would then schedule tasks under this service account on workers (which would have access to any secrets such as connection information specific to those service accounts).
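A hypothetical sketch of such a table as a SQLAlchemy model; the names and column sizes are invented for illustration, and the binary store behind serialized_object_location would be pluggable:

```python
from sqlalchemy import Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SerializedDagPermission(Base):
    """Maps a DAG to its serialized object and the service account it runs as."""
    __tablename__ = "serialized_dag_permission"

    dag_id = Column(String(250), primary_key=True)
    serialized_object_location = Column(String(2000), nullable=False)  # pointer into the binary store
    service_account = Column(String(250), nullable=False)              # identity the scheduler launches tasks as
```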
There are other problems/considerations for this service that I'm happy to share if people are curious.
It might also be a good idea to consider versioning the serialized_objects in this database, and thus solve the consistency story in Airflow at the same time (all tasks in a dagrun will run with the same version of the code even if the DAG code is updated midway through).
Valeriys Soloviov
I want to propose some ideas:
create a new operator which gets data via another operator, processes it with my custom code, and saves the result with yet another operator. So I am violating the Airflow operator code and getting access to the credentials. (We can't separate code and credentials.)
solve it on the system's side - write a complex script/SQL and trigger it via the DAG's BashOperator. (We do not control access in this case; anyone could run it.)
create a set of connections with wider permissions and give them to DAG owners
So, what do I propose? Let's create an equivalent of Unix UID/GID for DAGs and users, and let's grant permissions (e.g. chmod) on Airflow resources: variables, connections, queues.
Kaxil Naik
Superseded by AIP-43, AIP-44, AIP-67 & AIP-72