
Status

State: Draft
Discussion Thread:
JIRA: AIRFLOW-6440

Motivation

For the past year, my company has been using an AWS Fargate Executor (since Airflow 1.10.2). We really want to give back to the Airflow community for building something so wonderful. We're also open to new ideas for how to improve the executor.

Considerations

If I had to grossly oversimplify Fargate in a sentence, I would describe it as "Serverless Docker"; "AWS Proprietary Kubernetes" is just not as catchy. The basic premise is that containers are spun up on EC2 instances without you having to provision or manage those servers.

What change do you propose to make?

We propose the creation of a new Airflow Executor, called the FargateExecutor, that runs tasks asynchronously on AWS Fargate. The Airflow Scheduler comes up with a command that needs to be executed in some shell. That command is passed as an argument to a Docker container, and AWS Fargate provisions a new instance to run it. The container then completes or fails the job, causing the container to die along with the Fargate instance. The executor is responsible for keeping track of what happened to each task using its Airflow task ID and AWS ARN, and based on the container's exit code we say that the task either succeeded or failed.
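To illustrate how the scheduler's command could be handed to Fargate, the sketch below builds keyword arguments for boto3's ECS `run_task()` call, overriding the container's command with the Airflow command. The helper name `build_run_task_kwargs`, the cluster, task definition, container name, and subnet are all hypothetical placeholders, not values from the linked code. Fargate tasks use the `awsvpc` network mode, which is why a `networkConfiguration` is included.

```python
from typing import Any, Dict, List, Optional


def build_run_task_kwargs(command: List[str],
                          cluster: str = "airflow-cluster",          # hypothetical
                          task_definition: str = "airflow-worker",    # hypothetical
                          container_name: str = "airflow-worker",     # hypothetical
                          subnets: Optional[List[str]] = None) -> Dict[str, Any]:
    """Build run_task() arguments that run one Airflow command in one Fargate task."""
    return {
        "cluster": cluster,
        "taskDefinition": task_definition,
        "launchType": "FARGATE",
        "count": 1,  # one Fargate task per Airflow task instance
        "overrides": {
            # Replace the image's default command with the scheduler's command.
            "containerOverrides": [{"name": container_name, "command": list(command)}],
        },
        "networkConfiguration": {
            "awsvpcConfiguration": {
                "subnets": subnets or ["subnet-00000000"],  # hypothetical subnet ID
                "assignPublicIp": "ENABLED",
            }
        },
    }
```

With real credentials, the result would be passed straight through, e.g. `boto3.client("ecs").run_task(**build_run_task_kwargs(["airflow", "run", "example_dag", "example_task", "2020-01-01"]))`, where the command shown is only an example.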

See the 'Sample Code' section below.

What problem does it solve?

To revisit the main benefit of using a serverless executor (Fargate or Kubernetes), let's compare it to the Celery Executor.

  • With Celery, there is no predefined concept of auto-scaling. Therefore the number of worker servers one must constantly provision, pay for, and maintain is a static number. Because batch processing is sporadic, most of these servers sit idle most of the time. However, during peak hours, they become overloaded.
  • Servers require upkeep and maintenance. For example, a single CPU-bound or memory-bound Airflow task could exhaust a server's resources and starve out the Celery or scheduler process, taking the entire server down.
  • Lastly, there's also the consideration of server patching and instance configuration.

Why is it needed?

In my personal opinion, server-less auto-scaling and batch processing go hand in hand. You get servers when you need them, based on the tasks that you're scheduling. By borrowing from Amazon's abundant processing power, tasks no longer wait for the queue to clear. Instead, Airflow "landing times" drop to a roughly constant value, the server boot-up time, which is often less than 2 minutes. No more provisioning of servers, patching, or maintenance; you let the army of AWS DevOps engineers handle that for you. If one task exceeds its memory limits and causes a hard fault, AWS APIs will tell us and even report the exit code.

As for the one-task-per-instance approach, you could do the math and end up with either a better or a worse price tag, depending on how extreme the peaks and valleys of your jobs are. Less maintenance also means less cost. In general, jobs clustered together in a short time period benefit greatly from this approach, while evenly distributed jobs will likely cost the user more. While most batch jobs in my experience follow the former pattern, I wouldn't say that cost is a major consideration for this executor at all. One way to think about it is that you only pay for the tasks that you're running...kinda. Another way to think about it is that you're pooling your money with Amazon's fleet of engineers, technicians, and servers.
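To make "do the math" concrete, here is a toy comparison between an always-on fleet and one instance per task. Every rate and hour count below is a made-up placeholder, not an AWS price; the only point is the shape of the trade-off: clustered work favors paying per task, while work spread evenly across the day can favor a fixed fleet.

```python
def fleet_cost(num_servers: int, hourly_rate: float, hours: float) -> float:
    """Always-on fleet (e.g. Celery workers): every server is billed, busy or idle."""
    return num_servers * hourly_rate * hours


def per_task_cost(task_hours: float, hourly_rate: float) -> float:
    """One instance per task: only the hours that tasks actually run are billed."""
    return task_hours * hourly_rate


# Made-up numbers: a 10-server fleet sized for peak load, billed for 24 hours,
# versus per-task instances assumed to cost 1.5x more per hour.
fleet = fleet_cost(num_servers=10, hourly_rate=1.0, hours=24)   # 240.0
bursty = per_task_cost(task_hours=60, hourly_rate=1.5)          # 90.0: jobs clustered in a short window
steady = per_task_cost(task_hours=200, hourly_rate=1.5)         # 300.0: jobs spread evenly all day
print(fleet, bursty, steady)
```

Under these invented inputs the bursty workload is cheaper per task and the steady one is cheaper on a fixed fleet; real figures depend entirely on your own rates and job distribution.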

Are there any downsides to this change?

It's important to also check AWS's maximum container limits. The most annoying one caps the number of concurrent containers per task definition at 10. There are programmatic solutions to this problem; however, these default limits can easily be raised with an AWS Support request. At my current company, AWS has raised the concurrent-container limit to more than 100.
https://docs.aws.amazon.com/general/latest/gr/ecs-service.html#limits_ecs
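One possible programmatic workaround, sketched here as a hypothetical helper rather than what the linked code does, is to hold launches in a queue and retry any that ECS rejects on the next scheduler heartbeat. It relies only on the shape of the `run_task` response, which reports rejected launches under `failures`; throttling errors that botocore raises as exceptions are not handled here.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, Tuple


def launch_pending(pending: Deque[Tuple[Any, Dict[str, Any]]],
                   run_task: Callable[..., Dict[str, Any]]) -> Dict[Any, str]:
    """Try each queued launch once; put back any that ECS rejects.

    `pending` holds (airflow_key, run_task_kwargs) pairs. `run_task` is expected
    to be boto3's ECS client method (or a fake returning the same shape).
    Returns airflow_key -> task ARN for the launches that succeeded.
    """
    launched: Dict[Any, str] = {}
    for _ in range(len(pending)):  # visit each queued item exactly once per call
        key, kwargs = pending.popleft()
        response = run_task(**kwargs)
        tasks = response.get("tasks", [])
        if response.get("failures") or not tasks:
            # Rejected (e.g. a concurrency limit was hit): retry on a later call.
            pending.append((key, kwargs))
        else:
            launched[key] = tasks[0]["taskArn"]
    return launched
```

Calling `launch_pending(queue, ecs.run_task)` from the executor's periodic `sync()` would drain the queue as capacity frees up, instead of failing Airflow tasks outright.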

Which users are affected by the change?

All users would have access to a new executor. This would specifically appeal to AWS users. However, there are many other executors to choose from, so there's always an opt-out option.

How are users affected by the change? (e.g. DB upgrade required?)

Boto3 is already a dependency for a few operators, and for optional S3 logging. Now Boto3 would also become a dependency for an executor.

No other migrations or extraneous libraries required.

Other considerations?

  • Boto3 as a dependency.
  • Properly credentialed Boto3 configuration. See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html for credential management.
  • Fargate instances must have correctly provisioned IAM roles based on the three Boto3 calls listed in the 'Sample Code' section below.
  • I would also say that documentation on how to create an ECS cluster is a requirement.
  • The maintenance of this executor is honestly minimal since it mostly relies on Boto3 APIs, and it only overrides abstract methods from the BaseExecutor class. I've only ever had to make modifications during Airflow upgrades.
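As a sketch of what the IAM bullet could translate into, the policy below allows the three ECS calls the executor makes, plus `iam:PassRole`, which `RunTask` typically needs when the task definition references task or execution roles. This is an assumption about how permissions might be granted, not the policy used in the linked code; `Resource: "*"` is only for brevity and should be scoped to your cluster, task definition, and role ARNs. The permissions of the Airflow task running inside the container (its task role) are separate and depend on your DAGs.

```python
import json

# Hypothetical minimal policy for the identity the executor's Boto3 client runs as.
EXECUTOR_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # The calls made by execute_async(), sync(), and terminate().
            "Effect": "Allow",
            "Action": ["ecs:RunTask", "ecs:DescribeTasks", "ecs:StopTask"],
            "Resource": "*",  # scope this down in practice
        },
        {
            # Lets RunTask hand the task definition's IAM roles to ECS.
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": "*",  # scope to the specific role ARNs in practice
        },
    ],
}

if __name__ == "__main__":
    print(json.dumps(EXECUTOR_POLICY, indent=2))
```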

What defines this AIP as "done"?

The addition of a well-tested, well-documented AWS Fargate Executor merged into master.

Sample Code

I know this isn't exactly following protocol, but I already have some code prepared. It's 100% functional, but there's still some work to be done (for example, Python 3.5 compatibility). To be clear, I'm not married to anything, and would happily write or rewrite based on community feedback.

The three major Boto3 API calls are:

  • The execute_async() function calls boto3's run_task() function.
  • The sync() function calls boto3's describe_tasks() function.
  • The terminate() function calls boto3's stop_task() function.
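The decision `sync()` has to make for each tracked ARN can be isolated into a small function. The sketch below uses a hypothetical helper name, `task_state`; the fields it reads (`lastStatus`, `containers`, `exitCode`) are those returned in each entry of `describe_tasks()["tasks"]`. A task counts as finished only once ECS reports it `STOPPED`, and it counts as a success only if every container exited with code 0.

```python
from typing import Any, Dict


def task_state(task: Dict[str, Any]) -> str:
    """Map one describe_tasks() entry to "running", "success", or "failed"."""
    if task.get("lastStatus") != "STOPPED":
        return "running"  # still provisioning, pending, or running
    containers = task.get("containers", [])
    # A missing exitCode (e.g. the container never started) counts as failure.
    if containers and all(c.get("exitCode") == 0 for c in containers):
        return "success"
    return "failed"
```

In the executor this would be applied to each entry of `ecs.describe_tasks(cluster=..., tasks=arns)["tasks"]`; the API accepts up to 100 task ARNs per call, so large batches need to be chunked.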

We propose creating a FargateExecutor under the "airflow.executors" module. This class will extend BaseExecutor and override five methods: start(), sync(), execute_async(), end(), and terminate(). Internally, the FargateExecutor uses Boto3 for deployment and monitoring.
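For readers who don't want to open the PR, here is a compressed sketch of what such a class might look like. It is not the code in the linked PR. The `BaseExecutor` below is a minimal stand-in for Airflow's class, keeping only `event_buffer`, `success()`, and `fail()`, and the constructor parameters are hypothetical. The ECS calls (`run_task`, `describe_tasks`, `stop_task`) and the response fields they read are the real Boto3 ECS client API; the client is injected so the logic can be exercised with a fake.

```python
import time
from typing import Any, Dict, List, Optional


class BaseExecutor:
    """Stand-in for Airflow's BaseExecutor, reduced to what this sketch needs."""

    def __init__(self) -> None:
        self.event_buffer: Dict[Any, str] = {}

    def success(self, key: Any) -> None:
        self.event_buffer[key] = "success"

    def fail(self, key: Any) -> None:
        self.event_buffer[key] = "failed"


class FargateExecutor(BaseExecutor):
    """Sketch: run each Airflow task as its own Fargate task, tracked by ARN."""

    def __init__(self, ecs_client: Any, cluster: str, task_definition: str,
                 container_name: str, network_configuration: Dict[str, Any]) -> None:
        super().__init__()
        self.ecs = ecs_client  # e.g. boto3.client("ecs")
        self.cluster = cluster
        self.task_definition = task_definition
        self.container_name = container_name
        self.network_configuration = network_configuration
        self.active: Dict[str, Any] = {}  # task ARN -> Airflow task key

    def start(self) -> None:
        # Nothing to pre-provision: Fargate allocates capacity per task.
        pass

    def execute_async(self, key: Any, command: List[str],
                      queue: Optional[str] = None, executor_config: Any = None) -> None:
        # One Fargate task per Airflow task; the scheduler's command overrides the image's.
        response = self.ecs.run_task(
            cluster=self.cluster,
            taskDefinition=self.task_definition,
            launchType="FARGATE",
            networkConfiguration=self.network_configuration,
            overrides={"containerOverrides": [
                {"name": self.container_name, "command": list(command)}]},
        )
        if response.get("failures") or not response.get("tasks"):
            self.fail(key)  # ECS rejected the launch (e.g. a concurrency limit)
            return
        self.active[response["tasks"][0]["taskArn"]] = key

    def sync(self) -> None:
        arns = list(self.active)
        # describe_tasks accepts at most 100 task ARNs per call.
        for i in range(0, len(arns), 100):
            response = self.ecs.describe_tasks(cluster=self.cluster, tasks=arns[i:i + 100])
            for task in response.get("tasks", []):
                if task.get("lastStatus") != "STOPPED":
                    continue  # not finished yet
                key = self.active.pop(task["taskArn"], None)
                if key is None:
                    continue
                codes = [c.get("exitCode") for c in task.get("containers", [])]
                if codes and all(code == 0 for code in codes):
                    self.success(key)
                else:
                    self.fail(key)

    def end(self, poll_interval: float = 10.0) -> None:
        # Block until every launched task has stopped.
        while self.active:
            self.sync()
            if self.active:
                time.sleep(poll_interval)

    def terminate(self) -> None:
        # Stop every task still running and record it as failed.
        for arn, key in list(self.active.items()):
            self.ecs.stop_task(cluster=self.cluster, task=arn,
                               reason="Airflow executor terminated")
            self.fail(key)
        self.active.clear()
```

Failing a task immediately when ECS rejects the launch, and marking terminated tasks as failed, are choices made for this sketch; a production version might instead re-queue rejected launches until capacity frees up.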

Git: https://github.com/aelzeiny/airflow/pull/1/files