Status
Motivation
Rationale
To increase data reliability, we propose adding capabilities for data consumers and data producers to define and verify expectations around input and output assets via Operators and/or Asset definitions. Data Engineers could state, for example, expected column names and data types, the expected update frequency, or what the contents of a field should look like.
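To make the kinds of expectations above concrete, here is a minimal sketch of a contract that records expected columns with their data types and a maximum staleness (standing in for "update frequency"). `AssetContract` and `violations` are hypothetical names invented for this illustration, and the sketch assumes the observed schema and last-update time can be read from asset metadata.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


@dataclass
class AssetContract:
    """Hypothetical record of a Data Engineer's expectations for one asset."""

    columns: Dict[str, str]                    # expected column name -> data type
    max_staleness: Optional[timedelta] = None  # stands in for the expected update frequency

    def violations(self, schema: Dict[str, str], last_updated: datetime) -> List[str]:
        """Compare observed metadata to the contract; an empty list means it holds."""
        problems = []
        for name, dtype in self.columns.items():
            if name not in schema:
                problems.append(f"missing column {name!r}")
            elif schema[name] != dtype:
                problems.append(f"column {name!r} is {schema[name]}, expected {dtype}")
        if self.max_staleness is not None:
            age = datetime.now(timezone.utc) - last_updated
            if age > self.max_staleness:
                problems.append(f"asset not updated for {age}")
        return problems


contract = AssetContract(columns={"sales": "DECIMAL"}, max_staleness=timedelta(days=1))
print(contract.violations({"sales": "VARCHAR"}, datetime.now(timezone.utc)))
# ["column 'sales' is VARCHAR, expected DECIMAL"]
```

Returning a list of messages rather than raising on the first problem lets a single notification report every broken expectation at once.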
Proposed Changes
There would be two types of validations:
- Pre-runtime validations for data consumers to verify that established data flow contracts are valid
- Post-runtime validations for data producers to certify that the processed data upholds an established contract
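The ordering of the two validation types can be sketched as a wrapper around a task: consumer checks run first, then the task, then producer checks. This is an illustration only, assuming each validation is a zero-argument callable that raises on failure; `run_with_contracts` and `ContractViolation` are hypothetical names, not Airflow APIs.

```python
from typing import Callable, List, Sequence


class ContractViolation(Exception):
    """Hypothetical error raised when a validation finds a broken contract."""


def run_with_contracts(
    task: Callable[[], None],
    inlet_validations: Sequence[Callable[[], None]] = (),
    outlet_validations: Sequence[Callable[[], None]] = (),
) -> None:
    """Run consumer checks, then the task, then producer checks."""
    # Pre-runtime (consumer side): any failure raises before the task starts.
    for check in inlet_validations:
        check()
    task()
    # Post-runtime (producer side): certify what the task just wrote.
    for check in outlet_validations:
        check()


events: List[str] = []


def broken_inlet() -> None:
    raise ContractViolation("raw_sales is missing column 'sales'")


try:
    run_with_contracts(lambda: events.append("task ran"), inlet_validations=[broken_inlet])
except ContractViolation as err:
    print(err)  # raw_sales is missing column 'sales'

print(events)  # [] because the task never started
```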
As a data consumer, you would be able to define pre-runtime validation rules that are executed before the task runs, for example checks to ensure that certain columns exist. If discrepancies are found, Airflow could send a notification instead of running the task, preventing potential runtime errors and removing the need for cleanup actions. These rules could be set as part of the task definition (“inlet_validations” in the example) or as part of setting asset dependencies when using an asset-centric syntax.
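A minimal sketch of the consumer-side behaviour described above: check the inlet's columns, and on a mismatch send a notification and skip the task so no partial output is written. It assumes the inlet's column names can be fetched from metadata before the task runs; `guard_inlet_columns` and the `notify` callback are hypothetical.

```python
from typing import Callable, Iterable, List


def guard_inlet_columns(
    required: Iterable[str],
    inlet_columns: Iterable[str],
    notify: Callable[[str], None],
) -> bool:
    """Return True if the task may run; otherwise send a notification and return False."""
    missing = sorted(set(required) - set(inlet_columns))
    if missing:
        notify(f"inlet is missing columns {missing}; task not started")
        return False
    return True


notifications: List[str] = []
written_rows: List[int] = []  # stands in for the task's output table

if guard_inlet_columns(["sales"], ["region", "amount"], notifications.append):
    written_rows.append(42)  # the task body would only run here

print(notifications)  # ["inlet is missing columns ['sales']; task not started"]
print(written_rows)   # [] so there is no partial output to clean up
```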
As a data producer, you could define validation rules that “certify” that the data was processed correctly, for example checks to ensure that values in a specific column follow a pattern, such as phone numbers. These rules could be set as part of the asset definition itself.
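The phone-number example could look roughly like this as a post-runtime check. The E.164-style pattern is an assumption chosen for illustration (the proposal does not specify a format), and `nonconforming` is a hypothetical helper; a producer would only certify the asset when it returns an empty list.

```python
import re
from typing import Iterable, List

# Assumed format for illustration: E.164 numbers, "+" then 7 to 15 digits, no leading 0.
PHONE_PATTERN = re.compile(r"\+[1-9]\d{6,14}")


def nonconforming(values: Iterable[str], pattern: re.Pattern = PHONE_PATTERN) -> List[str]:
    """Return every value that does not fully match the pattern; empty means certified."""
    return [v for v in values if not pattern.fullmatch(v)]


phone_numbers = ["+14155550123", "+442071838750", "555-0123"]
bad = nonconforming(phone_numbers)
print(bad)  # ['555-0123']
```

`fullmatch` is used rather than `match` so that trailing garbage such as `"+14155550123x"` is rejected too.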
Lastly, Airflow should provide built-in validations that are accessible via the asset, and also allow custom user-defined validations.
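One way built-in and user-defined validations could coexist is to give both the same shape, a function from asset metadata to an optional error message. In this sketch only the name `AssetValidation.has_columns` comes from the proposal; `AssetSnapshot`, `custom`, and `validate` are hypothetical, and the behaviour shown is an assumption rather than a specified design.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence


@dataclass
class AssetSnapshot:
    """Hypothetical metadata a validation inspects: column names and row count."""
    columns: Sequence[str]
    row_count: int


# Built-in and user-defined checks share one shape: snapshot -> error message or None.
Check = Callable[[AssetSnapshot], Optional[str]]


class AssetValidation:
    """Illustrative namespace for built-in checks; only the name comes from the proposal."""

    @staticmethod
    def has_columns(*names: str) -> Check:
        def check(snapshot: AssetSnapshot) -> Optional[str]:
            missing = [n for n in names if n not in snapshot.columns]
            return f"missing columns {missing}" if missing else None
        return check

    @staticmethod
    def custom(predicate: Callable[[AssetSnapshot], bool], message: str) -> Check:
        """Wrap a user-defined predicate so it runs alongside the built-ins."""
        return lambda snapshot: None if predicate(snapshot) else message


def validate(snapshot: AssetSnapshot, checks: Sequence[Check]) -> List[str]:
    """Run every check and collect the error messages."""
    return [msg for msg in (check(snapshot) for check in checks) if msg is not None]


checks = [
    AssetValidation.has_columns("sales"),
    AssetValidation.custom(lambda s: s.row_count > 0, "asset is empty"),
]
print(validate(AssetSnapshot(columns=["sales"], row_count=0), checks))  # ['asset is empty']
```

Because a custom check is just a callable with the same signature, user code does not need to subclass anything to plug in alongside the built-ins.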
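# ColumnedAsset, AssetValidation, and the inlet/outlet validation arguments are the API proposed here.
# Producer side: the outlet validation certifies that the written data has a 'sales' column.
raw_sales = ColumnedAsset(
    'raw_sales',
    uri='s3://...',
    columns=['sales'],
    outlet_validations=[AssetValidation.has_columns('sales')],
)
aggregated_sales = ColumnedAsset(
    'aggregated_sales',
    uri='snowflake://...',
    columns=[...],
)
# Consumer side: the inlet validation runs before the task and checks that raw_sales still has 'sales'.
task_aggregate_sales = SqlOperator(
    task_id='aggregate_sales',
    inlets=[raw_sales],
    outlets=[aggregated_sales],
    inlet_validations=[AssetValidation.has_columns(raw_sales.sales)],
    sql="""
        INSERT INTO {{ outlets.aggregated_sales }}
        SELECT SUM({{ inlets.raw_sales.sales }})
        FROM {{ inlets.raw_sales }}
    """,
)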