Status
Motivation
Rationale
To increase data reliability, we propose letting data consumers and data producers define and verify expectations about input and output assets through Operators and/or Asset definitions. Data engineers could state, for example, the expected column names and data types, the update frequency, or even what the contents of a field should look like.
Proposed Changes
There would be two types of validations:
- Pre-runtime validations, which let data consumers verify that established data-flow contracts hold before a task runs
- Post-runtime validations, which let data producers publicize that the processed data upholds an established contract
As a data consumer, you would be able to define pre-runtime validation rules that execute before the task, for example checks that certain columns exist. If discrepancies are found, Airflow could send a notification before the task runs, preventing potential runtime errors and removing the need for cleanup actions. These rules could be set as part of the task definition (“inlet_validations” in the example) or as part of declaring asset dependencies when using an asset-centric syntax.
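A consumer-side check of this kind can be sketched in plain Python. Everything here is hypothetical: `schema_problems`, `run_with_inlet_check` and `InletValidationError` are illustrative names, not Airflow APIs, and the sketch assumes the inlet's column names and types can be read from metadata before the task starts. What it shows is the ordering: the task callable is invoked only once the contract holds, so a broken inlet never produces partial output.

```python
from __future__ import annotations

from typing import Callable, Mapping, TypeVar

T = TypeVar("T")


class InletValidationError(Exception):
    """Hypothetical error raised before the task body runs."""


def schema_problems(expected: Mapping[str, str], actual: Mapping[str, str]) -> list[str]:
    """Compare the expected {column: type} contract with the inlet's reported schema."""
    problems = []
    for column, expected_type in expected.items():
        if column not in actual:
            problems.append(f"missing column {column!r}")
        elif actual[column] != expected_type:
            problems.append(f"column {column!r} is {actual[column]}, expected {expected_type}")
    return problems


def run_with_inlet_check(
    expected: Mapping[str, str],
    actual: Mapping[str, str],
    task: Callable[[], T],
) -> T:
    """Validate the inlet first; call ``task`` only if the contract holds."""
    problems = schema_problems(expected, actual)
    if problems:
        # This is where Airflow could send a notification instead of running the task.
        raise InletValidationError("; ".join(problems))
    return task()


# Usage: the task runs because 'sales' exists with the expected type.
print(run_with_inlet_check({"sales": "NUMBER"}, {"sales": "NUMBER", "region": "TEXT"}, lambda: "ran"))
# prints: ran
```

Raising an exception is just one way to stop the task here; a real implementation might instead mark the task as skipped or failed and notify the owner.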
As a data producer, you could define validation rules that “certify” the data was processed correctly, for example checks that values in a specific column follow a pattern, such as phone numbers. These rules could be set as part of the asset definition itself (“outlet_validations” in the example).
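The phone-number example can be sketched as a pattern check run over the output after the task finishes. The names `certify_column`, `nonconforming_values` and `PHONE_PATTERN` are hypothetical, and the pattern is an assumed E.164-style rule (a leading "+" and at most 15 digits, no leading zero); real phone validation varies by country.

```python
from __future__ import annotations

import re
from typing import Iterable, Mapping

# Assumption: E.164-style numbers, "+" followed by 2-15 digits with no leading zero.
PHONE_PATTERN = re.compile(r"\+[1-9]\d{1,14}")


def nonconforming_values(values: Iterable[str], pattern: re.Pattern) -> list[str]:
    """Return every value that does not fully match ``pattern``."""
    return [v for v in values if pattern.fullmatch(v) is None]


def certify_column(
    rows: Iterable[Mapping[str, str]], column: str, pattern: re.Pattern
) -> tuple[bool, list[str]]:
    """Post-runtime check: certify that all values in ``column`` match ``pattern``.

    Returns (certified, offending_values) so a producer could publish the
    offending values alongside the pass/fail result.
    """
    bad = nonconforming_values((row[column] for row in rows), pattern)
    return (not bad, bad)


rows = [{"phone": "+14155550100"}, {"phone": "555-0100"}]
print(certify_column(rows, "phone", PHONE_PATTERN))
# prints: (False, ['555-0100'])
```

Using `fullmatch` rather than `search` matters here: a value such as "call +14155550100" contains a valid number but should not certify the column.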
Lastly, Airflow should provide built-in validations that are accessible via the asset, and should also allow custom, user-defined validations.
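One way built-in and user-defined validations could coexist is a shared interface that both implement. This is a sketch under assumptions: `AssetSnapshot`, `Validation` and `UpdatedWithin` are hypothetical names, and `AssetValidation.has_columns` only mirrors the name used in the proposal's example. The custom check covers the "update frequency" expectation mentioned in the rationale.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Sequence


@dataclass(frozen=True)
class AssetSnapshot:
    """Hypothetical metadata a validation can inspect without reading the data."""
    columns: Sequence[str]
    last_updated: datetime


class Validation(ABC):
    """Common interface shared by built-in and user-defined validations."""

    @abstractmethod
    def check(self, asset: AssetSnapshot) -> bool: ...


class _HasColumns(Validation):
    """Built-in: every required column must be present."""

    def __init__(self, *required: str) -> None:
        self.required = required

    def check(self, asset: AssetSnapshot) -> bool:
        return all(col in asset.columns for col in self.required)


class AssetValidation:
    """Namespace for built-ins, mirroring the name used in the proposal's example."""
    has_columns = _HasColumns


class UpdatedWithin(Validation):
    """User-defined: the asset must have been updated within ``max_age``."""

    def __init__(self, max_age: timedelta) -> None:
        self.max_age = max_age

    def check(self, asset: AssetSnapshot) -> bool:
        return datetime.now(timezone.utc) - asset.last_updated <= self.max_age


snapshot = AssetSnapshot(columns=["sales"], last_updated=datetime.now(timezone.utc) - timedelta(hours=1))
validations = [AssetValidation.has_columns("sales"), UpdatedWithin(timedelta(days=1))]
print(all(v.check(snapshot) for v in validations))
# prints: True
```

Because both kinds subclass `Validation`, the scheduler would not need to distinguish them: it could run any list of checks the same way.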
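# Proposed API (illustrative only): ColumnedAsset, AssetValidation, SqlOperator
# and the *_validations parameters are part of this proposal, not existing Airflow APIs.
raw_sales = ColumnedAsset(
    'raw_sales',
    uri='s3://...',
    columns=['sales'],
    # Post-runtime (producer) check: certify that the asset exposes a 'sales' column.
    outlet_validations=[AssetValidation.has_columns('sales')],
)

aggregated_sales = ColumnedAsset(
    'aggregated_sales',
    uri='snowflake://...',
    columns=[...],
)

task_aggregate_sales = SqlOperator(
    task_id='aggregate_sales',
    inlets=[raw_sales],
    outlets=[aggregated_sales],
    # Pre-runtime (consumer) check: fail fast if raw_sales has no 'sales' column.
    inlet_validations=[AssetValidation.has_columns(raw_sales.sales)],
    sql="""
        INSERT INTO {{ outlets.aggregated_sales }}
        SELECT SUM({{ inlets.raw_sales.sales }})
        FROM {{ inlets.raw_sales }}
    """,
)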
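The example above implies a fixed order: inlet validations, then the task, then outlet validations. That lifecycle can be sketched as a small wrapper. `execute_with_contracts`, `TaskOutcome` and the `(name, check)` pairs are hypothetical; the sketch assumes each check is a zero-argument callable already bound to its asset.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable, Sequence

Check = Callable[[], bool]  # hypothetical: a zero-argument check bound to an asset


@dataclass
class TaskOutcome:
    ran: bool
    certified: bool
    result: Any = None
    failed_checks: list[str] = field(default_factory=list)


def execute_with_contracts(
    inlet_checks: Sequence[tuple[str, Check]],
    task: Callable[[], Any],
    outlet_checks: Sequence[tuple[str, Check]],
) -> TaskOutcome:
    # 1. Pre-runtime: any failing inlet check means the task never starts,
    #    so there is no partial output to clean up.
    failed = [name for name, check in inlet_checks if not check()]
    if failed:
        return TaskOutcome(ran=False, certified=False, failed_checks=failed)

    # 2. Run the task itself (for example, the SQL aggregation above).
    result = task()

    # 3. Post-runtime: the outlet is certified only if every outlet check passes.
    failed = [name for name, check in outlet_checks if not check()]
    return TaskOutcome(ran=True, certified=not failed, result=result, failed_checks=failed)


outcome = execute_with_contracts(
    inlet_checks=[("raw_sales has 'sales'", lambda: True)],
    task=lambda: "aggregated",
    outlet_checks=[("aggregated_sales not empty", lambda: True)],
)
print(outcome.ran, outcome.certified)
# prints: True True
```

Returning a `certified` flag rather than raising after the task lets the producer still record that the task ran, while downstream consumers could decide whether to accept uncertified data.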