Status
Current state: [Accepted]
Discussion thread: https://lists.apache.org/thread/vws45zvrvbzjxm2zcbk715go8fll2qx4
Vote thread: https://lists.apache.org/thread/7yhh5k5p5bf3xml1qf6wn5fvf6o8gjtc
JIRA: FLINK-39227
Released: <Flink Version>
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink has introduced the Delta Join operator, whose core advantage lies in replacing redundant local state storage with direct queries to external storage systems (e.g., Apache Fluss). This significantly reduces state size and lowers computational resource consumption [1].
The current implementation of Delta Join heavily relies on the upsert key:
- The upsert key from the join’s upstream must include the join key.
- The sink’s primary key must be consistent with the upstream upsert key.
These constraints ensure correct changelog processing without depending on UPDATE_BEFORE messages. However, this design implicitly assumes that the join key must be part of the primary key.
As storage systems increasingly support general-purpose secondary indexes (i.e., indexes that are not restricted to the primary key or a prefix of it), these constraints become unnecessarily restrictive. To let Delta Join use arbitrary secondary indexes for efficient lookups, we need a new semantic mechanism that guarantees the immutability of the join key: for a given primary key, the values of the columns that make up the join key cannot be modified.
To address this, we propose a new IMMUTABLE columns constraint, with the following semantics:
-- Example DDL. Note: This FLIP does not yet expose the constraint via DDL syntax;
-- it only introduces the constraint at the Schema level. The example below
-- is provided solely for illustrative purposes.
CREATE TABLE src (
  a int,
  b string,
  c bigint,
  d double,
  PRIMARY KEY (a) NOT ENFORCED,
  CONSTRAINT no_up_cols COLUMNS (b, c) IMMUTABLE NOT ENFORCED
)
For the same PK (e.g., a = 1), once values for columns b and c are written, subsequent updates must not alter them. Deletion operations are also prohibited, since a deletion is equivalent to "clearing" the immutable columns.
Examples:
INSERT[1, 's1', 1L, 1.0] → Valid
UPDATE[1, 's1', 2L, 2.0] → Invalid (c changed)
UPDATE[1, 's1', 1L, 4.0] → Valid (b and c unchanged)
DELETE[1, 's1', 1L, 1.0] → Invalid
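The semantics above can be illustrated with a small, hypothetical validator that replays a changelog and checks each record against the immutable values first written for its primary key. `ImmutableColumnsValidator` and `Kind` are illustrative names, not Flink APIs; rows are modelled as plain `Object[]` in column order (a, b, c, d), `Kind.UPDATE` stands for the new image of an update (UPDATE_AFTER in Flink's changelog terms), and, as an assumption of this sketch, a rejected record does not change the remembered values.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical checker for the IMMUTABLE columns semantics (illustrative only, not a Flink API). */
final class ImmutableColumnsValidator {

    /** Simplified change kinds; UPDATE stands for the new image of an upsert. */
    enum Kind { INSERT, UPDATE, DELETE }

    private final int pkIndex;
    private final int[] immutableIndexes;
    // Immutable column values first written for each primary key value.
    private final Map<Object, List<Object>> firstWritten = new HashMap<>();

    ImmutableColumnsValidator(int pkIndex, int... immutableIndexes) {
        this.pkIndex = pkIndex;
        this.immutableIndexes = immutableIndexes;
    }

    /** Returns true if the record respects the constraint; rejected records leave state untouched. */
    boolean validate(Kind kind, Object... row) {
        if (kind == Kind.DELETE) {
            // Deleting a row would "clear" its immutable columns, so it is always invalid.
            return false;
        }
        List<Object> values = project(row);
        // putIfAbsent only stores the values on the first write for this key.
        List<Object> previous = firstWritten.putIfAbsent(row[pkIndex], values);
        // The first write is always valid; later writes must repeat the same immutable values.
        return previous == null || previous.equals(values);
    }

    private List<Object> project(Object[] row) {
        Object[] values = new Object[immutableIndexes.length];
        for (int i = 0; i < immutableIndexes.length; i++) {
            values[i] = row[immutableIndexes[i]];
        }
        return Arrays.asList(values);
    }

    public static void main(String[] args) {
        // Table src(a, b, c, d) with PRIMARY KEY (a) and IMMUTABLE COLUMNS (b, c).
        ImmutableColumnsValidator v = new ImmutableColumnsValidator(0, 1, 2);
        System.out.println(v.validate(Kind.INSERT, 1, "s1", 1L, 1.0)); // true
        System.out.println(v.validate(Kind.UPDATE, 1, "s1", 2L, 2.0)); // false: c changed
        System.out.println(v.validate(Kind.UPDATE, 1, "s1", 1L, 4.0)); // true: b and c unchanged
        System.out.println(v.validate(Kind.DELETE, 1, "s1", 1L, 1.0)); // false
    }
}
```

The four calls in `main` replay the four example records in order; since the constraint is declared NOT ENFORCED, a check like this is only a way to reason about the contract, not something the FLIP proposes to run.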
Goals of this FLIP:
- Introduce a new table-level constraint, the IMMUTABLE columns constraint, to declare that certain columns, once set for a given primary key, cannot be modified.
- Maintain compatibility with the existing upsert key / unique key mechanisms and enable the optimizer to leverage this constraint for further optimizations.
Relationship with Primary Key
- If the IMMUTABLE constraint is defined on a table without a primary key, an error will be thrown: "IMMUTABLE constraints must be defined on tables with a primary key."
- If the columns specified in an IMMUTABLE constraint are identical to the primary key (PK) columns, the IMMUTABLE constraint is silently accepted, since the PK column values of a given row are by definition never modified.
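The two rules above can be sketched as a single validation step. `ImmutablePkRules.validate` is a hypothetical helper, not Flink code; `IllegalArgumentException` stands in for whatever exception type the implementation actually throws, and comparing "identical" columns as sets (ignoring order) is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.List;

/** Hypothetical sketch of the primary-key rules for the IMMUTABLE constraint (not Flink code). */
final class ImmutablePkRules {

    /**
     * Validates an IMMUTABLE constraint against the table's primary key.
     *
     * @param primaryKey primary key columns, or null if the table has none
     * @param immutableColumns columns declared IMMUTABLE
     * @return true if the constraint merely repeats the primary key (accepted as-is)
     */
    static boolean validate(List<String> primaryKey, List<String> immutableColumns) {
        if (primaryKey == null) {
            throw new IllegalArgumentException(
                    "IMMUTABLE constraints must be defined on tables with a primary key.");
        }
        // Assumption: "identical" is compared as a set, ignoring column order.
        return new HashSet<>(primaryKey).equals(new HashSet<>(immutableColumns));
    }

    public static void main(String[] args) {
        System.out.println(validate(List.of("a"), List.of("b", "c"))); // false: a regular constraint
        System.out.println(validate(List.of("a"), List.of("a")));      // true: same as the PK
        try {
            validate(null, List.of("b"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```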
Public Interfaces
Note: This FLIP does not introduce SQL syntax for this constraint; the constraint is exposed only through the `Schema` API.
1. Add new `ConstraintType.IMMUTABLE_COLUMNS`
/**
* Type of the constraint.
*
* <ul>
* <li>Unique constraints:
* <ul>
* <li>UNIQUE - is satisfied if and only if there do not exist two rows that have same
* non-null values in the unique columns
* <li>PRIMARY KEY - additionally to UNIQUE constraint, it requires none of the values
* in specified columns be a null value. Moreover there can be only a single PRIMARY
* KEY defined for a Table.
* </ul>
* <li>Immutable columns constraints - is satisfied if and only if, for each primary key value,
* the values in the specified columns are never modified once written.
* </ul>
*/
@PublicEvolving
enum ConstraintType {
PRIMARY_KEY,
UNIQUE_KEY,
IMMUTABLE_COLUMNS
}
2. Add `ImmutableColumnsConstraint`
/**
* A constraint for immutable columns. For each primary key value, the values of the columns in
* this constraint are never modified once written.
*
* @see ConstraintType
*/
@PublicEvolving
public final class ImmutableColumnsConstraint extends AbstractConstraint {
private final List<String> columns;
public ImmutableColumnsConstraint(String name, boolean enforced, List<String> columns) {
super(name, enforced);
this.columns = columns;
}
/** Creates a non-enforced {@link ConstraintType#IMMUTABLE_COLUMNS} constraint. */
public static ImmutableColumnsConstraint immutableColumns(String name, List<String> columns) {
return new ImmutableColumnsConstraint(name, false, columns);
}
public List<String> getColumns() {
return columns;
}
@Override
public ConstraintType getType() {
return ConstraintType.IMMUTABLE_COLUMNS;
}
@Override
public String asSummaryString() {
return String.format(
"CONSTRAINT %s COLUMNS (%s) IMMUTABLE%s",
EncodingUtils.escapeIdentifier(getName()),
columns.stream()
.map(EncodingUtils::escapeIdentifier)
.collect(Collectors.joining(", ")),
isEnforced() ? "" : " NOT ENFORCED");
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ImmutableColumnsConstraint that = (ImmutableColumnsConstraint) o;
return Objects.equals(columns, that.columns);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), columns);
}
}
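To show what `asSummaryString()` produces for the example table, the following standalone sketch reproduces its formatting without a Flink dependency. `ImmutableSummaryDemo` is a hypothetical name, and its local `escapeIdentifier` is assumed to mirror Flink's `EncodingUtils.escapeIdentifier` (wrap in backticks, double any inner backticks).

```java
import java.util.List;
import java.util.stream.Collectors;

/** Standalone sketch of the string built by ImmutableColumnsConstraint#asSummaryString. */
final class ImmutableSummaryDemo {

    // Assumed to mirror EncodingUtils.escapeIdentifier: wrap in backticks, double inner backticks.
    static String escapeIdentifier(String s) {
        return "`" + s.replace("`", "``") + "`";
    }

    static String summary(String name, boolean enforced, List<String> columns) {
        return String.format(
                "CONSTRAINT %s COLUMNS (%s) IMMUTABLE%s",
                escapeIdentifier(name),
                columns.stream()
                        .map(ImmutableSummaryDemo::escapeIdentifier)
                        .collect(Collectors.joining(", ")),
                enforced ? "" : " NOT ENFORCED");
    }

    public static void main(String[] args) {
        // Corresponds to ImmutableColumnsConstraint.immutableColumns("no_up_cols", List.of("b", "c")).
        System.out.println(summary("no_up_cols", false, List.of("b", "c")));
        // prints: CONSTRAINT `no_up_cols` COLUMNS (`b`, `c`) IMMUTABLE NOT ENFORCED
    }
}
```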
3. Add the immutable columns constraint to `Schema` and `ResolvedSchema`
@PublicEvolving
public final class Schema {
...
private final @Nullable UnresolvedImmutableColumns immutableColumns;
...
public Schema(
List<UnresolvedColumn> columns,
List<UnresolvedWatermarkSpec> watermarkSpecs,
@Nullable UnresolvedPrimaryKey primaryKey,
List<UnresolvedIndex> indexes,
@Nullable UnresolvedImmutableColumns immutableColumns) {
this.columns = Collections.unmodifiableList(columns);
this.watermarkSpecs = Collections.unmodifiableList(watermarkSpecs);
this.primaryKey = primaryKey;
this.indexes = Collections.unmodifiableList(indexes);
this.immutableColumns = immutableColumns;
}
... (getters, `toString`, `equals` and `hashCode` omitted)
/** A builder for constructing an immutable but still unresolved {@link Schema}. */
@PublicEvolving
public static final class Builder {
...
private @Nullable UnresolvedImmutableColumns immutableColumns;
...
/**
* Declares an immutable columns constraint for a list of given columns. Immutable columns
* constraint is used to identify which columns in a table are not allowed to be modified.
* Currently, this constraint is informational only and is not enforced. It can be utilized
* for optimization purposes. It is the responsibility of the data owner to ensure that
* these columns are unmodified.
*
* <p>The immutable columns constraint will be assigned a generated name in the format {@code
* IMMUTABLE_COLUMNS_col1_col2}.
*
* @param columnNames columns that form the constraint for immutable columns
*/
public Builder immutableColumns(String... columnNames) {
Preconditions.checkNotNull(columnNames, "Immutable column names must not be null.");
return immutableColumns(Arrays.asList(columnNames));
}
/**
* Declares an immutable columns constraint for a list of given columns. Immutable columns
* constraint is used to identify which columns in a table are not allowed to be modified.
* Currently, this constraint is informational only and is not enforced. It can be utilized
* for optimization purposes. It is the responsibility of the data owner to ensure that
* these columns are unmodified.
*
* <p>The immutable columns constraint will be assigned a generated name in the format {@code
* IMMUTABLE_COLUMNS_col1_col2}.
*
* @param columnNames columns that form the constraint for immutable columns
*/
public Builder immutableColumns(List<String> columnNames) {
Preconditions.checkNotNull(columnNames, "Immutable column names must not be null.");
final String generatedConstraintName =
columnNames.stream().collect(Collectors.joining("_", "IMMUTABLE_COLUMNS_", ""));
return immutableColumnsNamed(generatedConstraintName, columnNames);
}
/**
* Declares an immutable columns constraint for a list of given columns. Immutable columns
* constraint is used to identify which columns in a table are not allowed to be modified.
* Currently, this constraint is informational only and is not enforced. It can be utilized
* for optimization purposes. It is the responsibility of the data owner to ensure that
* these columns are unmodified.
*
* @param constraintName name for the immutable columns constraint, can be used to reference
* this constraint
* @param columnNames columns that form the constraint for immutable columns
*/
public Builder immutableColumnsNamed(String constraintName, String... columnNames) {
Preconditions.checkNotNull(columnNames, "Immutable column names must not be null.");
return immutableColumnsNamed(constraintName, Arrays.asList(columnNames));
}
/**
* Declares an immutable columns constraint for a list of given columns. Immutable columns
* constraint is used to identify which columns in a table are not allowed to be modified.
* Currently, this constraint is informational only and is not enforced. It can be utilized
* for optimization purposes. It is the responsibility of the data owner to ensure that
* these columns are unmodified.
*
* @param constraintName name for the immutable columns constraint, can be used to reference
* this constraint
* @param columnNames columns that form the constraint for immutable columns
*/
public Builder immutableColumnsNamed(String constraintName, List<String> columnNames) {
Preconditions.checkState(
immutableColumns == null,
"Multiple constraints for immutable columns are not supported.");
Preconditions.checkNotNull(
constraintName, "Immutable column constraint name must not be null.");
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(constraintName),
"The constraint name for immutable columns must not be empty.");
Preconditions.checkArgument(
columnNames != null && !columnNames.isEmpty(),
"The constraint for immutable columns must be defined for at least a single column.");
immutableColumns = new UnresolvedImmutableColumns(constraintName, columnNames);
return this;
}
/** Returns an instance of an unresolved {@link Schema}. */
public Schema build() {
return new Schema(columns, watermarkSpecs, primaryKey, indexes, immutableColumns);
}
...
}
...
/**
* Declaration of a list of immutable columns that will be resolved to {@link
* ImmutableColumnsConstraint} during schema resolution.
*/
@PublicEvolving
public static final class UnresolvedImmutableColumns extends UnresolvedConstraint {
private final List<String> columnNames;
public UnresolvedImmutableColumns(String constraintName, List<String> columnNames) {
super(constraintName);
this.columnNames = columnNames;
}
public List<String> getColumnNames() {
return columnNames;
}
@Override
public String toString() {
return String.format(
"%s COLUMNS (%s) IMMUTABLE NOT ENFORCED",
super.toString(),
columnNames.stream()
.map(EncodingUtils::escapeIdentifier)
.collect(Collectors.joining(", ")));
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
UnresolvedImmutableColumns that = (UnresolvedImmutableColumns) o;
return columnNames.equals(that.columnNames);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), columnNames);
}
}
}
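With the proposed builder, declaring the constraint for the example table would look like `Schema.newBuilder().column("a", DataTypes.INT().notNull()).column("b", DataTypes.STRING()).column("c", DataTypes.BIGINT()).primaryKey("a").immutableColumns("b", "c").build()`. Because `immutableColumns` is only proposed here, the runnable sketch below re-implements just the builder's naming and validation logic in a hypothetical `ImmutableBuilderSketch` class; the name check approximates `StringUtils.isNullOrWhitespaceOnly` with `trim().isEmpty()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the proposed Schema.Builder immutable-columns methods. */
final class ImmutableBuilderSketch {

    private String constraintName;
    private List<String> columnNames;

    ImmutableBuilderSketch immutableColumns(String... columnNames) {
        Objects.requireNonNull(columnNames, "Immutable column names must not be null.");
        List<String> names = Arrays.asList(columnNames);
        // Same naming scheme as the proposal: IMMUTABLE_COLUMNS_col1_col2.
        String generated = names.stream().collect(Collectors.joining("_", "IMMUTABLE_COLUMNS_", ""));
        return immutableColumnsNamed(generated, names);
    }

    ImmutableBuilderSketch immutableColumnsNamed(String name, List<String> names) {
        // Only one immutable columns constraint per schema (checkState in the proposal).
        if (this.columnNames != null) {
            throw new IllegalStateException("Multiple constraints for immutable columns are not supported.");
        }
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("The constraint name for immutable columns must not be empty.");
        }
        if (names == null || names.isEmpty()) {
            throw new IllegalArgumentException(
                    "The constraint for immutable columns must be defined for at least a single column.");
        }
        this.constraintName = name;
        this.columnNames = names;
        return this;
    }

    String constraintName() {
        return constraintName;
    }

    public static void main(String[] args) {
        ImmutableBuilderSketch b = new ImmutableBuilderSketch().immutableColumns("b", "c");
        System.out.println(b.constraintName()); // IMMUTABLE_COLUMNS_b_c
        try {
            b.immutableColumns("d"); // a second constraint is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```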
@PublicEvolving
public final class ResolvedSchema {
...
private final @Nullable ImmutableColumnsConstraint immutableColumns;
...
public ResolvedSchema(
List<Column> columns,
List<WatermarkSpec> watermarkSpecs,
@Nullable UniqueConstraint primaryKey,
List<Index> indexes,
@Nullable ImmutableColumnsConstraint immutableColumns) {
this.columns = Preconditions.checkNotNull(columns, "Columns must not be null.");
this.watermarkSpecs =
Preconditions.checkNotNull(watermarkSpecs, "Watermark specs must not be null.");
this.primaryKey = primaryKey;
this.indexes = Preconditions.checkNotNull(indexes, "Indexes must not be null.");
this.immutableColumns = immutableColumns;
Preconditions.checkArgument(
primaryKey != null || immutableColumns == null,
"IMMUTABLE constraints must be defined on tables with a primary key.");
}
...
/** Returns the immutable columns constraint if it has been defined. */
public Optional<ImmutableColumnsConstraint> getImmutableColumns() {
return Optional.ofNullable(immutableColumns);
}
...
/**
* Returns the indexes of the immutable columns in {@link #toPhysicalRowDataType()} if the
* constraint is defined; otherwise returns an empty array.
*/
public int[] getImmutableColumnIndexes() {
final List<String> columns =
getColumns().stream()
.filter(Column::isPhysical)
.map(Column::getName)
.collect(Collectors.toList());
return getImmutableColumns()
.map(ImmutableColumnsConstraint::getColumns)
.map(
immutableColumns ->
immutableColumns.stream().mapToInt(columns::indexOf).toArray())
.orElseGet(() -> new int[] {});
}
... (getters, `toString`, `equals` and `hashCode` omitted)
}
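Because `getImmutableColumnIndexes()` resolves names against the physical row type only, non-physical columns shift the resulting positions. The worked example below reproduces that lookup with a hypothetical minimal column model (`ImmutableIndexesDemo.Col` holds just a name and a physical flag); it is a sketch of the logic, not Flink's `Column` class.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Standalone sketch of how getImmutableColumnIndexes maps names to physical positions. */
final class ImmutableIndexesDemo {

    /** Hypothetical minimal column model: a name plus whether the column is physical. */
    static final class Col {
        final String name;
        final boolean physical;

        Col(String name, boolean physical) {
            this.name = name;
            this.physical = physical;
        }
    }

    static int[] immutableColumnIndexes(List<Col> schema, List<String> immutableColumns) {
        // Positions refer to the physical row type, so non-physical (e.g. computed) columns are skipped.
        List<String> physical = schema.stream()
                .filter(c -> c.physical)
                .map(c -> c.name)
                .collect(Collectors.toList());
        return immutableColumns.stream().mapToInt(physical::indexOf).toArray();
    }

    public static void main(String[] args) {
        List<Col> schema = Arrays.asList(
                new Col("a", true),
                new Col("a_plus_one", false), // computed column, absent from the physical row
                new Col("b", true),
                new Col("c", true),
                new Col("d", true));
        // Physical row is (a, b, c, d), so b and c sit at positions 1 and 2.
        System.out.println(Arrays.toString(immutableColumnIndexes(schema, Arrays.asList("b", "c")))); // [1, 2]
    }
}
```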
Proposed Changes
- Introduce a new metadata handler, `FlinkRelMdImmutableColumns`, to provide access to the immutable columns constraint during query optimization.
- Leverage the immutability information from immutable columns in the existing logic that infers relationships between specific keys and upsert keys, to enable further optimizations. For example, enhance the logic in `FlinkChangelogModeInferenceProgram` to infer sink update-kind traits and upsert materialization strategies.
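As a hypothetical illustration of the kind of key reasoning this metadata could enable, the sketch below contrasts the current Delta Join requirement (the join key is contained in the upstream upsert key) with a relaxed check that also accepts IMMUTABLE columns. It assumes that, for a fixed primary key, a join key column cannot change if it belongs to the primary key or is declared IMMUTABLE; `JoinKeyImmutability` and its methods are illustrative names, not the planner's actual logic, and the upsert key is assumed to equal the primary key in the example.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration of key reasoning enabled by the immutable columns constraint. */
final class JoinKeyImmutability {

    /** Current rule: the join key must be contained in the upstream upsert key. */
    static boolean coveredByUpsertKey(Set<String> joinKey, Set<String> upsertKey) {
        return upsertKey.containsAll(joinKey);
    }

    /** Relaxed rule (assumption): every join key column is a PK column or declared IMMUTABLE. */
    static boolean joinKeyImmutable(Set<String> joinKey, Set<String> primaryKey, Set<String> immutable) {
        Set<String> stable = new HashSet<>(primaryKey);
        stable.addAll(immutable);
        return stable.containsAll(joinKey);
    }

    public static void main(String[] args) {
        Set<String> pk = Set.of("a");            // assumed to also be the upstream upsert key
        Set<String> immutable = Set.of("b", "c");
        Set<String> joinKey = Set.of("b");       // lookup through a secondary index on b
        System.out.println(coveredByUpsertKey(joinKey, pk));               // false
        System.out.println(joinKeyImmutable(joinKey, pk, immutable));      // true
        System.out.println(joinKeyImmutable(Set.of("d"), pk, immutable));  // false: d may change
    }
}
```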
Compatibility, Deprecation, and Migration Plan
These APIs are newly introduced and do not involve compatibility issues.
Test Plan
Including but not limited to:
- Testing of `ImmutableColumnsConstraint` and its metadata handler;
- Testing in combination with unique key / upsert key;
- Adding new Delta Join tests to verify the plan and correctness when using a non-PK secondary index together with the immutable columns constraint.
Rejected Alternatives
Inspired by PostgreSQL’s UPDATE triggers and CHECK constraints, this alternative uses OLD and NEW to refer to column values before and after an update, respectively. However, the planner cannot easily derive optimization-relevant metadata from such general expressions.
[CONSTRAINT no_up_cols] CHECK ON UPDATE (NEW.col1 = OLD.col1, NEW.col2 = OLD.col2 ...) NOT ENFORCED
References
[1] FLIP-486: Introduce A New DeltaJoin: https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin
[2] PostgreSQL documentation, Constraints: https://www.postgresql.org/docs/current/ddl-constraints.html
[3] MySQL 8.4 Reference Manual, FOREIGN KEY Constraints: https://dev.mysql.com/doc/refman/8.4/en/create-table-foreign-keys.html