ID | IEP-127
Author |
Sponsor |
Created |
Status | DRAFT
Currently, database schema information in Ignite 3.0 is stored and treated as regular configuration. All business processes related to schema changes are distributed via the internal mechanisms of the configuration engine and orchestrated ad hoc by consumer components such as TableManager or IndexManager.
However, there is strong evidence that the API and guarantees of the configuration engine do not satisfy important requirements that emerge in the field of database schema management.
Therefore, we need to stop treating the schema like any other configuration and move it to a separate component called Catalog Service.
This component will provide a high-level API to modify the schema and will take responsibility for all other necessary processes: synchronizing schema changes between nodes, managing schema change events, notifying all interested components, and orchestrating the interaction between them.
This document introduces a new component named Catalog, which provides a single source of truth from which components acquire the actual state of the schema, and which unifies the way we work with schema object descriptors within the system.
According to the Schema Synchronization design (IEP-98: Schema Synchronization), the SQL schema should be versioned, and every update to the schema must increment the version. The Catalog module will be responsible for maintaining and assigning versions to the catalog (which is a snapshot of schema objects). In addition, the Catalog module should provide a way to resolve the version of the schema for a specified timestamp.
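To make the last requirement concrete: if every catalog version records the timestamp from which it becomes active (an assumption of this sketch; the activation rules themselves belong to IEP-98), resolving the version for a timestamp is a floor lookup in a sorted map. `CatalogVersionIndex` and its methods are hypothetical names, and timestamps are plain `long` values, as in the `table(String, long)` method of `CatalogService`.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical index from activation timestamps to catalog versions (illustration only). */
final class CatalogVersionIndex {
    // Key: timestamp from which a version is active; value: catalog version.
    private final NavigableMap<Long, Integer> versionByActivationTs = new TreeMap<>();

    /** Registers the next version; versions and activation timestamps must both grow strictly. */
    synchronized void register(int version, long activationTs) {
        Map.Entry<Long, Integer> last = versionByActivationTs.lastEntry();
        if (last != null && (version <= last.getValue() || activationTs <= last.getKey())) {
            throw new IllegalArgumentException("Versions and timestamps must be strictly increasing");
        }
        versionByActivationTs.put(activationTs, version);
    }

    /** Returns the latest version active at the given timestamp, or -1 if no version was active yet. */
    synchronized int versionAt(long timestamp) {
        // floorEntry finds the greatest activation timestamp <= the requested one.
        Map.Entry<Long, Integer> entry = versionByActivationTs.floorEntry(timestamp);
        return entry == null ? -1 : entry.getValue();
    }
}
```

A sorted map keeps the lookup logarithmic in the number of retained versions, which matters once the history grows.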
Every modification of the catalog is split into two phases: accept and apply. During the accept phase, the given command is validated and then used to generate a list of update entries, and, finally, this list is saved to the distributed update log. Below is a sequence diagram describing the accept phase:
After the update entries are saved to the log, it is the job of the update log to propagate the updates across the cluster. On every node, the update log notifies the catalog manager about new update entries, and the latter applies them and stores the new version of the catalog in a local cache. Below is a sequence diagram describing the apply phase:
It is proposed to implement the update log on top of the metastorage. Update entries of version N should be stored under the `catalog.update.{N}` key. The latest known version should be stored under the `catalog.version` key. Updates are saved in a CAS manner with the condition `value(catalog.version) == newVersion - 1`, so an update is accepted only if it directly follows the latest known version.
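A minimal in-memory sketch of the CAS append described above. `FakeMetaStorage` and its `invoke` method are hypothetical stand-ins for the metastorage, not its real API; the only property the sketch relies on is that the condition check and both writes happen atomically. It also assumes that the first version appended to an empty log is 1.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical in-memory stand-in for the metastorage; not its real API. */
final class FakeMetaStorage {
    private final Map<String, Object> data = new HashMap<>();

    synchronized Object get(String key) {
        return data.get(key);
    }

    /** Atomically applies all puts if the value under condKey equals expected (null means "absent"). */
    synchronized boolean invoke(String condKey, Object expected, Map<String, Object> puts) {
        if (!Objects.equals(data.get(condKey), expected)) {
            return false;
        }
        data.putAll(puts);
        return true;
    }
}

/** Hypothetical update log over the key layout described above. */
final class MetaStorageUpdateLogSketch {
    static final String VERSION_KEY = "catalog.version";

    private final FakeMetaStorage metaStorage;

    MetaStorageUpdateLogSketch(FakeMetaStorage metaStorage) {
        this.metaStorage = metaStorage;
    }

    static String updateKey(int version) {
        return "catalog.update." + version;
    }

    /** Appends entries as newVersion only if catalog.version currently equals newVersion - 1. */
    CompletableFuture<Boolean> append(int newVersion, Object entries) {
        // Assumption: the first version is 1, and before it is written catalog.version is absent.
        Integer expected = newVersion == 1 ? null : newVersion - 1;
        boolean appended = metaStorage.invoke(
                VERSION_KEY, expected, Map.of(updateKey(newVersion), entries, VERSION_KEY, newVersion));
        return CompletableFuture.completedFuture(appended);
    }
}
```

Writing `catalog.update.{N}` and bumping `catalog.version` in one conditional operation is what makes a losing concurrent writer observe `false` instead of overwriting an existing version.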
During recovery, update entries should be read one by one for all versions, starting from the "earliest available" one up to the version stored under the `catalog.version` key, and applied once again.
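Recovery can reuse the apply routine: each replayed entry produces the next catalog version from the previous one. The sketch below assumes a simplified model in which an update entry is a set of table names to add and a snapshot entry is the full set of names at its version, stored under the keys described above; `catalog.snapshot.version` (from the snapshotting section) marks the earliest available version, and version 1 is assumed when no snapshot exists. All class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: the local catalog cache is rebuilt by replaying the update log. */
final class CatalogRecoverySketch {
    // Local cache: catalog version -> immutable set of table names at that version.
    private final NavigableMap<Integer, Set<String>> cache = new TreeMap<>();

    /** Apply phase: catalog N = catalog N - 1 (empty if absent, e.g. for a snapshot) plus entries of N. */
    void apply(int version, Set<String> entry) {
        Map.Entry<Integer, Set<String>> prev = cache.lastEntry();
        if (prev != null && prev.getKey() != version - 1) {
            throw new IllegalStateException("Expected version " + (prev.getKey() + 1) + ", got " + version);
        }
        Set<String> next = new HashSet<>();
        if (prev != null) {
            next.addAll(prev.getValue());
        }
        next.addAll(entry);
        cache.put(version, Collections.unmodifiableSet(next));
    }

    /** Recovery: re-apply entries from the earliest available version up to catalog.version. */
    void recover(Map<String, Object> storage) {
        Integer latest = (Integer) storage.get("catalog.version");
        if (latest == null) {
            return; // Nothing has been written yet.
        }
        Integer snapshotVersion = (Integer) storage.get("catalog.snapshot.version");
        // Assumption: without a snapshot the log starts at version 1.
        int from = snapshotVersion == null ? 1 : snapshotVersion;
        for (int v = from; v <= latest; v++) {
            @SuppressWarnings("unchecked")
            Set<String> entry = (Set<String>) storage.get("catalog.update." + v);
            if (entry == null) {
                throw new IllegalStateException("Missing entry for version " + v);
            }
            apply(v, entry);
        }
    }

    Set<String> catalog(int version) {
        return cache.get(version);
    }
}
```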
Below is the suggested interface for UpdateLog:
/**
 * Distributed log of catalog updates.
 */
public interface UpdateLog extends IgniteComponent {
    /**
     * Appends the given update to the log if an update with that version does not exist.
     *
     * <p>The version of {@link VersionedUpdate} must be greater by 1 than the version of the previously
     * appended update, otherwise the current update will be rejected. The versions of already appended updates should be
     * tracked by registering an {@link OnUpdateHandler update handler} (see {@link #registerUpdateHandler(OnUpdateHandler)}).
     *
     * @param update An update to append to the log.
     * @return A {@code true} if the update has been successfully appended, {@code false} otherwise if an update with the
     *         same version already exists, and the operation should be retried with a new version.
     */
    CompletableFuture<Boolean> append(VersionedUpdate update);

    /**
     * Saves a snapshot entry and drops updates of previous versions from the log, if supported, otherwise does nothing.
     *
     * @param snapshotEntry An entry, which represents a result of merging updates of previous versions.
     * @return A {@code true} if the snapshot has been successfully written, {@code false} otherwise if a snapshot with the
     *         same or greater version already exists.
     */
    CompletableFuture<Boolean> saveSnapshot(SnapshotEntry snapshotEntry);

    /**
     * Registers a handler to keep track of appended updates.
     *
     * @param handler A handler to notify when a new update was appended.
     */
    void registerUpdateHandler(OnUpdateHandler handler);

    /**
     * Starts the component.
     *
     * <p>Log replay is a part of the component start-up process, thus the handler must
     * be registered before start is invoked, otherwise an exception will be thrown.
     *
     * @param componentContext The component lifecycle context.
     * @return Future that will be completed when the asynchronous part of the start is processed.
     * @throws IgniteInternalException If no handler has been registered.
     */
    @Override
    CompletableFuture<Void> startAsync(ComponentContext componentContext) throws IgniteInternalException;

    /** An interface describing a handler that will receive notification when a new update is added to the log. */
    @FunctionalInterface
    interface OnUpdateHandler {
        /**
         * An actual handler that will be invoked when a new update is appended to the log.
         *
         * @param update A new update.
         * @param metaStorageUpdateTimestamp Timestamp assigned to the update by the Metastorage.
         * @param causalityToken Causality token.
         * @return Handler future.
         */
        CompletableFuture<Void> handle(UpdateLogEvent update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken);
    }
}
where VersionedUpdate is a container that holds a collection of UpdateEntry objects together with the version assigned to them.
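The `append` contract (a `false` result means another update already took that version, so the operation should be retried with a new version) suggests an accept phase along the following lines: validate the command against the current catalog, generate the update entries, and try to append them as the next version, re-validating on conflict. In this hypothetical sketch a catalog is a set of table names, update entries are the names of tables to add, the log is an in-memory list, and `createTable` is an illustrative command.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical accept-phase sketch; not the actual CatalogManager implementation. */
final class AcceptPhaseSketch {
    // Stand-in for the update log: element i holds the entries of version i + 1.
    private final List<List<String>> log = new ArrayList<>();

    /** Mirrors the UpdateLog#append contract: succeeds only if the version follows the last appended one. */
    synchronized CompletableFuture<Boolean> append(int version, List<String> entries) {
        if (version != log.size() + 1) {
            return CompletableFuture.completedFuture(false);
        }
        log.add(List.copyOf(entries));
        return CompletableFuture.completedFuture(true);
    }

    /** Illustrative command: creates a table, rejected if the table already exists. */
    static Function<Set<String>, List<String>> createTable(String name) {
        return tables -> {
            if (tables.contains(name)) {
                throw new IllegalStateException("Table already exists: " + name);
            }
            return List.of(name);
        };
    }

    /** Accept phase: validate, generate entries, append; retry against fresh state on conflict. */
    CompletableFuture<Integer> execute(Function<Set<String>, List<String>> command) {
        int baseVersion;
        Set<String> tables = new HashSet<>();
        synchronized (this) {
            // Read the version and the state it describes together.
            baseVersion = log.size();
            log.forEach(tables::addAll);
        }
        List<String> entries;
        try {
            // Validation: the command throws if it is not applicable to the current catalog.
            entries = command.apply(Collections.unmodifiableSet(tables));
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
        int newVersion = baseVersion + 1;
        // A false result means a concurrent update took newVersion, so re-validate and retry.
        return append(newVersion, entries)
                .thenCompose(appended -> appended ? CompletableFuture.completedFuture(newVersion) : execute(command));
    }
}
```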
Over time, the log may grow to an enormous size. To address this problem, snapshotting should be introduced to the update log.
When a snapshot of version N is saved, the update entries stored under `catalog.update.{N}` should be overwritten with the catalog's snapshot of this version. All update entries with versions lower than the version of the snapshot should be removed. The earliest available version of the catalog is tracked under the `catalog.snapshot.version` key.
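Compaction over the same key layout might look like the following. The sketch works on a plain `Map` and assumes the caller holds exclusive access to it; against the real metastorage the removals and puts would presumably have to be issued as one atomic operation, like the append. `SnapshotSketch` is a hypothetical name, and version 1 is assumed to be the first version when no snapshot exists yet.

```java
import java.util.Map;

/** Hypothetical compaction sketch over the catalog.* key layout. */
final class SnapshotSketch {
    /**
     * Replaces the entry of version n with a merged snapshot and drops all older entries.
     * Returns false if a snapshot of the same or a greater version already exists.
     */
    static boolean saveSnapshot(Map<String, Object> storage, int n, Object snapshot) {
        Integer latest = (Integer) storage.get("catalog.version");
        if (latest == null || n > latest) {
            throw new IllegalArgumentException("Version " + n + " is not in the log yet");
        }
        Integer earliest = (Integer) storage.get("catalog.snapshot.version");
        if (earliest != null && earliest >= n) {
            return false;
        }
        // Assumption: without a previous snapshot the log starts at version 1.
        int from = earliest == null ? 1 : earliest;
        for (int v = from; v < n; v++) {
            storage.remove("catalog.update." + v);
        }
        storage.put("catalog.update." + n, snapshot);
        storage.put("catalog.snapshot.version", n);
        return true;
    }
}
```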
One of the problems related to compaction is that different components may refer to a certain version of the catalog. Until all components finish their work with a particular version, this version must not be truncated.
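One possible way to enforce this rule, sketched under the assumption that components register the versions they are working with: keep a reference count per version and never snapshot above the lowest version still in use. Because a snapshot of version N keeps N itself (as the snapshot) and drops only older entries, snapshotting exactly at the lowest version in use is safe. `VersionUsageTracker` is a hypothetical name, and the sketch is node-local; a cluster-wide decision would presumably need the minimum across all nodes.

```java
import java.util.TreeMap;

/** Hypothetical node-local tracker of catalog versions that components still work with. */
final class VersionUsageTracker {
    // Catalog version -> number of active users of that version.
    private final TreeMap<Integer, Integer> inUse = new TreeMap<>();

    synchronized void acquire(int version) {
        inUse.merge(version, 1, Integer::sum);
    }

    synchronized void release(int version) {
        Integer count = inUse.get(version);
        if (count == null) {
            throw new IllegalStateException("Version " + version + " was not acquired");
        }
        if (count == 1) {
            inUse.remove(version);
        } else {
            inUse.put(version, count - 1);
        }
    }

    /** Highest version a snapshot may be taken at without truncating any version still in use. */
    synchronized int maxSnapshotVersion(int latestVersion) {
        return inUse.isEmpty() ? latestVersion : Math.min(inUse.firstKey(), latestVersion);
    }
}
```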
To understand it better, let's consider the possible access patterns of the catalog:
Catalog is an internal component, thus the API below is for internal use only.
/**
 * Catalog service provides methods to access schema object descriptors of an exact version and/or the last actual version
 * at a given timestamp, which is a logical point in time.
 *
 * <p>Catalog service listens to distributed schema update events and stores/restores the schema evolution history
 * (schema versions) for time-travel queries and for lazy data evolution.
 */
public interface CatalogService extends EventProducer<CatalogEvent, CatalogEventParameters> {
    @Nullable Catalog catalog(int catalogVersion);

    @Nullable CatalogTableDescriptor table(String tableName, long timestamp);

    @Nullable CatalogTableDescriptor table(int tableId, long timestamp);

    @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion);

    Collection<CatalogTableDescriptor> tables(int catalogVersion);

    // rest of the access methods
}
where CatalogTableDescriptor is defined as follows:
public class CatalogTableDescriptor extends CatalogObjectDescriptor {
    // private properties and constructors

    /** Returns an identifier of the schema this table descriptor belongs to. */
    public int schemaId() {...}

    /** Returns versions of this table descriptor. */
    public CatalogTableSchemaVersions schemaVersions() {...}

    /** Returns an identifier of the distribution zone this table descriptor belongs to. */
    public int zoneId() {...}

    /** Returns an identifier of the primary key index. */
    public int primaryKeyIndexId() {...}

    /** Returns a version of this table descriptor. */
    public int tableVersion() {...}

    /** Returns a list of primary key column names. */
    public List<String> primaryKeyColumns() {...}

    /** Returns a list of colocation key column names. */
    public List<String> colocationColumns() {...}

    /** Returns a list of column descriptors for the table. */
    public List<CatalogTableColumnDescriptor> columns() {...}

    /** Returns a column descriptor for the column with the given name, {@code null} if absent. */
    public @Nullable CatalogTableColumnDescriptor column(String name) {...}

    /** Returns an index of the column with the given name, or {@code -1} if such a column does not exist. */
    public int columnIndex(String name) {...}

    /** Returns {@code true} if the given column is a part of the primary key. */
    public boolean isPrimaryKeyColumn(String name) {...}

    /** Returns {@code true} if the given column is a part of the colocation key. */
    public boolean isColocationColumn(String name) {...}

    /** Returns a name of the storage profile. */
    public String storageProfile() {...}
}
Here is a complete example of creating a new table in the catalog:
CatalogCommand createTable = CreateTableCommand.builder()
        .schemaName(SCHEMA_NAME)
        .tableName(TABLE_NAME)
        .columns(List.of(columnParams))
        .primaryKey(primaryKey(columnParams.name()))
        .build();

CatalogManager manager = ...;

// The returned future completes with the created catalog version.
manager.execute(createTable);
where CatalogManager is defined as follows:
/**
 * The catalog manager provides schema manipulation methods and is responsible for managing distributed operations.
 */
public interface CatalogManager extends IgniteComponent {
    /**
     * Executes the given command.
     *
     * @param command Command to execute.
     * @return Future representing the result of execution (it will be completed with the created catalog version).
     */
    CompletableFuture<Integer> execute(CatalogCommand command);

    /**
     * Executes the given list of commands atomically. That is, either all commands will be applied at once
     * or none of them. The whole bulk will increment the catalog's version by a single point.
     *
     * @param commands Commands to execute.
     * @return Future representing the result of execution (it will be completed with the created catalog version).
     */
    CompletableFuture<Integer> execute(List<CatalogCommand> commands);
}
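One way to obtain the all-or-nothing semantics of `execute(List<CatalogCommand>)` is to run the commands one by one against a private working copy of the catalog, so that each command sees the effects of the previous ones, and to publish the combined entries as a single version only if every command succeeded. In this hypothetical sketch the catalog is a set of table names, the log is an in-memory list, and `createTable` is an illustrative command factory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical sketch of executing several commands as one catalog version. */
final class BulkExecuteSketch {
    // Entries of each version; version N is stored at index N - 1.
    private final List<List<String>> versions = new ArrayList<>();
    // Current catalog: names of existing tables.
    private final Set<String> tables = new HashSet<>();

    /** Illustrative command: creates a table, rejected if the name is already taken. */
    static Function<Set<String>, List<String>> createTable(String name) {
        return current -> {
            if (current.contains(name)) {
                throw new IllegalStateException("Table already exists: " + name);
            }
            return List.of(name);
        };
    }

    /** Applies all commands or none of them; returns the single new catalog version. */
    synchronized int execute(List<Function<Set<String>, List<String>>> commands) {
        Set<String> working = new HashSet<>(tables);
        List<String> allEntries = new ArrayList<>();
        for (Function<Set<String>, List<String>> command : commands) {
            // Each command validates against the working copy; an exception aborts the whole bulk.
            List<String> entries = command.apply(Collections.unmodifiableSet(working));
            working.addAll(entries);
            allEntries.addAll(entries);
        }
        // Only now is anything published: one new version for the whole bulk.
        versions.add(List.copyOf(allEntries));
        tables.addAll(working);
        return versions.size();
    }
}
```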
and the builder for the CreateTable command is defined as follows:
/**
 * Abstract builder of table-related commands.
 *
 * <p>Every table-related command, regardless of whether it creates a new table or modifies an existing one,
 * should specify the name of the table and the namespace (schema) where the existing table is found or the new one is put.
 */
public interface AbstractTableCommandBuilder<T extends AbstractTableCommandBuilder<T>> {
    /** A name of the schema a table belongs to. Should not be null or blank. */
    T schemaName(String schemaName);

    /** A name of the table. Should not be null or blank. */
    T tableName(String tableName);

    /** Sets a flag indicating whether {@code IF EXISTS} was specified. */
    T ifTableExists(boolean ifTableExists);

    /** Returns a command with the specified parameters. */
    CatalogCommand build();
}

/**
 * Builder of a command that adds a new table to the catalog.
 *
 * <p>A builder is considered to be reusable, thus implementations have
 * to make sure that invocation of the {@link #build()} method doesn't cause any
 * side effects on the builder's state or on any object created by the same builder.
 */
public interface CreateTableCommandBuilder extends AbstractTableCommandBuilder<CreateTableCommandBuilder> {
    /** List of columns a new table should be created with. There must be at least one column. */
    CreateTableCommandBuilder columns(List<ColumnParams> columns);

    /** Primary key. All columns of a primary key must be present in the {@link #columns(List) list of columns}. */
    CreateTableCommandBuilder primaryKey(TablePrimaryKey primaryKey);

    /**
     * List of colocation columns. Must not be empty, but may be null. All columns, if any,
     * must be present in the {@link #primaryKey(TablePrimaryKey) primary key}.
     */
    CreateTableCommandBuilder colocationColumns(@Nullable List<String> colocationColumns);

    /** A name of the zone to create the new table in. Should not be blank. */
    CreateTableCommandBuilder zone(@Nullable String zoneName);

    /** A name of the table's storage profile. The table's zone must contain this storage profile. */
    CreateTableCommandBuilder storageProfile(@Nullable String storageProfile);
}
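The reusability requirement in the builder's Javadoc can be met by copying mutable state at `build()` time. The sketch below uses hypothetical `CreateTableSketchBuilder` and `CreateTableSketchCommand` classes with plain string column names instead of `ColumnParams`; the blank-name and at-least-one-column checks mirror the Javadoc of the builders above.

```java
import java.util.List;

/** Hypothetical immutable command produced by the builder below. */
final class CreateTableSketchCommand {
    final String schemaName;
    final String tableName;
    final List<String> columns;

    CreateTableSketchCommand(String schemaName, String tableName, List<String> columns) {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.columns = columns;
    }
}

/** Hypothetical reusable builder: build() never shares mutable state with the commands it creates. */
final class CreateTableSketchBuilder {
    private String schemaName;
    private String tableName;
    private List<String> columns;

    CreateTableSketchBuilder schemaName(String schemaName) {
        this.schemaName = schemaName;
        return this;
    }

    CreateTableSketchBuilder tableName(String tableName) {
        this.tableName = tableName;
        return this;
    }

    CreateTableSketchBuilder columns(List<String> columns) {
        this.columns = columns;
        return this;
    }

    CreateTableSketchCommand build() {
        requireNotBlank(schemaName, "schemaName");
        requireNotBlank(tableName, "tableName");
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("Table should have at least one column");
        }
        // Defensive copy: later changes to the builder or to the caller's list do not leak into this command.
        return new CreateTableSketchCommand(schemaName, tableName, List.copyOf(columns));
    }

    private static void requireNotBlank(String value, String name) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(name + " should not be null or blank");
        }
    }
}
```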
- IGNITE-19502
- IGNITE-20473
- IGNITE-21211