Status

State: Draft
Discussion Thread


Vote Thread
Vote Result Thread
Progress Tracking (PR/GitHub Project/Issue Label)
Date Created: 2024-06-26

Version Released
Authors

Motivation

Rationale

To increase data reliability, we propose letting data consumers and data producers define and verify expectations about input and output assets through Operators and/or Asset definitions. Data engineers could declare expectations such as column names and data types, update frequency, or the expected format of a field's contents.

Proposed Changes

There would be two types of validations:

  • Pre-runtime validations, which let data consumers verify that established data flow contracts still hold
  • Post-runtime validations, which let data producers publicize that the processed data upholds an established contract

As a data consumer, you would be able to define pre-runtime validation rules that are executed before the task runs, for example checks that certain columns exist. If discrepancies are found, Airflow could send a notification, preventing potential runtime errors and removing the need for cleanup actions. These rules could be set as part of the task definition (“inlet_validations” in the example) or as part of setting asset dependencies if using an asset-centric syntax.
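To make the pre-runtime idea concrete, here is a minimal standalone sketch in plain Python. It does not use any Airflow API: `ValidationResult`, `has_columns`, `run_task_with_inlet_validations`, and the `notify` callback are all hypothetical names chosen for illustration. It also assumes the task is skipped when any check fails, which is one possible reading of the proposal rather than settled behavior.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Sequence


@dataclass(frozen=True)
class ValidationResult:
    """Outcome of one check (hypothetical type, not an Airflow class)."""
    passed: bool
    message: str


Validation = Callable[[Sequence[str]], ValidationResult]


def has_columns(*required: str) -> Validation:
    """Build a check that passes only if every required column is present."""
    def check(actual_columns: Sequence[str]) -> ValidationResult:
        missing = [c for c in required if c not in actual_columns]
        if missing:
            return ValidationResult(False, f"missing columns: {', '.join(missing)}")
        return ValidationResult(True, "all required columns present")
    return check


def run_task_with_inlet_validations(
    inlet_columns: Sequence[str],
    validations: Sequence[Validation],
    task: Callable[[], object],
    notify: Callable[[List[str]], None],
) -> Optional[object]:
    """Run every inlet validation first; call the task only if all pass.

    Assumption: on failure the task is skipped and `notify` receives the
    failure messages.
    """
    results = [validate(inlet_columns) for validate in validations]
    failures = [r.message for r in results if not r.passed]
    if failures:
        notify(failures)
        return None
    return task()
```

Because the checks run before `task()` is called, a missing column surfaces as a notification instead of a half-finished run that needs cleanup.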

As a data producer, you could define validation rules that “certify” that the data was processed correctly, for example checks that values in a specific column follow a pattern, such as phone numbers. These rules could be set as part of the asset definition itself.
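A post-runtime pattern check of this kind could look roughly like the following sketch, which uses only the standard library. `PHONE_PATTERN`, `values_not_matching`, and `certify_column` are hypothetical names, and the pattern itself (an optional `+` followed by 7 to 15 digits) is an assumption made for illustration, not a format the proposal specifies.

```python
import re
from typing import Iterable, List

# Hypothetical pattern: optional "+", then 7 to 15 digits.
PHONE_PATTERN = re.compile(r"\+?\d{7,15}")


def values_not_matching(values: Iterable[str], pattern: re.Pattern) -> List[str]:
    """Return the values that do not match the pattern in full."""
    return [v for v in values if pattern.fullmatch(v) is None]


def certify_column(values: Iterable[str], pattern: re.Pattern) -> bool:
    """Return True only if every value in the column matches the pattern."""
    return not values_not_matching(values, pattern)
```

Returning the offending values, rather than only a boolean, would let a producer report which rows broke the contract.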

Lastly, assets should expose built-in validations and also allow custom user-defined validations.

raw_sales = ColumnedAsset(
  'raw_sales',
  uri='s3://...',
  outlet_validations=[AssetValidation.has_columns('sales')],
  column=['sales'],
)

aggregated_sales = Dataset(
  'aggregated_sales',
  uri='snowflake://...',
  column=[...],
)

task_aggregate_sales = SqlOperator(
  inlets=[raw_sales],
  outlets=[aggregated_sales],
  inlet_validations=[AssetValidation.has_columns(raw_sales.sales)],
  sql="""
      INSERT INTO {{ outlets.aggregated_sales }}
      SELECT SUM({{ inlets.raw_sales.sales }})
      FROM {{ inlets.raw_sales }}
  """,
)
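
The example above uses proposed syntax that does not exist in Airflow today. As a rough illustration of how built-in and user-defined validations could share one interface, here is a standalone sketch. `AssetValidationSketch`, its `custom` constructor, `failed_validations`, and the dict-based "snapshot" of an asset are all hypothetical stand-ins, not the proposed API.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Sequence

# Assumption: an asset is described by a plain dict "snapshot",
# e.g. {"columns": ["sales"], "row_count": 10}.
Snapshot = Dict[str, object]


@dataclass(frozen=True)
class AssetValidationSketch:
    """A named predicate over an asset snapshot (hypothetical stand-in)."""
    name: str
    predicate: Callable[[Snapshot], bool]

    @staticmethod
    def has_columns(*columns: str) -> "AssetValidationSketch":
        """Built-in: pass when every named column is in snapshot["columns"]."""
        def predicate(snapshot: Snapshot) -> bool:
            present = snapshot.get("columns", [])
            return all(c in present for c in columns)
        return AssetValidationSketch(f"has_columns({', '.join(columns)})", predicate)

    @staticmethod
    def custom(name: str, predicate: Callable[[Snapshot], bool]) -> "AssetValidationSketch":
        """User-defined: wrap any callable that maps a snapshot to a bool."""
        return AssetValidationSketch(name, predicate)


def failed_validations(
    snapshot: Snapshot, validations: Sequence[AssetValidationSketch]
) -> List[str]:
    """Return the names of the validations that did not pass."""
    return [v.name for v in validations if not v.predicate(snapshot)]
```

A built-in check and a custom one can then be listed side by side, for instance `[AssetValidationSketch.has_columns("sales"), AssetValidationSketch.custom("non_empty", lambda s: s["row_count"] > 0)]`, and evaluated with the same `failed_validations` call.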