

Current state: Accepted

Discussion thread:

JIRA: FLINK-14977

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

  • Primary and unique key constraints are important hints that can be used during query optimization, e.g. reducing the number of columns to group on if the grouping columns contain a whole primary/unique key constraint.
  • Additionally, primary keys are useful for upsert streams, where the primary key might be used as the upsert key.
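The first optimization can be sketched as follows. This is a simplified, standalone illustration, not Flink optimizer code; the class and method names are hypothetical, and String column names stand in for real column references:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class GroupKeyPruning {

    /**
     * If the grouping columns contain a complete primary/unique key,
     * every other grouping column is functionally determined by the key,
     * so grouping on the key alone produces the same groups.
     */
    static Set<String> pruneGroupColumns(Set<String> groupColumns, Set<String> primaryKey) {
        if (!primaryKey.isEmpty() && groupColumns.containsAll(primaryKey)) {
            return new LinkedHashSet<>(primaryKey);
        }
        return groupColumns;
    }

    public static void main(String[] args) {
        Set<String> group = new LinkedHashSet<>(List.of("id", "name", "age"));
        Set<String> pk = new LinkedHashSet<>(List.of("id"));
        // grouping columns contain the whole key, so grouping on "id" suffices
        System.out.println(pruneGroupColumns(group, pk));
    }
}
```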

Public Interfaces

Constraint class hierarchy:

public interface Constraint {
    String getName();

    boolean isEnforced();

    ConstraintType getType();

    enum ConstraintType {
        PRIMARY_KEY,
        UNIQUE_KEY
    }
}

abstract class AbstractConstraint implements Constraint {

	private final String name;
	private final boolean enforced;

	AbstractConstraint(String name, boolean enforced) {
		this.name = checkNotNull(name);
		this.enforced = enforced;
	}

	@Override
	public String getName() {
		return name;
	}

	@Override
	public boolean isEnforced() {
		return enforced;
	}
}

public final class KeyConstraint extends AbstractConstraint {

	private final List<FieldReferenceExpression> columns;
	private final ConstraintType type;

	public static KeyConstraint primaryKey(String name, boolean enforced, FieldReferenceExpression... columns) {
		return new KeyConstraint(name, enforced, ConstraintType.PRIMARY_KEY, Arrays.asList(columns));
	}

	public static KeyConstraint uniqueKey(String name, boolean enforced, FieldReferenceExpression... columns) {
		return new KeyConstraint(name, enforced, ConstraintType.UNIQUE_KEY, Arrays.asList(columns));
	}

	private KeyConstraint(
			String name,
			boolean enforced,
			ConstraintType type,
			List<FieldReferenceExpression> columns) {
		super(name, enforced);

		this.columns = columns;
		this.type = type;
	}

	@Override
	public ConstraintType getType() {
		return type;
	}

	public List<FieldReferenceExpression> getColumns() {
		return columns;
	}
}

Method for retrieving the primary key constraint in TableSchema (the same information will also be made available through CatalogBaseTable):

public interface TableSchema {
	Optional<KeyConstraint> getPrimaryKey();
}

Constraint DDL:

CREATE TABLE [catalog_name.][db_name.]table_name
  (col_name1 col_type1 [IN_LINE_CONSTRAINT] [COMMENT col_comment1],
   ...,
   [CONSTRAINT constraint_name] (PRIMARY KEY | UNIQUE) (column, ...) [[NOT] ENFORCED])
  [COMMENT table_comment]
  [PARTITIONED BY (col_name1, col_name2, ...)]


-- possible extension, not part of the FLIP, as we do not have alter statements
ALTER TABLE [catalog_name.][db_name.]table_name
  DROP CONSTRAINT constraint_name

Proposed Changes

We suggest introducing the concept of a primary key constraint as a hint for Flink to leverage for optimizations.

Primary key constraints assert that a column or a set of columns of a table or a view are unique and do not contain nulls. None of the columns in a primary key can be nullable. A primary key therefore uniquely identifies a row in a table.

Unique key constraints assert that a column or a set of columns of a table or a view are unique. A unique key constraint does not impose a NOT NULL constraint on its columns.

Primary key validity checks

The SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls whether the constraint checks are performed on the incoming/outgoing data. Flink does not own the data, therefore the only mode we want to support is NOT ENFORCED. It is up to the user to ensure that the query enforces key integrity.

We will assume correctness of the primary key. In particular, we assume that the nullability of the columns is aligned with the columns of the primary key. We will validate this when creating a TableSchema, and connectors should ensure the two are aligned.

In a CREATE TABLE statement, creating a primary key constraint will alter the columns' nullability according to the SQL standard:

If the <unique specification> specifies PRIMARY KEY, then for each <column name> in the explicit or implicit <unique column list> for which NOT NULL is not specified, NOT NULL is implicit in the <column definition>.

In ALTER statements, if a user asks to create a primary key on a nullable column, the operation should fail.
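The intended validation can be sketched as a standalone helper. The class and method names here are hypothetical, not the actual TableSchema code, and a plain nullability map stands in for the real schema:

```java
import java.util.List;
import java.util.Map;

public class PrimaryKeyValidation {

    /** Fails if any primary key column is missing from the schema or is nullable. */
    static void validatePrimaryKey(Map<String, Boolean> columnNullability, List<String> primaryKey) {
        for (String column : primaryKey) {
            Boolean nullable = columnNullability.get(column);
            if (nullable == null) {
                throw new IllegalArgumentException("Unknown primary key column: " + column);
            }
            if (nullable) {
                throw new IllegalArgumentException(
                        "Primary key column '" + column + "' must not be nullable.");
            }
        }
    }

    public static void main(String[] args) {
        // id is NOT NULL, name is nullable
        Map<String, Boolean> schema = Map.of("id", false, "name", true);
        validatePrimaryKey(schema, List.of("id")); // passes
        try {
            validatePrimaryKey(schema, List.of("name")); // fails: nullable column
        } catch (IllegalArgumentException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```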

DDL default value

If a user does not provide the enforcement flag, ENFORCED will be assumed implicitly. This mode will be unsupported in the initial version, so such a declaration will throw an exception.
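A minimal sketch of this behavior, assuming a hypothetical resolution step after DDL parsing (the method name is illustrative): an omitted flag defaults to ENFORCED, and since only NOT ENFORCED is supported initially, both the explicit and the implicit ENFORCED case raise an error.

```java
public class EnforcementCheck {

    /** enforced == null means the user omitted the flag; the SQL standard default is ENFORCED. */
    static boolean resolveEnforcement(Boolean enforced) {
        boolean effective = (enforced == null) || enforced;
        if (effective) {
            throw new UnsupportedOperationException(
                    "ENFORCED constraints are not supported yet; declare the constraint NOT ENFORCED.");
        }
        return effective;
    }

    public static void main(String[] args) {
        // only an explicit NOT ENFORCED declaration is accepted
        System.out.println(resolveEnforcement(false));
    }
}
```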

Usage of primary key information for sources and sinks

This FLIP does not discuss runtime handling of the primary key information by sources and sinks. The additional information will not change how upsert sources and sinks work (we do not support UPSERT sources yet; for sinks we will stick to the current validation implementation). Usage of the primary key for upsert sources and sinks shall be discussed in a separate FLIP, and this FLIP will not alter any UpsertSink implementations. The primary key is merely metadata information for optimization. We might use it in the future for query validation, e.g. to validate that we perform upserts on the whole key, or to use auto-generated values for missing fields.

Unique keys support

Support for unique keys is not part of this FLIP; it is mentioned only to show how we can extend the primary key concept with more constraints in the future. Supporting UNIQUE keys requires agreeing on the null handling in a unique constraint: some systems allow multiple rows with a null value in a column with a UNIQUE constraint, whereas others allow just a single such row.


For a table created with the statement CREATE TABLE unique_table(id INTEGER UNIQUE);

some systems will allow inserting multiple rows with the statement INSERT INTO unique_table VALUES (null); whereas others will throw an exception on the second invocation of that statement.
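The divergence can be sketched as a uniqueness check parameterized by whether NULLs are treated as distinct. This is illustrative code, not tied to any engine; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class UniqueNullSemantics {

    /**
     * Returns true if value may be inserted into a UNIQUE column.
     * nullsDistinct == true  -> any number of NULLs is allowed;
     * nullsDistinct == false -> at most one NULL is allowed.
     */
    static boolean canInsert(List<String> existing, String value, boolean nullsDistinct) {
        if (value == null) {
            return nullsDistinct || !existing.contains(null);
        }
        return !existing.contains(value);
    }

    public static void main(String[] args) {
        List<String> column = new ArrayList<>();
        column.add(null); // one NULL already present
        System.out.println(canInsert(column, null, true));  // NULLs distinct: second NULL allowed
        System.out.println(canInsert(column, null, false)); // NULLs equal: second NULL rejected
    }
}
```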

Compatibility, Deprecation, and Migration Plan

This change introduces a new feature that does not imply any compatibility concerns.


For the time being, insert/upsert into tables with a primary key defined will not be supported. That support should be discussed in a separate FLIP.

Implementation Plan

  1. Add the primary key information to CatalogBaseTable
  2. Use the primary key information in optimizer
  3. Modify HiveCatalog connector to provide the primary key information
  4. Introduce DDL (side note: this is not aimed for 1.10)

Test Plan

We want to make sure we can leverage the primary key information stored in Hive. This might require changes to the Hive catalog connector with regard to the produced TableSchema.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
