For various reasons, it can be helpful to persist arbitrary state information for tasks or dags. For some pipelines, it may be strictly necessary.
One use case is incremental processes. They are a very common usage pattern, and they currently do not have first-class support in airflow.
Another use case is polling a remote async job. You might want to store the job id, and try again in the event of a failure.
For these and other kinds of tasks, it is helpful to be able to persist some state metadata to the metastore.
What problem does it solve?
Below I enumerate a couple job patterns that benefit from state persistence.
The current mechanisms for storing state are problematic for reasons outlined below.
Why is it needed?
The absence of support for incremental processes forces users to either use xcom or variables with the problems that entails, or to invent and implement their own state persistence structure and deploy it with airflow.
The incremental design pattern is common enough that support for it in airflow is warranted. We should provide a standard way to implement this kind of task to remove the burden from users and increase uniformity across implementations.
Are there any downsides to this change?
Every option proposed here is a non-breaking change that is 100% backwards-compatible.
Some argue that incremental processes are a bad design pattern that should not be supported on airflow. I think that in reality, it's quite common that airflow is used for incremental processes, or processes that are stateful in some way, or which otherwise deviate from the classic idempotence pattern. I personally think we should accept this reality and support it. The tradeoff of added complexity in my view is minimal. Some tasks just want to be incremental; they make more sense this way, are simpler this way, and are more reliable this way, with lower maintenance burden.
Which users are affected by the change?
Only users that actively choose to use the features.
How are users affected by the change? (e.g. DB upgrade required?)
They are not affected unless they use the new features.
What defines this AIP as "done"?
In this AIP I propose a few different options. If we can reach general agreement on structure, then I will work on a PR. The changes should be relatively straightforward. Once the PR is merged, the AIP is done.
Job patterns benefitting from state persistence
There are many kinds of tasks that fit this pattern. Examples:
- pulling from database based on datetime_modified (only retrieve updated records)
- pulling files from sftp (only retrieve new files)
- data warehouse processing – facts, dimensions, reporting
For these kinds of tasks it is convenient to store task state in the database, sometimes called a "watermark". Your operator might do something like the following:
- at task start, do two things:
  - get last high watermark if it exists; otherwise use some initial load value, e.g. "0" or "1900-01-01"
  - query source to find new high watermark
    - this could be "select max(datetime_modified) from source_table"
    - or "select max(integer_pk_id) from source_table"
    - or it could be the min of max modified date for a number of source tables, e.g. in the case of data warehouse processing
- if task completes successfully, at end of process, update new high watermark
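The steps above can be sketched as follows. This is a minimal illustration, not a proposed API: `state_store` is a plain dict standing in for the metastore, and `run_incremental`, `INITIAL_WATERMARK`, and the in-memory SQLite source table are hypothetical names chosen for the example.

```python
import sqlite3

INITIAL_WATERMARK = "1900-01-01"  # assumed initial-load value


def run_incremental(conn, state_store, key):
    """Pull rows modified after the stored watermark, then advance it."""
    # 1. Get the last high watermark, or fall back to the initial-load value.
    low = state_store.get(key, INITIAL_WATERMARK)
    # 2. Query the source for the new high watermark.
    high = conn.execute(
        "SELECT MAX(datetime_modified) FROM source_table"
    ).fetchone()[0]
    if high is None or high <= low:
        return []  # nothing new to load
    rows = conn.execute(
        "SELECT id, datetime_modified FROM source_table "
        "WHERE datetime_modified > ? AND datetime_modified <= ? ORDER BY id",
        (low, high),
    ).fetchall()
    # ... load `rows` into the target here ...
    # 3. Only after the load succeeds, persist the new high watermark.
    state_store[key] = high
    return rows


# Hypothetical source table with two modified records.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_table (id INTEGER, datetime_modified TEXT)")
conn.executemany(
    "INSERT INTO source_table VALUES (?, ?)",
    [(1, "2020-01-01"), (2, "2020-01-02")],
)
state_store = {}  # stand-in for the metastore
first_run = run_incremental(conn, state_store, "example_dag.example_task")
second_run = run_incremental(conn, state_store, "example_dag.example_task")
```

The first run picks up both rows and stores the new watermark; the second run finds nothing newer and returns an empty list. Because the watermark is written only at the end, a failed load leaves the old value in place and the next run retries the same range.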
It is often suggested to ascertain state by inspecting the target. Sometimes this is simply not possible.
Remote async processes
Currently if you have an async process, your task may idly occupy a worker slot when it is simply waiting for a remote async job to complete. Being able to persist a job id to the metastore would support the creation of sensor-like operators that can check to see if the job is done and once it is, process the job.
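A rough sketch of what such a sensor-like check could look like is shown below. Everything here is hypothetical: `poke`, `submit_job`, and `is_done` are illustrative names, the remote service is faked in-process, and `state_store` is a dict standing in for whatever persistence mechanism is eventually chosen.

```python
def poke(state_store, key, submit_job, is_done):
    """Return True once the remote job has finished.

    The job id is persisted right after submission, so a retry or
    reschedule resumes the same remote job instead of submitting a new one.
    """
    job_id = state_store.get(key)
    if job_id is None:
        job_id = submit_job()
        state_store[key] = job_id  # persist immediately after submitting
    return is_done(job_id)


# Fake remote service: the job finishes on the second status check.
submitted = []
checks = {"count": 0}


def submit_job():
    submitted.append(f"job-{len(submitted) + 1}")
    return submitted[-1]


def is_done(job_id):
    checks["count"] += 1
    return checks["count"] >= 2


state_store = {}  # stand-in for the metastore
key = ("example_dag", "example_task", "2020-01-01")
first_poke = poke(state_store, key, submit_job, is_done)
second_poke = poke(state_store, key, submit_job, is_done)
```

Between the two calls the worker slot could be released, since everything needed to resume (the job id) lives in the state store rather than in process memory.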
Problems with the existing approaches
Currently there are two ways to store state information in the metastore: Variables and XCom.
XCom can be made to work as a mechanism of state persistence. But there are just a couple idiosyncrasies that make it problematic.
Clearing at task start
XCom records are cleared at task start, even after failure or reschedule. This means they cannot be used for the remote async job pattern. They also cannot be used for incremental jobs where you want to keep track of progress in a large batched process and resume after a retry in the event of failure.
If we can provide a way to optionally ensure that xcom value is not cleared at task start, then this removes the main concern with using XCom.
The task instance attribute `execution_date` is mildly problematic for incremental jobs because it can be out of order with actual execution time. This can happen when a user clicks "trigger dag" in the web UI (because it uses an execution date of now, while ordinarily a task will have an execution date of now-minus-one-schedule-interval).
If a user does make the mistake of clicking trigger dag on an incremental job using xcom, generally nothing terrible will happen. Generally speaking the problem will just be that for a couple runs you pull more data than you needed to. This could be remedied by making it possible to get previous xcom by timestamp instead of execution date.
The other issue is just that for many incremental jobs you really don't care about the execution history. I.e. one record per task + dag would suffice, so having a record for every task + dag + execution date just uses space unnecessarily and makes it slightly more inconvenient to inspect and modify state if necessary.
Variables are a decent solution for incremental jobs, but are a bit messy. Because there is only one field to identify a variable, it makes it tougher to use and keep organized. If you already use variables for arbitrary manual job configuration, then adding 1000s of records of automatically-created state-persistence variables would potentially pollute your variable namespace and make it tougher to use.
Additionally, it does not make sense to use Variable for state persistence with built-in operators because there is no way to prevent namespace conflicts.
Solution 1: add column "is_stateful" to XCom
Add a boolean column `is_stateful` to XCom table. XCom records with `is_stateful=True` would not be cleared at task start. This would allow you to retrieve state information after failures or reschedules.
This would serve both the incremental and remote-async use patterns well.
But it adds complexity to XCom, which is already somewhat complicated.
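To make the intended behavior concrete, here is a small sketch of clearing that skips stateful rows. The table below is a simplified stand-in and not the real xcom schema; only `is_stateful` is the proposed column, and `clear_xcom_at_task_start` is a hypothetical helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-in for the xcom table; `is_stateful` is the new column.
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT, "
    "key TEXT, value TEXT, is_stateful INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("d", "t", "2020-01-01", "return_value", "42", 0),
        ("d", "t", "2020-01-01", "remote_job_id", "job-1", 1),
    ],
)


def clear_xcom_at_task_start(conn, dag_id, task_id, execution_date):
    """Clear XCom rows for a task instance, keeping the stateful ones."""
    conn.execute(
        "DELETE FROM xcom WHERE dag_id = ? AND task_id = ? "
        "AND execution_date = ? AND is_stateful = 0",
        (dag_id, task_id, execution_date),
    )


clear_xcom_at_task_start(conn, "d", "t", "2020-01-01")
remaining = conn.execute("SELECT key, value FROM xcom").fetchall()
```

After the clear, only the row flagged as stateful survives, which is what lets a retried task find its earlier job id or progress marker.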
Solution 2: add column "state_data" to task_instance table
Add string column `state_data` to the task_instance table.
Add TaskInstance methods like the following:
- get_state (gets current value of `state_data`)
- get_previous_state (gets `state_data` for previous TI)
- set_state (sets `state_data`)
This would serve the remote-async use pattern well. It would be a little more awkward than XCom for the incremental use pattern because when you want to "get current state" you can't necessarily just grab the latest task instance. Because a task instance record is created for every task instance, whether successful or not, you have to get the latest task instance with state data. With XCom, you can be confident that if you grab the latest XCom, you'll be getting the latest state, because the record is only created when you push a new value.
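The extra lookup step can be illustrated with a short sketch. The table is a cut-down stand-in for task_instance (the real table has many more columns), and `get_latest_state` is a hypothetical helper, not one of the methods proposed above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, "
    "execution_date TEXT, state TEXT, state_data TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("d", "t", "2020-01-01", "success", "2020-01-01T23:59"),
        ("d", "t", "2020-01-02", "failed", None),  # failed before setting state
        ("d", "t", "2020-01-03", "running", None),  # the current run
    ],
)


def get_latest_state(conn, dag_id, task_id):
    """Return state_data from the most recent task instance that has any."""
    row = conn.execute(
        "SELECT state_data FROM task_instance "
        "WHERE dag_id = ? AND task_id = ? AND state_data IS NOT NULL "
        "ORDER BY execution_date DESC LIMIT 1",
        (dag_id, task_id),
    ).fetchone()
    return row[0] if row else None


latest = get_latest_state(conn, "d", "t")
```

Simply taking the previous task instance would return nothing here, because the previous run failed before writing any state; the query has to skip back to the last run that actually recorded a value.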
Solution 3: add TaskState model
Columns would be something like the following:
- dag_id (primary key)
- task_id (primary key)
A TaskState model would be perfect for jobs such as incremental processes that don't care about execution date. Keeping only the current state (as opposed to keeping a record for every task run) makes it easier to find and manually alter the current job state if desired. And it fits the incremental pattern well conceptually: there is only one state at any given time. However, this solution makes less sense for the remote async use pattern, because in that pattern you might have different remote async jobs for different runs of a task and you might want them to run concurrently.
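As a rough sketch of the one-row-per-task idea, the example below uses SQLite with a composite primary key on the two key columns listed above. The `value` column name and the `set_task_state` / `get_task_state` helpers are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# `value` is an assumed column name; the key columns come from the list above.
conn.execute(
    "CREATE TABLE task_state (dag_id TEXT, task_id TEXT, value TEXT, "
    "PRIMARY KEY (dag_id, task_id))"
)


def set_task_state(conn, dag_id, task_id, value):
    """Overwrite the single state row for this task."""
    conn.execute(
        "INSERT OR REPLACE INTO task_state VALUES (?, ?, ?)",
        (dag_id, task_id, value),
    )


def get_task_state(conn, dag_id, task_id, default=None):
    """Return the current state for this task, or `default` if none is set."""
    row = conn.execute(
        "SELECT value FROM task_state WHERE dag_id = ? AND task_id = ?",
        (dag_id, task_id),
    ).fetchone()
    return row[0] if row else default


set_task_state(conn, "d", "t", "2020-01-01")
set_task_state(conn, "d", "t", "2020-01-02")  # replaces, does not append
row_count = conn.execute("SELECT COUNT(*) FROM task_state").fetchone()[0]
current = get_task_state(conn, "d", "t")
```

Because each write replaces the previous row, the table never grows with run history, and inspecting or correcting a watermark by hand means touching exactly one row.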
Solution 4: add column "namespace" to Variable
It is already possible to use Variable as a store of task state. The problem is that it is a little messy because variables are named with just one field. Simply adding a namespace column would make variables a bit more practical to use for state persistence.
For example on my team we pull a few hundred tables from one database to another on an incremental basis. We use a model with this structure to keep track of state. The namespace is something like 'acquisition_server01' and the variable key is the target table name.
However, while this would help enable end users to create stateful operators for their own use, it would not be a practical solution for supporting built-in stateful operators, because we would not really be able to guarantee that variable names we chose would not collide with those found on clusters in the wild.
Solution 5: add ProcessState model
Exactly the same as TaskState but for arbitrary processes.
- namespace (primary key)
- process_name (primary key)
It's also the same as namespaced variables but it is built with a different purpose and named accordingly.
Having a state store that is decoupled from dag_id and task_id is convenient because you can then freely rename your dag or task (or move your task from one dag to another) without breaking the state store.
It's especially convenient when you have a family of tasks that are generated from a config class (because then it's easy to control the naming).
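A small sketch of the config-driven case follows. `TableConfig` and its `state_key` property are hypothetical, the namespace follows the 'acquisition_server01' convention mentioned earlier, and `process_state` is a dict standing in for the ProcessState table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableConfig:
    """Hypothetical config for one incrementally loaded table."""

    server: str
    table: str

    @property
    def state_key(self):
        # (namespace, process_name); deliberately independent of dag_id/task_id
        return (f"acquisition_{self.server}", self.table)


process_state = {}  # stand-in for the ProcessState table

configs = [
    TableConfig("server01", "orders"),
    TableConfig("server01", "customers"),
]
for cfg in configs:
    process_state[cfg.state_key] = "2020-01-01"

# Moving the `orders` load to a different dag or renaming its task changes
# nothing here, because the key is derived from the config alone.
orders_state = process_state[TableConfig("server01", "orders").state_key]
```

The state key is a function of the config object, so any task generated from that config finds the same state regardless of which dag it currently lives in.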
Solution 6: add TaskInstanceState model
Same as TaskState model but with added primary key column `execution_date`.
XCom is already somewhat complicated (e.g. there can be multiple xcoms per task instance, namespaced by `key`, and you have to worry about whether you are "including prior dates", etc.). Adding a distinct purpose-built model, though very similar to xcom, lets us keep both models simpler and focused on their purposes. It would also keep both tables smaller, which is good for query performance.
I propose we add ProcessState, TaskState, and TaskInstanceState (solutions 3, 5 and 6).
Why not add to xcom (solution 1)?
XCom is already a bit complicated. There are multiple keys per task instance. It's cleared at task start. If we make task clearing configurable at the task instance + key grain, that makes it more complicated, and makes its behavior tougher to understand for users. XCom is a larger table than is necessary to support state. Using a TaskInstanceState model would be much simpler to implement and maintain, and easier for users to understand.
Why not add namespace to variable (solution 4)?
Variables serve a different purpose. There's no need to mess with them.
Why not add state to task instance table (solution 2)?
Instead of adding a TaskInstanceState model, we could add something like `state_data` to the task_instance table. This option does have some appeal. But there's nothing analogous to do for tasks or processes. So I figure it's better to implement support for state in the exact same way for each of these various scopes. Having this separation can also be beneficial in terms of query performance and ease of making changes.
Why do we need all of these tables?
Airflow is used for orchestrating a wide variety of tasks and there is a need to track state at three different scopes:
- the task
- the task instance
- arbitrary processes