Status

State: Completed
Discussion Thread

https://lists.apache.org/thread/8hlltm9brdxqyf8jyw1syrfb11hl52k5

Discussion about naming: https://lists.apache.org/thread/br1jfoc8p1wjzk74c09srjgr29spytfy

As a result of the discussion, AIP-69 was renamed from "Remote Executor" to "Edge Executor".

Lazy Consensus on the name via devlist: https://lists.apache.org/thread/fhcz1q323dllzm0q34bblorsp5h0mk17

Vote Thread

v1: https://lists.apache.org/thread/tyfsrpjn12sz9dw50pbg16dsv6lmj610

v2: https://lists.apache.org/thread/8tdvksrdh6cj04bsc1kwmm8vh6vc4gm8

Vote Result Thread

https://lists.apache.org/thread/ly82d6xz2fq98xj2c2syxv26q13v0tn9

Progress Tracking (PR/GitHub Project/Issue Label)

GitHub Label AIP-69: https://github.com/apache/airflow/labels/AIP-69

PoC Draft PR: PoC of AIP-69 - A first task can be called remote

MVP Implementation PR: [Mothership PR] MVP of AIP-69 - A experimental usable version of Edge Executor #40900

PRs split from the MVP:

Date Created

2024-04-15

Version Released

2025-04-21 in https://pypi.org/project/apache-airflow-providers-edge3/

Authors

Jens Scheffler

Motivation

The world is moving to the cloud! No... see, a lot is still running on-prem! Oh, we need to be hybrid. Damn, we actually have a distributed system. We want it in the cloud but have these couple of on-prem servers behind our corporate proxy... how can we add them to Airflow? The SSHOperator is hardly possible... and our IT security will kill us for the idea of incoming connections... Oh yeah, Airflow needs to be distributed!

In today's (Airflow 2.9) world most users run either a cloud-native or a dockerized setup in Kubernetes. This allows flexibility and distribution of the components. Alternatively, you can set up the environment privately on-prem. Using the CeleryExecutor you can distribute your workers to different locations and link them into a central instance. Most standard features can be set via the Helm chart. But... help! If you need a worker far, far away from the main deployment (e.g. crossing security perimeters, needing an HTTP proxy to connect...), you need a good "extension cord". A distributed setup of Celery workers can be complex, and as workers need to be able to connect to the metadata database, latency influences performance. You need to link Redis, the Airflow meta DB and at least a good GIT sync to execute code at a remote distance. UDP is also needed if you want StatsD reporting for metrics.

This AIP proposes to add an "Edge Executor" (via a provider package) that allows a distributed worker setup to be created. It considers the planned extensions for AIP-61 Hybrid Execution, which would allow selective distribution depending on the task. It also plans to leverage and build on top of AIP-44 Airflow Internal API and, in a future development, would merge and join forces with AIP-72 Task SDK (link to be added).

Considerations

To implement the capability of distributed orchestration crossing network boundaries, the following options were considered:

  • CeleryExecutor over a VPN or other tunnel - Celery needs a lot of protocols to be connected (TCP: Postgres, Redis / UDP: StatsD / HTTP: GitSync / log upload / remote log access for the web UI), and building VPNs across different security zones in enterprise environments is mostly not accepted. Drilling firewall holes and wrapping TCP connections into an SSL tunnel is a configuration nightmare. The setup is pretty manual and the remote site has core access to all secrets and backends.
  • Distributed LocalExecutor instances - Need core DB access, and due to latency all queries will be slow. Due to the nature of the LocalExecutor you could run tasks remotely, but you cannot route tasks to the "right" location (compared to Celery, where you have different queues).
  • SSHOperator or MS PowerShell Remote (PSRP) - A central-to-remote connection which requires a firewall opening from the cloud to the remote location. Usually in enterprise environments inbound connections are not permitted and it is not possible to cross HTTP proxies. Might be usable within a corporate environment but not across security perimeters.
  • AIP-44 Airflow Internal API with edge worker: As the name states, it was made as an internal API, mainly with the focus of decoupling workers from the database. Technically it makes it possible to connect a worker via HTTP, but the job still needs to be routed to the worker. Task assignment in this model is (currently) not in scope of the AIP-44 Airflow Internal API. It would still need a broker like Celery.
  • AIP-72 Task SDK: Work in progress as of 06-2024. Compared to the AIP-44 Airflow Internal API it defines a clear interface for task execution, but distribution and routing of tasks is also not in scope.
  • Distributed Multi-Site Airflow Setup: Install multiple instances of Airflow and connect the separate islands via Kafka. Technically this would be a workaround, but in that case workflows cannot be modelled end-to-end and must be broken into pieces. An overview of the state of a business flow/transaction is hard to achieve.

Therefore this AIP focuses on the remote distribution of tasks to edge sites. The proposal is to start the implementation based on the AIP-44 Airflow Internal API (as a tactical solution to offer a first version in the Airflow 2.x line) and, once available, migrate to a clean AIP-72 Task API (potentially in the Airflow 3.x line). The purpose of this AIP is to make minimal changes to core structures, allowing the task routing ("Edge Executor/Worker") to be implemented in a provider package.

Together with AIP-61 Hybrid Execution a full setup will allow using the CeleryExecutor for central task distribution (e.g. in a data center or cloud) and routing selected tasks to remote sites via the Edge Executor. This AIP should not build anything extra around this.

What change do you propose to make?

It is proposed to implement an Edge Executor as an additional executor option for Airflow in order to distribute tasks to remote machines. A remote machine receiving a job to execute would be an "Edge Worker".

On a high level this would be similar to GitHub self-hosted runners, GitLab (custom) runners or Azure DevOps self-hosted agents - a self-hosted deployment(*) which connects to a central pipeline to execute jobs. Compared to the current distribution and connectivity options, the core difference would be that an Edge Worker always connects to the central setup and pulls jobs (compared to Celery: push). The assumption is that an Edge Worker cannot be connected to directly (TCP/UDP), might sit behind corporate proxies and/or firewalls, and that connectivity is limited to HTTP(S) only.

*) (info) Note: The plan was to make an "as thin as possible" deployment, but based on the analysis of the current dependency chain it is assumed that an Airflow 2.x Edge Worker deployment would need to carry all core Airflow package dependencies. Once AIP-72 is available, the dependencies on core can be removed to realize a thin deployment.
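
To illustrate the pull-only model described above, below is a minimal, hedged Python sketch of the loop an Edge Worker could run. The endpoint paths, payloads and the plain requests-based client are illustrative assumptions for this AIP, not an existing API; the real worker would reuse the AIP-44 RPC layer (and later the AIP-72 Task SDK).

```python
# Minimal sketch of a pull-only Edge Worker loop (endpoints and payloads are
# hypothetical, not the provider's actual API). The worker only opens outbound
# HTTPS connections, so it can sit behind corporate proxies and firewalls.
import subprocess
import time

import requests

API = "https://airflow.example.com/edge_worker/v1"  # assumed central endpoint
AUTH = ("edge-worker-01", "secret")                 # or a JWT/API token later
QUEUES = ["factory-site-a"]


def run_forever() -> None:
    while True:
        try:
            # 1. Heart beat so the central dashboard knows the worker is alive.
            requests.post(f"{API}/heartbeat", auth=AUTH,
                          json={"queues": QUEUES}, timeout=30)

            # 2. Poll (or long-poll) for the next job on the subscribed queues.
            resp = requests.get(f"{API}/jobs/fetch", auth=AUTH,
                                params={"queues": ",".join(QUEUES)}, timeout=60)
            job = resp.json() if resp.ok else None
            if not job:
                time.sleep(10)
                continue

            # 3. Execute the task locally and report logs/state back over HTTPS.
            proc = subprocess.run(job["command"], capture_output=True, text=True)
            requests.post(f"{API}/jobs/{job['id']}/logs", auth=AUTH,
                          data=proc.stdout + proc.stderr, timeout=60)
            requests.post(f"{API}/jobs/{job['id']}/state", auth=AUTH,
                          json={"state": "success" if proc.returncode == 0 else "failed"},
                          timeout=30)
        except requests.RequestException:
            # A temporary connection loss must not kill the worker: back off, retry.
            time.sleep(30)


if __name__ == "__main__":
    run_forever()
```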

Requirements that an Edge Executor must provide (Criteria for first release/MVP)

  • The Edge Executor is an optional dependency, delivered as a provider package. It consists of an "Executor" for the Airflow core deployment with some administrative endpoints, integration into the Scheduler, and authentication/authorization mechanisms. An additional package which is deployed on the "Edge Worker" is included and should be as lean as possible (Airflow 2.x: depends on all core packages; future target: just some Python packages and a file tree, no database, Docker or other infrastructure requirements).
  • All connectivity from the Edge Worker to the central Airflow setup must be made via HTTPS. Authentication must be made either via current Airflow credentials (as provided by the security manager component) or via a JWT token mechanism (if implemented and available). The Edge Worker must be able to use a proxy to connect to the central Airflow setup for enterprise environments where no "free internet" is available. No connection from the central Airflow site to the remote site is assumed (pull/request only from the worker, no task push; potentially via WebSockets or long polling).
  • An Edge Worker must be able to sustain a temporary connection loss or interruption. It should automatically be able to re-connect. A temporary connection loss shall not terminate a job execution.
  • A small dashboard must be provided such that an administrator can view
    • the connectivity state of each Edge Worker (similar to Flower in Celery), whether it is free or busy, and which job/task it is working on;
    • a short overview of the state of the queues (number of jobs per queue and their age).
  • Similar to Celery queues, each Edge Worker should be able to be configured to execute jobs from one or multiple queues. This would allow distribution to different edge endpoints.
  • Logs of task execution (stdout/stderr) must be uploaded during and at the end of a job execution such that the progress can be seen in the central web UI. Local logs must be kept as a copy for local troubleshooting (including log rotation/cleanup).
  • An Edge Worker needs to send regular heart beats.
  • If a task is terminated from the central side (timeout, "set as success/fail" in the UI), the edge site must also terminate the job.
  • At least 100 Edge Worker nodes should be supported.
  • Documentation for setup/start of the Edge Worker.
  • From a workflow perspective a task directed to an Edge Worker should be able to be modeled like every other task: it can be templated, receive parameters and XCom, and will be able to push XCom (see the DAG sketch after this list).
    • There might be restrictions, potentially due to the lack of DB/internal APIs; not all operators might be suitable - details need to be elaborated during the implementation.
  • Support for local development via Breeze is, of course, included.
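
To make the queue-based routing and "modeled like every other task" requirements concrete, here is a hedged DAG sketch. The queue name is an example, and the assumption that the Edge Executor reuses the standard queue attribute reflects this proposal rather than released behavior.

```python
# Sketch: a task routed to an edge site looks like any other task; only the
# (Celery-style) queue attribute selects the remote location. Queue names and
# the EdgeExecutor honouring `queue` are assumptions of this proposal.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="edge_queue_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # Runs on whichever Edge Worker is subscribed to the "factory-site-a" queue.
    collect_measurements = BashOperator(
        task_id="collect_measurements",
        bash_command="python /opt/agents/collect.py --day {{ ds }}",
        queue="factory-site-a",
    )
```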

Requirements that an Edge Executor should provide (can be incrementally added after the first release)

  • Depending on the general rework for the AIP-72 Task SDK, secrets and other needed internal endpoints (e.g. DAG/task context) are provided from the central instance "on demand" and on a "need to know" basis, as connections are defined for the task. The Edge Worker should not have access to all other credentials. The edge setup should only be trusted with the scope of information and code needed to execute the job. Access to an Edge Worker must not create a new attack vector into the system. In the scope of this AIP this means rework is needed to adjust to the new Task SDK.
  • The DAG's Python task code will be pushed to the Edge Worker (no separate GIT sync should be needed per default) - this might impose some limitations on which code can be executed, similar to the PythonVirtualenvOperator.
  • Authentication must be made either via current Airflow credentials (as provided by the security manager component) or, preferably, with an API token. An individual API token per Edge Worker is recommended for production and security.
  • Enrollment might be automated via IT distribution tooling, e.g. Ansible.
  • The small dashboard is extended by
    • an option to pause distribution to a worker instance for planned maintenance;
    • (similar to Celery with Flower) the ability to change queues at runtime (without a restart of the worker).
    • Note: all central options on the dashboard must be available via API as well.
  • An Edge Worker is integrated into the standard Airflow telemetry/metrics back-ends.
  • An Edge Worker should be able to consume multiple jobs in parallel (similar to a Celery worker).
  • The deployment of the Edge Worker should be a thin install with minimal Python package dependencies - assuming a Python runtime is available, it can be deployed on Linux/Windows/macOS. We assume the Python version and package versions match the central setup.
  • At least 1000 Edge Worker nodes are supported.
  • Documentation for setup includes help for deployment of the Edge Worker (e.g. including systemd setup scripts, extensions to the Helm chart).

Proposed Feature Enablement and Differences

| Feature | MVP Release in Airflow 2.x line | Feature Increment in Airflow 2.x line | Airflow 3.x | Notes |
| --- | --- | --- | --- | --- |
| Adjustments in Airflow (core) | (tick) | (question) (small patches only, no structural changes) | (plus) via AIP-72 | Target is to integrate with minimal intervention into existing Airflow 2.x. The core function of Edge will be in a provider package. |
| Provider package "Edge" | (tick) MVP scope | (tick) Increments of features and stability | (plus) Probably: rework on the new internal Task SDK/AIP-72 | Target: have a basic package soon and learn over time. Besides other changes in Airflow 3.x, no breaking changes need to be exposed to users. |
| HTTP(S) connection | (tick) | (tick) | (plus) | See notes below the table. |
| Status dashboard | (tick) | (plus) | (plus) (plus) (also via API) | Might improve over time and will be made machine readable via API as well. |
| Log upload from edge site | (tick) | (tick) | (tick) | No HTTP pull to the remote site needed; logs are already partly available while executing (no need to wait until the job completes and the S3/blob upload is made via the log handler). |
| Heart beats | (tick) | (tick) | (tick) | |
| Scalability | 100 workers | 100+ workers | 1000 workers | For the MVP the API endpoint might just be made available via a webserver plugin; like AIP-44, a standalone API endpoint process/engine might be added to de-couple workers from the web UI and allow easier horizontal scaling in later versions. |
| Development support via Breeze | (tick) | (plus) | (plus) | |
| "Need to know" based secret/variable distribution | (error) | (error) | (tick) via AIP-72 | Once AIP-72 is available the Edge Executor can/will be re-modeled onto the Task SDK. Until then remote execution needs to rely on AIP-44 features and GIT sync for DAG code. This means that potentially the full internal task and secret scope is reachable as long as it is not limited by a multi-team restriction feature. |
| Push only relevant secrets and DAG code | (error) | (error) | (tick) via AIP-72 | As above. |
| Thin deployment with minimal pip dependencies | (error) | (error) | (tick) via AIP-72 | The MVP will need full apache-airflow plus the edge provider, at least including pydantic and a lot of standard dependencies from the Airflow base package. |
| Edge Worker authentication | (star) Based on user/password on the API | (tick) JWT or ID token | (tick) | |
| Support/documentation for automated rollout (e.g. Ansible, systemd scripts, Compose template) | (error) | (tick) | (tick) | |
| Edge Worker telemetry/metrics | (error) | (question) | (tick) | Details TBD; assumed these can be delivered in increments. |
| Task concurrency | (tick) (Already in PoC (big grin) Yay!) | (tick) Concurrency | (tick) | |
| Windows support | (question) A first test was made with the PythonOperator and Yay, it was working (big grin) | (question) to be checked | (tick) | See notes below the table. |

Notes on the HTTP(S) connection: with resilience against connection loss/retry - no persistent connection needed. In future versions the connectivity might be improved from simple polling (MVP) to WebSockets pushing events via a long-standing HTTP connection. Note that due to the interface used in AIP-44, which is based on internal DB de-coupling, a lot of HTTP round trips per task are expected. This will - depending on the latency - slow down task execution. It is assumed that via AIP-72 the round trips are optimized and task execution latency improves, but this would mainly be a benefit of the underlying Task SDK changes.

Notes on Windows support: due to the dependencies needed from Airflow core, a deployment on Windows will probably not be possible in the first version; the gaps need to be checked. With AIP-72 the target is that many dependencies can be cut and the limitations for Windows deployment can be removed. Besides simple documentation, Windows support will need to be marked experimental unless CI is extended with test coverage and Windows runners - this will be the main work for Windows support.

Technical Design

The following implementation architecture is proposed. It might change in detail during the implementation.

Based on the architecture, the following components need to be implemented in the scope of this AIP:

  • Adjustments in Airflow core
    • Extension of the Internal API to proxy Internal API calls via a different endpoint (external facing, needs HTTPS - compared to the internal API, which is a fixed URL and HTTP only - and needs to be exposable with external security controls and protection)
    • Extensions in serialization to allow registering serialized classes from a provider package
    • Potential adjustments to the log viewer in the webserver (to be checked during implementation)
    • Support in Breeze for development
    • References in documentation, provider registry
  • "Edge" provider package
    • DB model extensions
    • EdgeExecutor implementation, to be loaded in the Scheduler (see the sketch after this list)
    • UI plugin with
      • API endpoints for the Edge Worker
        • building on top of the AIP-44 logic with "code first" RPC-style calls, leveraging the AIP-44 logic but proxying to another endpoint with options for better protection;
        • additional endpoints on top of AIP-44 for queue-to-worker communication, job distribution, heart beats and log uploads.
      • UI plugin to show status
    • Edge Worker (with the airflow CLI, started via airflow edge worker...)
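
As a rough illustration of the executor part of the provider package, the sketch below shows how an EdgeExecutor could plug into the scheduler by subclassing BaseExecutor: instead of pushing work to a broker, queued commands are recorded so that Edge Workers can pull them via the HTTPS endpoints. All names are illustrative; the in-memory dictionary stands in for the provider DB table that the worker API would actually serve.

```python
# Sketch of an EdgeExecutor loaded by the scheduler (names are illustrative).
# Celery pushes tasks to a broker; here queued commands are only recorded and
# Edge Workers pull them later via the HTTPS API. A dict stands in for the
# provider-owned DB table to keep the sketch self-contained.
from typing import Any, Dict, Optional

from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstance import TaskInstanceKey


class EdgeExecutor(BaseExecutor):
    """Hands task commands to remote Edge Workers instead of running them locally."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.edge_jobs: Dict[TaskInstanceKey, Dict[str, Any]] = {}

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: Any,
        queue: Optional[str] = None,
        executor_config: Optional[Any] = None,
    ) -> None:
        # Record the job together with its queue; an Edge Worker subscribed to
        # that queue fetches and executes it over HTTPS.
        self.edge_jobs[key] = {"command": command, "queue": queue, "state": "queued"}

    def sync(self) -> None:
        # Called periodically by the scheduler loop: report back terminal states
        # that workers have pushed through the API (here: updated in the dict).
        for key, job in list(self.edge_jobs.items()):
            if job["state"] == "success":
                self.success(key)
                del self.edge_jobs[key]
            elif job["state"] == "failed":
                self.fail(key)
                del self.edge_jobs[key]

    def end(self) -> None:
        # Flush remaining state reports on shutdown.
        self.sync()
```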

What problem does it solve?

  • If a worker in current Airflow is to be maintained outside of the core setup, it is hard to integrate (manually), e.g. via Celery. An Edge Executor would allow easy integration.
  • Currently, due to Python limitations, it is not possible to distribute jobs to Windows machines unless a worker setup is made via WSL. An Edge Executor could strip off a lot of Airflow core dependencies and could be made thin, with few dependencies, so it is deployable on Windows as well. With just a few MB of Python dependencies a remote machine could be looped into the setup.
  • A lot of internal APIs and DB endpoints must be exposed to operate an Edge Worker today. This opens a door for security problems, and for example a VPN or connection encryption needs to be set up from the remote location to the central site. An Edge Executor could be designed to make this easy, with less fear of disclosing secrets on a remotely or even badly managed machine.
  • Long-distance connections typically cross firewall boundaries. In enterprise networks this is often also restricted by the need for a proxy connection, and on-premise setups also need HTTP proxies to reach a cloud instance. An Edge Executor can be built in a way that only HTTP(S) is needed, compared to the communication protocols required today.
  • Worker status and logs are made visible in the UI by calling the worker via HTTP. In a distributed setup this might be a challenge (security and firewalls) - the Edge Executor could send logs and bridge this gap.

Why is it needed?

It is hard with Airflow (e.g. Celery) to deploy and host a distributed worker system in a different security perimeter. The only workaround in most cases is something like an SSHOperator (which requires direct inbound access to the remote machine), an exposed HTTP REST API which is called by Airflow - which also requires direct network access and a lot of custom code - or a message bus which bridges multiple locations (e.g. a distributed Kafka queue), but then the end-to-end workflow must be split into multiple sub-flows.

Are there any downsides to this change?

A new provider package is needed. If rejected by the community, it could also be an external provider outside of core Airflow. But at least small changes to core need to be made.

Depending on technical details, the possibilities for task execution might be limited, assuming that a full Airflow core codebase will not be available. Limitations from AIP-44 or the Task SDK will apply.

The design must cater for security. This opens the risk of potential vulnerabilities and attack vectors, and the concepts need careful review.

Which users are affected by the change?

None. Would be an optional package.

How are users affected by the change? (e.g. DB upgrade required?)

No user is affected. Database tables need a small extension to keep the job status.

Other considerations?

As there is a parallel discussion about Airflow 3 ongoing, this AIP might influence the design plans for Airflow 3. The target is not to directly compete with the CeleryExecutor, but it might act as a point of discussion for the future. The Edge Executor targets "pull only" Edge Worker/distributed setups, whereas Celery brings a lot of maturity and options to enable high-frequency and low-latency task distribution (push).

This AIP is planned to be added as a "non-breaking change", as an extension of the Airflow 2 branch. It would be marked as experimental after the MVP until critically demanded features are included, allowing (Edge-Executor-internal) interfaces to be changed flexibly. Adding it to the Airflow 2 branch allows us to learn, rework and include the outcome of the implementation in a future Airflow 3 version as well. It is explicitly assumed that a rework will be needed if Airflow 3 internal structures change. (But it would also be very valuable to have a learning curve before the Airflow 3 design.)

Implementation Strategy:

  • Build on top of AIP-44 and AIP-72 - focus this AIP on task distribution/execution and routing
  • If gaps show up in AIP-44, contribute there
  • Minimize changes in core and keep all main logic in the provider package

What defines this AIP as "done"?

  • (tick) MVP: If not all tasks can be made to run on a reduced Edge Worker, at least a Bash/shell operator, Python operator and Docker operator should be usable.
  • (tick) An Airflow deployment manager can deploy the package to enable and configure the Edge Executor.
  • (tick) An Edge Worker (agent) can be joined to an Airflow setup/environment.
  • (tick) A job/task can be distributed via the executor attribute to an Edge Worker. The queue field might be used as a label to distribute to a set of remote sites (see the sketch below this list).
  • (tick) Documentation is available.
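
As a hedged illustration of how a task would be directed to an Edge Worker once per-task executor selection from AIP-61 is available, the sketch below keeps most tasks on the default central executor and sends one task to an edge site. The executor name string and queue value are illustrative assumptions.

```python
# Sketch of hybrid routing (relies on AIP-61 multi-executor support): central
# tasks use the default executor, one task is sent to an Edge Worker via the
# per-task `executor` attribute plus a queue label. Names are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hybrid_edge_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    prepare = BashOperator(
        task_id="prepare_central",
        bash_command="echo 'runs on the central Celery/Kubernetes workers'",
    )
    on_site = BashOperator(
        task_id="run_on_edge",
        bash_command="echo 'runs behind the corporate proxy'",
        executor="EdgeExecutor",   # per-task executor selection (AIP-61)
        queue="factory-site-a",    # routed to workers subscribed to this queue
    )
    prepare >> on_site
```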

(info) Note: AIP-69 is marked as Completed with the achievement of the MVP. Nevertheless, as agreed with the community, a version 1.0.0 will only be released after the rework on AIP-72. This will be tracked as continuous maintenance while Airflow 3 is developed further.

11 Comments

  1. Jens Scheffler great work on the AIP, I am in good support of the ideas posted here.

    I have some comments from my first review.


  2. I'm still not quite sure what changes this AIP is actually proposing yet? I think I understand the core ask, but I'm just not sure how we get there?

    1. Thanks for the feedback. At the moment I have no fixed solution, but I also wanted to check for feedback on the requirements first.

      Do you expect a technical design for a next step feedback or are specific points unclear?

      1. Gotcha gotcha. Not sure. I do have one idea that relates to this about changing the task execution "API" (I'm doing a bad job of describing it in text, but I think it will help with this.)

        1. Added some more technical details as elaborated over the last weeks - now possible to review together with your AIP-72 DRAFT and see how this could fit together (cooperative, not competitive!)

  3. Really appreciate your second draft Jens Scheffler
    You have added a significant amount of detail here and the table comparing options is really useful in having a detailed discussion. 
    Ash Berlin-Taylor and I recognize that we still have a few details to document as part of AIP-72, specifically including configuration options and which parts of auth will (or will not) be in scope as currently envisioned, so the framework here is very helpful. We will also add the corresponding details in AIP-72 in the next few days.

  4. AIP-61 Hybrid Execution will allow to use CeleryExecutor for central task distribution (e.g. in data center or cloud) and route selective tasks to remote via Remote Executor
    Presumably the reference to AIP-61 here is to specify within the DAG code which tasks should be run with a local Celery executor vs. the Remote Executor. Am I understanding this correctly?
    1. Yeah it was hard for me to understand this sentence as well but I came up with the same conclusion

    2. Sorry, yeah, if not clear: AIP-61 would be good ground work (if also finalized in AF2.1, which we all hope) such that you can selectively send tasks to remote sites and keep others in the central cluster. I'd assume in a central location you would prefer the CeleryExecutor or KubernetesExecutor or both, and just use the RemoteExecutor for the tasks that need to be distributed.

  5. Is there a constraint between the provider package version used by the remote executor and the one used by the remote worker? In other words, if the code in the main deployment (where the scheduler is) uses remote provider package version X and the remote worker uses remote provider package version Y, what happens? Should they be the same? Or will it follow semver logic, so that any version in 1.x (for example) is compatible with any other version in 1.x? Is the version going to be sent over the wire to compare the two versions?

    1. Good catch. Not exactly nailed down yet, but a semver check makes sense: accept patch differences but validate that the feature version of the provider matches.

      I have it on my todo list in https://github.com/apache/airflow/pull/40224/files#diff-e849d78a2ebb4fe5e6c9ea26bd4f4aaa21439dafd8fa7e4984755d2697e1c5a2R53 to validate remote version