Status

State: Accepted
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/g2c83wsrx1fy04v7kjzh65smykpf6c26
Vote Result Thread: https://lists.apache.org/thread/b5w8d42w8z0yyl37x2sk8r6wb7kygjpy
Progress Tracking (PR/GitHub Project/Issue Label): GitHub Project
Umbrella AIP: AIP-73 Expanded Data Awareness
Date Created: 2024-06-26
Version Released:
Authors:

This AIP is part of AIP-73, which aims to expand data awareness within Airflow.


Motivation

Partitions are a critical enhancement to Airflow's data processing capabilities, designed to address some limitations in the current task-centric approach. The primary motivations for introducing partitions are as follows:

  • Granular Data Management: Currently, Airflow's data intervals are tied directly to task schedules, which limits the flexibility in handling datasets that do not fit neatly into these intervals. By introducing partitions, we can manage data at a more granular level, allowing for efficient processing of specific slices of data. This is particularly useful for large datasets that need to be processed incrementally or updated periodically.
  • Enhanced Performance: Processing large datasets in their entirety can be resource-intensive and time-consuming. Partitions enable Airflow to handle smaller chunks of data independently, improving performance and reducing processing times. This is essential for maintaining the efficiency and scalability of data flows, especially as data volumes continue to grow.
  • Improved Data Lineage and Impact Analysis: With partitions, it becomes easier to track the flow of data through various stages of the workflow. This visibility allows for more accurate impact analysis when changes are made, ensuring that any modifications to upstream data can be traced and addressed effectively. This capability is crucial for maintaining data quality and reliability. 
  • Facilitating Incremental Processing: Many modern data processing strategies rely on incremental updates, where only new or changed data is processed. Partitions support this approach by allowing for the independent processing of data slices. This facilitates more efficient and intelligent backfilling, enabling data flows to quickly catch up on missed data without reprocessing the entire dataset.
  • Flexibility in Data Organization: Different use cases may require different methods of data partitioning, such as time-based or segment-based partitions. By providing a flexible partitioning system, Airflow can better accommodate a variety of data organization strategies, supporting more complex and nuanced data flows.

Rationale

The idea behind partitions is to provide a way to model and organize data that is closely related, but still distinct, decoupling it from the task. For example, a daily-partitioned sales order table could be seen as a sales order data asset made up of partitions, each corresponding to the sales orders made on a given day. These partitions are computed using the same code and derived from the same upstream sources. They can be processed and visualized in bulk, individually, or as subsets.

Consider an asset populated by a function. If the asset is not partitioned, the function can be thought of as logically replacing the entire asset every time it writes.

Although history is always kept, the previous execution result is logically overwritten when a new run happens.

If the asset is partitioned, on the other hand, every execution of the function would receive a key to identify a section of the asset it should update. An asset partitioned by date, for example, can have the function receive the date as the partition key. Execution then looks like this:
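A rough sketch, using the @asset decorator and task context access described in the Specification section below (the names here are illustrative, not a final API):

@asset(..., partition=PartitionByInterval("@daily"))
def sales_orders():
    context = get_current_context()
    # Each execution receives one partition key, here a date...
    partition_date = context["partition"]
    # ...and only rewrites the slice of the asset identified by that date,
    # instead of logically replacing the whole asset.
    ...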

In this simple form, the partition key is actually just the logical date we already have in Airflow. However, partitions are fundamentally a property of the data, not the task or DAG. Airflow’s current implementation has constraints:

  1. The data interval or partition is automatically calculated based on the schedule and requires all data updated by different tasks within the DAG to be partitioned in the same way.
  2. It requires partitions to be one-to-one with task runs.
  3. The partition key must be a date on a pre-defined schedule. This is fine for many ETL use cases, but problematic in edge or non-ETL cases.

A partition describes how an asset is partially generated whenever its function executes. While the function itself does not strictly require this information (the asset’s function body can do whatever it wants and is the canonical source of truth for what gets written), there are clear benefits to using partitions:

  1. Clarity in Asset Generation: Partitions provide other parts of the system with information on how an asset is generated without analyzing data outputs. For example, if the raw sales order asset uses hourly partitions and the daily sales order asset uses the raw sales order asset, we know that 24 hourly partitions from the raw sales order asset correspond to 1 daily partition in the daily sales order asset. This clarity allows for a more granular view of data flow in the UI and showcases how issues impact downstream assets.
  2. Parallel Processing: Multiple partitions can be processed in parallel. For example, if partitions represent cloud regions, we can launch individual DAG runs to process data for each cloud region more quickly. Additionally, if upstream data has been processed for some regions but not others, processing can start for the regions that are ready.
  3. Ad-hoc Range Processing: A single task can generate a range of partitions as needed, and downstream assets can depend on the generated partition information to produce further data from those partitions.

Partitions in Airflow are a logical concept and don’t necessarily need to correspond to storage-level partitions; they are only a way for you to mentally model how the data looks and behaves. Some storage systems like Hive have an explicit notion of partitions, with table partitions corresponding to subdirectories, while other systems like Snowflake abstract storage away entirely. Even without a direct link, the concept is incredibly useful for data orchestration. For instance, when overwriting data in a Snowflake table for a specific date range, you delete all the rows for that date and insert a new set of rows—essentially working with a logical partition. Technically, anything can be thought of as partitioned (with a never-arriving second partition), and anything can be considered non-partitioned (each materialization simply contains all previous data again). Even for one use case, different people can prefer to think of the same data differently; it’s all about how you want to model the situation, and how Airflow can best help you achieve what you want.

Specification

Partitions can be defined statically or dynamically. Static partitions can also be combined to create more complex partitioning. Partition information can be accessed inside the function decorated by @asset.

A sliced partition of an asset is identified by a value of any immutable (hashable) type that can be serialized by Airflow. This includes int, float, str, most stdlib types (datetime is a common choice for time-based partitions), and immutable collection types containing supported types (e.g. tuple). The partition’s identity (within the same asset) relies on the value’s implementation of __hash__ and __eq__. One notable exclusion here is dict (being mutable and non-hashable); instead, a special PartitionKey class is introduced to cover cases where a rich composite key is preferred (see below).
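For illustration, all of the following would be acceptable partition key values under this rule (the literal values are made up for the example):

import datetime

datetime.datetime(2024, 6, 26)   # a common choice for time-based partitions
"marketing-dwh"                  # a plain string segment
(2024, "marketing-dwh")          # an immutable composite of supported types
# {"region": "us"} is excluded (mutable, non-hashable); use a PartitionKey subclass instead.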

Two categories of partitions will be introduced: time-based and segment-based. A time-based partition is specified by an interval, e.g. daily, hourly, a cron expression, or a timedelta. A segment-based partition is specified by a static list.

Time-based Partitions

These are similar to logical dates (more accurately, data intervals) today, and are meant to represent time or date ranges. Like today, they can automatically be derived from the schedule or can be manually set.

@asset(..., schedule="@hourly", partition=PartitionByInterval("@hourly"))
def hourly_data():
   ...

The concept is similar to how Airflow 2 recommends using the data interval, but they are fundamentally different since the partition boundaries are entirely decoupled from the schedule. Consider this example:

@asset(..., schedule="@daily", partition=PartitionByInterval("@hourly"))
def hourly_data_generated_daily():
    ...

The asset is materialized once per day at midnight, but every materialization would trigger 24 runs, each covering one hour of the previous day.

Conversely, if an hourly-scheduled asset has a daily partition, it would materialize once per hour, but for any given day, the first 23 materialization runs would not actually happen; only the last one actually executes, covering the entire past day. This is more useful when an asset is scheduled against another (see the Partition Dependency section below for more details). For example:

@asset(..., schedule="@hourly", partition=PartitionByInterval("@hourly"))
def hourly_data():
    ...

@asset(..., schedule=hourly_data, partition=PartitionByInterval("@daily"))
def language_model(hourly_data):
    # Retrain language model based on the past day's data...
    ...

The downstream language_model is scheduled against the upstream hourly_data, but does not want to be materialized as often (perhaps because the materialization is expensive). This allows the downstream to still “follow” the upstream’s schedule, instead of having an independent one and worrying about language_model accidentally running too soon, before the last hourly_data run finishes—a common problem with traditional DAGs that necessitates a sensor at the beginning of the downstream.

I have not decided how best to implement the “skipping” part of this. The first 23 upstream events still need to be handled in some way: we could still create 23 runs but not actually run the task, or we could choose to not create the runs at all. I feel we should still do something (to the user, doing nothing may look like Airflow is having hiccups), but exactly what is still undecided.

Segment-based Partitions

These are meant to represent other types of logical groupings, like teams or cloud regions. For example, if an asset is generated every day to evaluate the spend of multiple data warehouses, each data warehouse can be represented as a partition.

@asset(
    ...,
    schedule="@daily",
    partition=PartitionBySequence(["marketing-dwh", "engineering-dwh"]),
)
def dwh_daily_cloud_spend():
    ...

Combined Partitions

You will also be able to combine time-based and segment-based partitions.

@asset(
    schedule=dwh_daily_cloud_spend, 
    partition=PartitionByProduct(
        [PartitionByInterval("@hourly"), PartitionBySequence(["marketing-dwh", "engineering-dwh"])],
    ),
)
def dwh_spend_analysis():
    ...

After the upstream asset is generated each day, 48 (24 hours x 2 data warehouses) runs are triggered. To simplify logic, a combined partition definition can only contain at most one time-based partition definition. We’ll explore more complex combinations if there are concrete real-world use cases after this feature is fully implemented and rolled out.

Runtime-assigned Partitions

All cases discussed up until now know the partition key up-front, and can therefore provide the key to a particular function run. Sometimes, however, partitions are not known until the function actually runs to process the incoming data. For example, to predict energy consumption, an energy company trains an ML model per customer. Customers are frequently added and removed, so it is simpler to get an accurate list of customers at runtime.

active_customers = PartitionAtRuntime(name="active_customers")

@asset(schedule="@hourly", partition=active_customers)
def energy_consumption_model():
    """get customer list and train model"""
    customers = ...  # get customer list
    for customer in customers:
        train_model = ...
        active_customers.add_partition(key=customer)

On its own, this is just extra metadata that can be shown in the web UI. The partition definition, however, can also be shared by multiple assets, and the partitions created at runtime would be reflected in those other assets. This is useful in a downstream asset like this:

@asset(schedule=energy_consumption_model, partition=active_customers)
def energy_prediction():
    # This is run once when a partition is added on active_customers,
    # to materialize the new partition.
    ...

See also Partition Dependency.

Runtime partitioning cannot be combined with statically defined partitioning via PartitionByProduct.

Rich Partition Key

Internally, each partition is identified by its partition key. Although a primitive value (like a date or a string) can satisfy basic use cases, real-world scenarios most likely require richer information, such as a time span (e.g. 2pm yesterday to 2pm today) or a collection of values (e.g. x, y, and z in a three-dimensional coordinate system). We also need to consider how to display these in the web UI. The partition key, whether generated automatically from a definition or given directly by the user (with PartitionBySequence or added at runtime), will always be wrapped in a PartitionKey class. The user can create custom subclasses to contain custom fields, like this:

class Position(PartitionKey):
    x: float
    y: float
    z: float

    # Optional; Airflow generates a repr similar to dataclasses.
    def __str__(self) -> str:
        return f"({self.x}, {self.y}, {self.z})"

The PartitionKey type should have a custom metaclass to stop users from being able to supply a custom __init__ or __new__ to inject arbitrary code into protected Airflow processes.

In simple cases (e.g. user supplying plain strings to PartitionByProduct), Airflow should automatically wrap the keys in special classes, and unwrap them automatically when possible. PartitionByInterval should produce Interval instances as partition keys that specify the start and end of the partition, with a custom __str__ that represents the interval by the start date.
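For illustration, such an Interval key could look roughly like the Position example above (the field names and implementation here are assumptions, not a finalized API):

import datetime

class Interval(PartitionKey):
    start: datetime.datetime
    end: datetime.datetime

    def __str__(self) -> str:
        # Represent the interval by its start date, as described above.
        return str(self.start.date())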

Accessing Partition Information

Similar to logical dates and data intervals, partitions are parameterizable and will be available via the task context.

@asset(
    ...,
    partition=PartitionBySequence(["marketing-dwh", "engineering-dwh"]),
)
def dwh_daily_cloud_spend():
    context = get_current_context()
    for partition in context["partition"].iter_past_partitions():
        # do something
        ...

The partition value in context is a rich object that exposes various partition information, including the original definition, the current and past partitions, etc. This has not been fully hashed out yet.

Partition Dependency

When an asset is scheduled against an upstream, and the two assets have the same partitioning logic, each partition of the downstream depends on the corresponding partition of the upstream.

@asset(schedule="@hourly", partition=PartitionByInterval("@hourly"))
def raw_hourly_data():
    ...

@asset(schedule=raw_hourly_data, partition=PartitionByInterval("@hourly"))
def hourly_data():
    ...

The partition of raw_hourly_data covering 2pm must be materialized first for the partition of hourly_data covering the same period to be materialized.

If the assets’ partitions do not align exactly, all upstream partitions that overlap the downstream partition must be materialized first. For example:

@asset(schedule=raw_hourly_data, partition=PartitionByInterval("30 * * * *"))
def hourly_data_shifted():
    ...

To materialize the partition of hourly_data_shifted covering 14:30–15:30, both the 2pm and 3pm partitions of raw_hourly_data must first materialize.

If you want a downstream to aggregate multiple partitions from the upstream, you can do

@asset(schedule=hourly_data, partition=PartitionByInterval("@daily"))
def aggregated_daily_data():
    ...

Every partition of this asset depends on the 24 hourly_data partitions of that day.

Conversely, if an upstream is not updated often, but the downstream needs to be calculated more often, you can do

@asset(schedule="@yearly", partition=PartitionByInterval("@yearly"))
def yearly_data():
...

@asset(schedule=yearly_data, partition=PartitionByInterval("@monthly"))
def data_using_slow_data():
...

After each yearly update, 12 materialization runs are scheduled for the downstream since they all depend on the same upstream partition.

Backfilling

In the context of partitions, backfilling is an operation that retroactively materializes time-based partitions in the past. Only assets with time-based partitioning can be backfilled—the concept does not make sense otherwise (for assets either partitioned by sequence or not partitioned at all).

We’re not adding separate start_date and catchup arguments to assets. The behavior is always the same as catchup=False; backfills must be initiated by a user request after an asset is created. The user must explicitly provide a time period when kicking off backfilling.

This AIP does not intend to design how backfills are implemented (whether they should run via the executor, and so on). Here we only intend to establish what a backfill consists of in an abstract sense. As mentioned in AIP-75, first-class @asset constructs require a separate set of user interfaces (including the web UI, CLI, and REST API), and we only need to implement backfilling partitions in those new views for this AIP.

Manual Runs

Asset materialization can be triggered manually. Each manual trigger needs to specify exactly which partition to cover, so we should have a rich selector mechanism in the web UI. Re-materializing an existing partition should be considered a re-run and is subject to AIPs 63, 64 and 66—an asset is internally analogous to a DAG, and its materializations are recorded with TaskInstance. If a partition has previously been successfully materialized manually, a later scheduled materialization will skip that partition.

Future Work

Topics listed in this section are incremental enhancements to the core concepts, and can be implemented at any point without breaking compatibility. They may still be implemented and released with other parts of this AIP if we can figure out the needed details in the process, but they are not requirements for this AIP to be considered DONE.

Shorthands for Time-based Partitioning

There are some possibilities for simplifying the partition argument. For example, "@daily" seems like an obvious shorthand for PartitionByInterval("@daily"). This does not work as well for more sophisticated cases, such as

@asset(..., partition=PartitionByInterval(
    "@daily",
    start=datetime.date(2024, 1, 1),  # For backfilling.
))
def daily_data_from_2024():
    ...

It is not possible to express this with simple Python primitives. Since arguably most real-world scenarios require backfilling to some extent, the shorthand is suddenly not that worthwhile if it only works for simplified tutorials.

There are also ambiguous cases. Shortening PartitionBySequence(["foo", "bar"]) to ["foo", "bar"] looks obvious at first glance, but it also makes sense to shorten a PartitionByProduct to a list. Since it is technically not possible to reliably distinguish the two cases, maybe the best approach is to do neither, in the name of explicitness.

Partial Refresh

Your data might be partitioned and each day you process a new partition. But you might also need, every day, to reprocess the last N partitions to capture updates to those partitions. It would be nice if we could support this.
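A rough sketch of what this could look like, reusing the iter_past_partitions accessor from the Accessing Partition Information section (the exact slicing and the reprocess helper are hypothetical):

@asset(schedule="@daily", partition=PartitionByInterval("@daily"))
def sales_orders_with_lookback():
    context = get_current_context()
    # Reprocess the current partition plus the previous three to capture
    # late-arriving updates to those partitions.
    reprocess(context["partition"])  # hypothetical processing helper
    for past in list(context["partition"].iter_past_partitions())[:3]:
        reprocess(past)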

Automatic Partition Inference

A lot of the time, we just want to partition the asset by the schedule—for example, if the asset is materialized every day at noon, a partition covers from the previous noon to the latest one. It would be useful to not need to repeat the 0 12 * * * twice. Similarly, if an asset does not have a schedule (it is only materialized manually), it normally isn’t partitioned either. It would be nice if we could reduce the boilerplate.

One idea is to make partition optional, defaulting to an “auto mode” that:

  • If the schedule is time-based, a partition is created from the period between each scheduled run.
  • If the schedule is not time-based, there is no partition.

This sounds reasonable, but how about assets scheduled against a time-based asset? Do they automatically use the same partitions? What if they are scheduled against multiple assets with different schedules using & or |?

A quick experiment also revealed a minor readability issue. If I see

@asset(
    schedule="@daily",
    # Many
    # lines
    # of
    # params
    # ...
)
def my_asset():
    ...

I must read all the lines carefully to find out whether there’s an explicit partition parameter, and can only be sure the partition is @daily after reading every single line and making sure partition is not in there. If the argument is required, I can always simply scan for partition= to find the information I need.

We are therefore making partition a required parameter in the initial proposal. This buys us time to figure out the best solution here by observing real use cases.

Complex Schedule-Partition Mapping

Up until now, we’ve only considered time-based cases where the partition is logically related to the materialization time. Sometimes, however, you may want even more interesting partitions, for example an hourly-materialized asset that modifies data for the past two hours (the new hour, plus the hour before it that was also covered by the previous partition—a.k.a. a sliding window). There are even more exotic use cases, and it is not possible for Airflow to include every possibility. It is likely better to have a pluggable interface similar to Timetable for users to provide custom partitioning logic, and Airflow can adopt popular ones.
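A purely hypothetical sketch of what such a pluggable definition could look like, reusing the Interval key sketched earlier (the PartitionDefinition base class and method name are assumptions, not part of this proposal):

import datetime

class SlidingTwoHourWindow(PartitionDefinition):  # hypothetical base class
    """Each hourly materialization covers the new hour plus the hour before it."""

    def partition_for_run(self, run_after: datetime.datetime) -> Interval:
        return Interval(start=run_after - datetime.timedelta(hours=2), end=run_after)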

Complex Partition Dependency Rules

Similar to the schedule-partition mapping, we have also only considered partition dependencies where the upstream partitions cover the downstream ones. There are, however, valid uses where you want to depend on other upstream partitions. Again, it is better to allow for custom dependency rules. The situation here is somewhat like trigger rules—the ability to customise them is also raised as a feature request from time to time. They are also similarly non-trivial and require more thought.

Incremental Loads and Watermarking

Watermarking is a popular way to add markers to data, generally of a streaming kind (coming in continuously), so a pipeline knows where to continue processing. The simplest form of this can be covered by runtime partitioning—using the partition key to store the high watermark, while the low watermark is the key of the previous partition. Watermarking is generally more than this—the rest of the mechanism, arguably the point of watermarking in the first place, surrounds the idea of when we should stop waiting and start a run. This “when should the run start” problem, however, is a scheduling one, not a partitioning one. Since one of the root ideas of this proposal is to split scheduling and partitioning into different concepts, the topic is not covered in this document, but will be covered in a separate one in the future, building on the foundation set here.
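A minimal sketch of that simplest form, built on runtime partitions (the latest_key accessor and the fetch_current_offset/process_events helpers are hypothetical):

events_watermark = PartitionAtRuntime(name="events_watermark")

@asset(schedule="@hourly", partition=events_watermark)
def incremental_events():
    context = get_current_context()
    low = context["partition"].latest_key()   # previous partition's key, i.e. the low watermark
    high = fetch_current_offset()             # where the incoming stream currently ends
    process_events(low, high)
    events_watermark.add_partition(key=high)  # record the new high watermark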


11 Comments

  1. I have a hard time understanding the full scope of how this will work all together - I see cool features and primitives/syntax but feel a lot of complexity will be needed in the backend. Some additional questions not fitting into one chapter:

    • Do you see an upper limit for partitions? Similar to DynamicTaskMapping, where we limit to 1024 by default.
    • If a DAG has a schedule from an asset and the asset is partitioned:
      • Can I assume the scheduler runs N tasks in parallel like if a full DAG is "task mapped" (like in Airflow 2.9 if I put all tasks into a task group and make a dynamic task mapping to the task group)?
      • How would a run be showing in the UI?
      • Will partitioning also work in this regard together with DynamicTaskMapping? (Mapping tasks in a partition)
    1. re: partition upper limit, an artificial limit is definitely a good idea for segment-based static partitions. Time-based partitions naturally can’t have limits (time is indefinite), and I don’t think we should have limits on dynamic partitions either. We must implement around potential performance issues.

      1. Thanks, that makes it clear. Also with the comment below if N partitions make N DAG runs then there is no technical limit except the amount of DAGs you can schedule.

    2. re: DAG schedule with partitioned asset dependency

      The scheduler should create N DAG runs in parallel, not a DAG run with N executions of each task. I see what you mean and yes you can think of things that way, but the subtle difference is important. The general UI representation would be similar because each (DAG) run is still represented by a separate entry. How to identify the runs with a string might be a bit tricky but we’ll come up with something. Same regarding dynamic task mapping—since each partition is mapped to an entirely separate DAG run, the two concepts should be entirely independent from each other. If a task is mapped to M instances, and we have N partitions to cover, you get N runs, each with M task instances, so N×M task instances in total.

      1. I like these explanations and examples - very clear. One thing that is likely an implementation detail is how "timedelta" partition alignment will look, because it needs to have some alignment for the schedules. It's easy (and intuitive) for @hourly and @daily, but for things like @every-8-minutes it's not obvious what the "start time" of a particular partition is. Possibly (and that should also apply to @hourly etc.) we should use start to determine the alignment? For example, @hourly with start=2021-12-10-10:35:00 would mean that the hourly partitions are aligned at :35:00? Or maybe we should have another way to specify alignment?

      2. Thanks for the clarifications. That makes it clearer.

        And yes... the partition identifier then needs to be added to run_id to make it unique. A benefit of the proposal as described is that you are not limited and can use Dynamic Mapped Tasks in the DAG, which would be limited if the DAG mapped all tasks.

        A (visual) grouping might still be an option in the UI to logically group all DAG run partitions together, so that it is "easy" to see the bundle of runs.

      3. I like this clarification, very helpful.

  2. It may be useful to see how partitioning can be used to get/process data from external systems such as Iceberg, just to ensure that the mapping is compatible. 
    For example, this is how Iceberg thinks of partitioning https://iceberg.apache.org/docs/latest/partitioning/

    1. I would let Tzu-ping Chung confirm, but something like the following should work, since it will be an abstract and loose concept:

      @asset(schedule="@daily", partition=PartitionByInternal("@daily"))
      def daily_sales_data():
          context = get_current_context()
          partition_date = context["partition"]
      
          # Load Iceberg catalog and table
          catalog = load_catalog('my_catalog')
          table = catalog.load_table('sales')
      
          # Fetch data for the given partition date
          data = table.scan().filter(f"ts >= '{partition_date} 00:00:00' AND ts < '{partition_date} 23:59:59'").to_arrow()
      
          # Process the fetched data
          process_partition(data)
      1. I’m still figuring out how context["partition"] should work, but yes this is the idea.

        1. Got it, that makes sense to me