Motivation

In the previous discussions (FLINK-21634 and FLINK-27237), we have already determined the ALTER TABLE syntax. However, the current Catalog API lacks a critical part of that integration: suitable methods to alter tables in an external catalog. For example, the Catalog currently only exposes


void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)


The main problem with this API is that it requires the Catalog, e.g. JDBCCatalog, to compare the new CatalogTable with the original CatalogTable and then generate the SQL that alters the table in the external system. However, the ALTER TABLE statement already states these differences explicitly.


Public Interfaces

The main changes in this FLIP include two parts: 

  • Expose the API classes that describe schema changes, which are used by programmatic DDL and the Catalog API.

  • Extend the Catalog API so that a Catalog can process these schema changes.

Schema Change

In FLINK-21634, we have already discussed the SQL syntax. We propose to introduce TableChange to represent the changes.


/** Represents the resolved change to the table. */
public interface TableChange {
    
    // --------------------------------------------------------------------------------------------
    // Add Change
    // --------------------------------------------------------------------------------------------

    /** 
     * A table change to add a column. 
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> ADD <column_definition>
     * </pre>
     */
    class AddColumn implements TableChange {

        /** Returns the {@link Column} instance to add. */
        public Column getColumn();

        /**
         * Returns the position of the added {@link Column} instance.
         * When the return value is null, the column is appended as the
         * last column. When the return value is FIRST, the column is added
         * as the first column. When the return value is AFTER, the column
         * is added after the referenced column.
         */
        public @Nullable ColumnPosition getPosition();
    }

    /** 
     * A table change to add a unique constraint.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> ADD PRIMARY KEY (<column_name>...) NOT ENFORCED;
     * </pre>
     */
    class AddUniqueConstraint implements TableChange {

        /** Returns the unique constraint to add. */
        public UniqueConstraint getConstraint();
    }

    /** 
     * A table change to add a watermark.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> ADD WATERMARK FOR <row_time> AS <row_time_expression>
     * </pre>
     */
    class AddWatermark implements TableChange {

        /** Returns the watermark to add. */
        public WatermarkSpec getWatermark();
    }

    // --------------------------------------------------------------------------------------------
    // Modify Change
    // --------------------------------------------------------------------------------------------   
	
    /**
     * A base schema change to modify a column. The modification includes:
     * <ul>
     *     <li>change column data type</li>
     *     <li>reorder column position</li>
     *     <li>modify column comment</li>
     *     <li>change the computed expression</li>
     *     ...
     * </ul>
     *
     * Some fine-grained column changes are defined in the {@link ModifyPhysicalColumnType}, {@link ModifyColumnComment}, {@link ModifyColumnPosition}
     * and {@link ModifyPhysicalColumnNullability}.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY <column_definition> COMMENT '<column_comment>' <column_position>
     * </pre>    
     */    
    class ModifyColumn implements TableChange {

        /** Returns the original {@link Column} instance. */
        public Column getOldColumn();

        /** Returns the modified {@link Column} instance. */
        public Column getNewColumn();

        /** 
         * Returns the position of the modified {@link Column} instance.
         * When the return value is null, the column keeps its
         * original position. When the return value is FIRST, the
         * modified column is moved to the first position. When the return
         * value is AFTER, the column is moved after the referenced column.
         */
        public @Nullable ColumnPosition getNewPosition();
    }

    /** 
     * Modify the column comment.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> COMMENT '<new_column_comment>'
     * </pre>  
     */
    class ModifyColumnComment extends ModifyColumn {

        /** Get the new comment for the column. */
        public String getNewComment();
    }

    /** 
     * Modify the column position.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY <column_name> <original_column_type> <column_position>
     * </pre>  
     */
	class ModifyColumnPosition extends ModifyColumn {}

    /** 
     * Modify the physical column data type.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY <column_name> <new_column_type>;
     * </pre>  
     */
    class ModifyPhysicalColumnType extends ModifyColumn {

        /** Get the column type for the new column. */
        public DataType getNewType();
    }

	/** 
     * A table change to modify the column name. 
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> RENAME <old_column_name> TO <new_column_name>
     * </pre>   
     */
    class ModifyColumnName extends ModifyColumn {

        /** Returns the original column name. */
        public String getOldColumnName();

        /** Returns the new column name after renaming the column name. */
        public String getNewColumnName();
    }
	
	/** 
     * A table change to modify a unique constraint.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY PRIMARY KEY (<column_name> ...) NOT ENFORCED
     * </pre>   
     */
    class ModifyUniqueConstraint implements TableChange {

        /** Returns the modified unique constraint. */
        public UniqueConstraint getNewConstraint();
    }

	/** 
     * A table change to modify the watermark.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> MODIFY WATERMARK FOR <row_time_column_name> AS <watermark_expression>
     * </pre>   
     */
    class ModifyWatermark implements TableChange {

        /** Returns the modified watermark. */
        public WatermarkSpec getNewWatermark();
    }

    // --------------------------------------------------------------------------------------------
    // Drop Change
    // --------------------------------------------------------------------------------------------

	/** 
     * A table change to drop the column.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> DROP <column_name>
     * </pre>   
     */
     class DropColumn implements TableChange {

        /** Returns the column name. */
        public String getColumnName();
    }  
	
	/** 
     * A table change to drop the watermark.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> DROP WATERMARK
     * </pre>   
     */
    class DropWatermark implements TableChange {}  

	/** 
     * A table change to drop the unique constraint.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> DROP PRIMARY KEY
     * </pre>   
     */    
	class DropUniqueConstraint implements TableChange {

        /** Returns the constraint name. */
        public String getConstraintName();
    }

    // --------------------------------------------------------------------------------------------
    // Property change
    // --------------------------------------------------------------------------------------------


	/** 
     * A table change to set the table option.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> SET '<key>' = '<value>';
     * </pre>   
     */    
    class SetOption implements TableChange {

        /** Returns the Option key to set. */
        String getKey();

        /** Returns the Option value to set. */
        String getValue();
    }

	/** 
     * A table change to reset the table option.
     * 
     * It is equal to the following statement:
     * <pre>
     *    ALTER TABLE <table_name> RESET '<key>'
     * </pre>   
     */   
    class ResetOption implements TableChange {

        /** Returns the Option key to reset. */
        String getKey();
    }

    // --------------------------------------------------------------------------------------------

    /** The position of the modified or added column. */
    interface ColumnPosition {

        static ColumnPosition first() {
            return First.INSTANCE;
        }

        static ColumnPosition after(String column) {
            return new After(column);
        }
    }

    /**
     * Column position FIRST means the specified column should be the first column. 
     */
    static final class First implements ColumnPosition {
        private static final First INSTANCE = new First();

        private First() {}

        @Override
        public String toString() {
            return "FIRST";
        }
    }

    /**
     * Column position AFTER means the specified column should be put after the given `column`. 
     */
    static final class After implements ColumnPosition {
        private final String column;

        private After(String column) {
            this.column = column;
        }

        public String column() {
            return column;
        }
    }
}

We place the class in the org.apache.flink.table.catalog package.
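
To make the position semantics described in the Javadoc above concrete, here is a minimal, self-contained sketch. It does not use the proposed classes: `ColumnPositionSketch`, its nested `Position` type, and the `addColumn` helper are hypothetical stand-ins, and a schema is reduced to a plain list of column names. It only illustrates how a null position, FIRST, and AFTER could be interpreted when a column is added.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: applies the position semantics to a plain list of column names. */
final class ColumnPositionSketch {

    /** Hypothetical stand-in for ColumnPosition: FIRST, or AFTER a referenced column. */
    static final class Position {
        final String afterColumn; // null encodes FIRST

        private Position(String afterColumn) {
            this.afterColumn = afterColumn;
        }

        static Position first() {
            return new Position(null);
        }

        static Position after(String column) {
            return new Position(column);
        }
    }

    /** Returns a copy of the column list with newColumn inserted at the given position. */
    static List<String> addColumn(List<String> columns, String newColumn, Position position) {
        List<String> result = new ArrayList<>(columns);
        if (position == null) {
            // No position given: append as the last column.
            result.add(newColumn);
        } else if (position.afterColumn == null) {
            // FIRST: the new column becomes the first column.
            result.add(0, newColumn);
        } else {
            // AFTER: insert right behind the referenced column.
            int index = result.indexOf(position.afterColumn);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + position.afterColumn);
            }
            result.add(index + 1, newColumn);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("id", "name");
        System.out.println(addColumn(columns, "ts", null));                 // [id, name, ts]
        System.out.println(addColumn(columns, "ts", Position.first()));     // [ts, id, name]
        System.out.println(addColumn(columns, "ts", Position.after("id"))); // [id, ts, name]
    }
}
```

Rejecting an unknown referenced column is a choice made for this sketch; the proposal itself does not state how such a case is reported.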

Catalog

Due to the proposed API class, we also need to give better semantics to the existing Catalog class hierarchy while maintaining backwards compatibility. 

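public interface Catalog {

    /** Alter table schema with the table changes. */
    default void alterTable(
            ObjectIdentifier identifier,
            CatalogBaseTable newTable,
            List<TableChange> tableChanges,
            boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        // By default, ignore the table changes and fall back to the existing method.
        alterTable(identifier.toObjectPath(), newTable, ignoreIfNotExists);
    }

}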
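
As a rough illustration of why receiving the changes helps, the following sketch shows how a catalog backed by an external database might turn a list of changes directly into ALTER TABLE statements instead of diffing two table definitions. Everything here is hypothetical: `AlterTableSqlSketch`, the nested `Change`, `AddColumn` and `DropColumn` types are simplified stand-ins for the proposed TableChange classes, and the generated SQL dialect is an assumption rather than what any particular external system accepts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: maps simplified table changes to ALTER TABLE statements. */
final class AlterTableSqlSketch {

    /** Hypothetical, simplified stand-in for TableChange. */
    interface Change {}

    /** Stand-in for an "add column" change, reduced to a name and a type string. */
    static final class AddColumn implements Change {
        final String name;
        final String type;

        AddColumn(String name, String type) {
            this.name = name;
            this.type = type;
        }
    }

    /** Stand-in for a "drop column" change. */
    static final class DropColumn implements Change {
        final String name;

        DropColumn(String name) {
            this.name = name;
        }
    }

    /** Translates each change into one statement, preserving the order of the changes. */
    static List<String> toSql(String table, List<Change> changes) {
        List<String> statements = new ArrayList<>();
        for (Change change : changes) {
            if (change instanceof AddColumn) {
                AddColumn add = (AddColumn) change;
                statements.add("ALTER TABLE " + table + " ADD " + add.name + " " + add.type);
            } else if (change instanceof DropColumn) {
                statements.add("ALTER TABLE " + table + " DROP " + ((DropColumn) change).name);
            } else {
                // A catalog can reject the changes it cannot express in the external system.
                throw new UnsupportedOperationException(
                        "Unsupported change: " + change.getClass().getSimpleName());
            }
        }
        return statements;
    }

    public static void main(String[] args) {
        List<Change> changes =
                Arrays.<Change>asList(new AddColumn("ts", "TIMESTAMP"), new DropColumn("tmp"));
        for (String statement : toSql("orders", changes)) {
            System.out.println(statement);
        }
    }
}
```

A catalog that overrides the new alterTable method could run such statements against the external system and ignore the newTable argument, while catalogs that do not override it keep the existing behavior through the default implementation.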


In FLINK-27237, we also propose the ALTER PARTITION syntax to modify multiple partitions together. However, the current Catalog can only modify one partition at a time, which cannot guarantee atomicity. Therefore, we also propose to add the following API to the Catalog interface.

public interface Catalog {

    /** Create a list of the partitions with the ALTER TABLE ADD PARTITION syntax. */
    default void createPartition(
            ObjectPath tablePath,
            List<CatalogPartitionSpec> partitionSpecs,
            List<CatalogPartition> partitions,
            boolean ignoreIfExists)
            throws TableNotExistException, TableNotPartitionedException,
                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
                    CatalogException {
        // partitionSpecs and partitions are parallel lists: the i-th spec describes the i-th partition.
        for (int i = 0; i < partitionSpecs.size(); i++) {
            createPartition(
                    tablePath, partitionSpecs.get(i), partitions.get(i), ignoreIfExists);
        }
    }

    /** Drop a list of the partitions with the ALTER TABLE DROP PARTITION syntax. */
    default void dropPartition(
            ObjectPath tablePath,
            List<CatalogPartitionSpec> partitionSpecs,
            boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        for (CatalogPartitionSpec spec : partitionSpecs) {
            dropPartition(tablePath, spec, ignoreIfNotExists);
        }
    }
}

It is up to the Catalog implementation to guarantee atomicity. Flink itself does not provide any mechanism to ensure it.
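
The default implementations above apply the partitions one by one, so a failure in the middle leaves the earlier partitions applied. The sketch below shows one way an implementation could provide all-or-nothing behavior: validate the whole batch first and apply it only if every entry is acceptable. `AtomicPartitionSketch` is a hypothetical in-memory stand-in that represents partition specs as plain strings; a real catalog would more likely rely on a transaction of its backing store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Illustration only: an in-memory store that adds a batch of partitions atomically. */
final class AtomicPartitionSketch {

    private final Set<String> partitions = new LinkedHashSet<>();

    /** Adds all given partitions, or none of them if any of them is rejected. */
    void createPartitions(List<String> specs, boolean ignoreIfExists) {
        // Phase 1: validate the whole batch without touching the stored state.
        List<String> toAdd = new ArrayList<>();
        for (String spec : specs) {
            if (partitions.contains(spec)) {
                if (!ignoreIfExists) {
                    throw new IllegalStateException("Partition already exists: " + spec);
                }
            } else {
                toAdd.add(spec);
            }
        }
        // Phase 2: apply. This point is only reached if validation succeeded for every entry.
        partitions.addAll(toAdd);
    }

    /** Returns a snapshot of the stored partition specs in insertion order. */
    List<String> partitions() {
        return new ArrayList<>(partitions);
    }

    public static void main(String[] args) {
        AtomicPartitionSketch catalog = new AtomicPartitionSketch();
        catalog.createPartitions(Arrays.asList("dt=2022-01-01"), false);
        try {
            // The second entry already exists, so the first entry must not be added either.
            catalog.createPartitions(Arrays.asList("dt=2022-01-02", "dt=2022-01-01"), false);
        } catch (IllegalStateException e) {
            System.out.println("Rejected batch: " + e.getMessage());
        }
        System.out.println(catalog.partitions()); // [dt=2022-01-01]
    }
}
```

With the looping default implementation, the same failing batch would have left `dt=2022-01-02` in place before the exception was thrown.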


Due to the ALTER TABLE PARTITION RENAME  syntax, we also need to introduce the following method.

public interface Catalog {
  
    /** Rename the partition with the ALTER TABLE PARTITION RENAME syntax. */
    default void renamePartition(
            ObjectPath tablePath,
            CatalogPartitionSpec originSpec,
            CatalogPartitionSpec newSpec) {
        // Report the concrete catalog implementation rather than the Catalog interface itself.
        throw new UnsupportedOperationException(
                String.format("%s does not support renaming partitions.", getClass().getName()));
    }

}



Implementation Detail

When converting the SqlNode to an Operation, the SqlToOperationConverter validates and resolves the table change. For the ALTER TABLE ADD syntax, the converter adds the physical columns, metadata columns, computed columns, watermarks, and constraints to the original schema in order. Then the converter uses the SchemaResolver to resolve the new schema. After resolving the new schema, the converter extracts the added part from the new schema and builds the TableChanges.


When translating a modify column statement into a TableChange, the SqlToOperationConverter tries its best to produce a fine-grained TableChange. Otherwise, it falls back to the default ModifyColumn to represent the column change.
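
The selection between a fine-grained change and the generic ModifyColumn could look roughly like the sketch below. The rule used here is an assumption made for illustration and is not specified by this proposal: if exactly one aspect of the column differs, the matching fine-grained change is chosen; in every other case the generic ModifyColumn is used. `ModifyColumnSketch`, `Col` and `classify` are hypothetical names, and the result is returned as a class name string to keep the example self-contained.

```java
import java.util.Objects;

/** Illustration only: picks the most specific change kind for a modified column. */
final class ModifyColumnSketch {

    /** Hypothetical, simplified stand-in for a resolved column. */
    static final class Col {
        final String name;
        final String type;
        final String comment; // may be null

        Col(String name, String type, String comment) {
            this.name = name;
            this.type = type;
            this.comment = comment;
        }
    }

    /** Returns the name of the change class that would represent the modification. */
    static String classify(Col oldCol, Col newCol, boolean positionChanged) {
        boolean typeChanged = !oldCol.type.equals(newCol.type);
        boolean commentChanged = !Objects.equals(oldCol.comment, newCol.comment);
        int changedAspects =
                (typeChanged ? 1 : 0) + (commentChanged ? 1 : 0) + (positionChanged ? 1 : 0);
        if (changedAspects != 1) {
            // Assumed fallback: no single aspect identifies the change.
            return "ModifyColumn";
        }
        if (typeChanged) {
            return "ModifyPhysicalColumnType";
        }
        if (commentChanged) {
            return "ModifyColumnComment";
        }
        return "ModifyColumnPosition";
    }

    public static void main(String[] args) {
        Col before = new Col("amount", "INT", null);
        System.out.println(classify(before, new Col("amount", "BIGINT", null), false));
        System.out.println(classify(before, new Col("amount", "INT", "total amount"), false));
        System.out.println(classify(before, new Col("amount", "BIGINT", "total amount"), true));
    }
}
```

A converter could equally emit several fine-grained changes when more than one aspect differs; the single-aspect rule is only the simplest way to show the fallback.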


Compatibility, Deprecation, and Migration Plan

We add default implementations for the newly introduced methods.

Future Work

Add/Drop/Modify composite column

In some cases, users might need to modify a nested column. For example, Spark supports

-- add column
ALTER TABLE prod.db.sample
ADD COLUMN new_column bigint AFTER other_column
-- rename column
ALTER TABLE prod.db.sample RENAME COLUMN location.lat TO latitude
-- drop column
ALTER TABLE prod.db.sample DROP COLUMN point.z
-- alter nullability
ALTER TABLE prod.db.sample ALTER COLUMN id DROP NOT NULL


Our proposal can express this idea by introducing a new class such as

class AddCompositeAttribute implements TableChange {
    /**
     * Similar to Calcite {@link RexFieldAccess} that exposes the path to the reference. For
     * column `nested`.`inner`.`name`, the return value is ["nested", "inner"].
     */
    public String[] getReferenceExpr();

    /** Column to describe the added field. */
    public Column getField();

    /** Returns the position of the {@link Column} to add. */
    public @Nullable ColumnPosition getPosition();
}


Alter View

In some cases, users might need to modify a view. For example, PostgreSQL supports

ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name SET DEFAULT expression
ALTER VIEW [ IF EXISTS ] name ALTER [ COLUMN ] column_name DROP DEFAULT
ALTER VIEW [ IF EXISTS ] name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER VIEW [ IF EXISTS ] name RENAME [ COLUMN ] column_name TO new_column_name
ALTER VIEW [ IF EXISTS ] name RENAME TO new_name
ALTER VIEW [ IF EXISTS ] name SET SCHEMA new_schema
ALTER VIEW [ IF EXISTS ] name SET ( view_option_name [= view_option_value] [, ... ] )
ALTER VIEW [ IF EXISTS ] name RESET ( view_option_name [, ... ] )


Our proposal also works for the cases above. The Catalog API only requires a CatalogBaseTable, which is the common base type of both CatalogTable and CatalogView, together with the TableChanges. For such special syntax, a new kind of TableChange can be introduced.



