Status
Discussion Thread | https://lists.apache.org/thread/d8lhwz1so2l6zsybwntt49xmmkdohdvt |
---|---|
Vote Thread | https://lists.apache.org/thread/hvmxqnyv1c9mpc3hxt5tf8sq30kk53n6 |
JIRA | |
Motivation
The AWS Glue Data Catalog is a persistent metadata store in the AWS cloud. It acts as a uniform repository where disparate systems can store and find metadata to keep track of data in data silos, consolidating major data-integration capabilities into a single service. A native GlueCatalog feature in Apache Flink will increase integration support with the wide range of diverse sources that Glue provides. It will allow users to easily build Flink applications across different clusters using centralized information shared through a single catalog repository.
Given the wide usage of the Glue Data Catalog among AWS users, there is high demand in the community for a GlueCatalog [here].
Proposal
We propose to add a GlueCatalog implementation, a persistent catalog provided out of the box by Flink.
Note: GlueCatalog will be part of flink-connector-aws.
Using the Catalog
SQL
CREATE CATALOG glue_catalog WITH ('type'='glue', ...);
USE CATALOG glue_catalog;
JAVA
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "glue";
String defaultDatabase = "mydatabase";

GlueCatalog catalog = new GlueCatalog(name, defaultDatabase);
tableEnv.registerCatalog("glue", catalog);

// set the GlueCatalog as the current catalog of the session
tableEnv.useCatalog("glue");
Python
from pyflink.table import *
from pyflink.table.catalog import GlueCatalog

settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)

catalog_name = "glue"
default_database = "mydatabase"

glue_catalog = GlueCatalog(catalog_name, default_database)
t_env.register_catalog("glue", glue_catalog)

# set the GlueCatalog as the current catalog of the session
t_env.use_catalog("glue")
Public Interfaces
- GlueCatalog: publicly visible implementation of the Catalog interface.
- GlueCatalogFactory: concrete class that will implement CatalogFactory.
- GlueCatalogFactoryOptions: class responsible for configuring the options required to create a GlueCatalog.
Design
GlueCatalog will be a read and write catalog that supports the following six categories of operations:
Major Category | Functionality |
---|---|
Database | |
Table | |
View | |
Function | |
Partition | |
Statistics | |
Configurations
The following options are mandatory for creating a Glue catalog:
Option | Description |
---|---|
glue.id | The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue automatically uses the caller's AWS account ID by default. |
glue.account-id | The account ID used in a Glue resource ARN |
glue.endpoint | Configure an alternative endpoint of the Glue service for GlueCatalog to access. |
aws.region | Default AWS region |
aws.credentials.provider | Type of AWS Credential validations |
The following non-mandatory options can also be passed when creating a Glue catalog:
Option | Description |
---|---|
table.input.format | Input format for the data in Glue Table. |
table.output.format | Output format for the data in Glue Table. |
http-client.type | Client Type for GlueClient |
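As a minimal sketch, the options above could be validated before the catalog is created. The class and method names below are hypothetical (not Flink API); the key list is taken from the mandatory-options table:

```java
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check: verify that all mandatory Glue catalog
// options are present before constructing the catalog.
class GlueCatalogOptionsCheck {
    // Mandatory option keys from the configuration table above.
    static final List<String> MANDATORY = List.of(
            "glue.id", "glue.account-id", "glue.endpoint",
            "aws.region", "aws.credentials.provider");

    /** Returns the mandatory keys that are missing from the user's options. */
    static List<String> missingOptions(Map<String, String> options) {
        return MANDATORY.stream()
                .filter(key -> !options.containsKey(key))
                .toList();
    }
}
```

In the real factory this kind of validation would be expressed through GlueCatalogFactoryOptions and Flink's ConfigOption machinery rather than a hand-rolled helper.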
Flink Glue Metaspace Mapping
AWS Glue supports namespaces, and a namespace can contain multiple databases. Users can set the namespace via the catalog option base-namespace. By default, databases are created directly under the catalog, without any namespace.
Flink Catalog Metaspace Structure | AWS Glue Metaspace Structure |
---|---|
catalog name (defined in Flink only) | n/a |
database name | database name |
table name | table name |
Flink Glue DataType Mapping
The Glue client response uses a nested structure to represent table information. Every table contains a StorageDescriptor, which holds the details of each column in the table. The data type of a column is stored in the attribute named type inside FieldSchema. For more details on the table structure in AWS Glue, see the documentation.
Sample response for a Table in AWS Glue
{
  "StorageDescriptor": {
    "cols": {
      "FieldSchema": [
        { "name": "primary-1", "type": "CHAR", "comment": "" },
        { "name": "second ", "type": "STRING", "comment": "" }
      ]
    },
    "location": "s3://aws-logs-111122223333-us-east-1",
    "inputFormat": "",
    "outputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
    "compressed": "false",
    "numBuckets": "0",
    "SerDeInfo": {
      "name": "",
      "serializationLib": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
      "parameters": { "separatorChar": "|" }
    },
    "bucketCols": [],
    "sortCols": [],
    "parameters": {},
    "SkewedInfo": {},
    "storedAsSubDirectories": "false"
  },
  "parameters": { "classification": "csv" }
}
We need to extract the data type information from the Glue response and map it to a Flink data type, and vice versa. Converting from a Flink type to a type string that can be displayed in Glue will be used for informational purposes only in the Glue catalog.
Flink DataType | Glue Datatype Information | Misc Notes |
---|---|---|
CharType | char | |
VarCharType | string | |
BooleanType | boolean | |
BinaryType | fixed[%d] | |
VarBinaryType | binary | |
DecimalType | decimal(<precision>, <scale>) | |
TinyIntType | tinyint | |
SmallIntType | smallint | |
IntType | int | |
BigIntType | bigint | |
FloatType | float | |
DoubleType | double | |
DateType | date | |
TimeType | time | |
TimestampType | timestamp | |
LocalZonedTimestampType | timestamptz | |
ArrayType | array<elementType> | |
MultisetType | map<elementType, int> | |
MapType | map<keyType, valueType> |
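The mapping table above can be sketched as a simple conversion function. This is illustrative only: the class and method names are hypothetical, and the real implementation would traverse Flink's LogicalType tree rather than match on type names:

```java
// Hypothetical sketch of the Flink-to-Glue type-string conversion
// described in the table above.
class FlinkGlueTypeMapper {

    /** Maps a Flink type name to the Glue type string from the table above. */
    static String toGlueType(String flinkType) {
        switch (flinkType) {
            case "CharType": return "char";
            case "VarCharType": return "string";
            case "BooleanType": return "boolean";
            case "VarBinaryType": return "binary";
            case "TinyIntType": return "tinyint";
            case "SmallIntType": return "smallint";
            case "IntType": return "int";
            case "BigIntType": return "bigint";
            case "FloatType": return "float";
            case "DoubleType": return "double";
            case "DateType": return "date";
            case "TimeType": return "time";
            case "TimestampType": return "timestamp";
            case "LocalZonedTimestampType": return "timestamptz";
            default:
                throw new IllegalArgumentException("Unsupported type: " + flinkType);
        }
    }

    /** Parameterized types carry their element types, e.g. array<int>. */
    static String arrayOf(String glueElementType) {
        return "array<" + glueElementType + ">";
    }

    static String mapOf(String glueKeyType, String glueValueType) {
        return "map<" + glueKeyType + ", " + glueValueType + ">";
    }
}
```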
High Level Implementation Detail
1. GlueClient
GlueClient internally creates an SdkHttpClient, a synchronous client that talks to the AWS Glue metastore. Two types of SdkHttpClient will be supported, selected via the catalog option http-client.type: ApacheHttpClient, the default HTTP client, and UrlConnectionHttpClient, the leanest synchronous client.
2. Flink CatalogDatabase Mapping With Glue Database
Glue Database API Documentation
Flink Catalog Database | Glue Database |
---|---|
properties (Map<String, String>) | Parameters (K/V pair) |
comment (String) | description (String) |
LocationUri | |
GlueId | |
To create a database in Glue, we will use the CreateDatabaseRequest provided by the Glue API:
glueClient.createDatabase(<create_database_request_object>)
Request Response Structure of Create Database in Glue
- Request
  - GlueId – Glue ID string, not less than 1 or more than 255 bytes long, matching the Single-line string pattern. The ID of the Data Catalog in which to create the database. If none is provided, the AWS account ID is used by default.
  - DatabaseInput – Required: A DatabaseInput object. The metadata for the database.
  - Tags – A map array of key-value pairs, not more than 50 pairs. Each key is a UTF-8 string, not less than 1 or more than 128 bytes long. Each value is a UTF-8 string, not more than 256 bytes long. The tags you assign to the database.
- Response
  - No response parameters.
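The request constraints listed above could be checked client-side before calling Glue. The helper below is a hypothetical sketch (not part of the Glue API or Flink), encoding only the documented limits:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical pre-flight validation of the CreateDatabase constraints
// listed above: ID 1-255 bytes, at most 50 tags, tag key 1-128 bytes,
// tag value at most 256 bytes.
class CreateDatabaseValidator {

    static boolean isValidGlueId(String id) {
        int len = id.getBytes(StandardCharsets.UTF_8).length;
        return len >= 1 && len <= 255;
    }

    static boolean areValidTags(Map<String, String> tags) {
        if (tags.size() > 50) {
            return false;
        }
        for (Map.Entry<String, String> e : tags.entrySet()) {
            int keyLen = e.getKey().getBytes(StandardCharsets.UTF_8).length;
            int valLen = e.getValue().getBytes(StandardCharsets.UTF_8).length;
            if (keyLen < 1 || keyLen > 128 || valLen > 256) {
                return false;
            }
        }
        return true;
    }
}
```

Validating locally lets the catalog fail fast with a CatalogException instead of waiting for Glue to reject the request.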
Flink Glue Database Exception Mapping
Glue Create Database Exception | Flink CreateCatalog Exception |
---|---|
InvalidInputException | CatalogException |
AlreadyExistsException | DatabaseAlreadyExistException |
ResourceNumberLimitExceededException | CatalogException |
InternalServiceException | CatalogException |
OperationTimeoutException | CatalogException |
GlueEncryptionException | CatalogException |
ConcurrentModificationException | CatalogException |
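The exception mapping table above can be sketched as a simple translation step. Exception names are modeled as strings here for illustration only; the real code would catch the AWS SDK exception classes and throw the corresponding Flink catalog exceptions:

```java
// Hypothetical sketch of the Glue-to-Flink exception translation from
// the table above. Only AlreadyExistsException maps to a specific Flink
// exception; everything else falls back to the generic CatalogException.
class GlueExceptionMapper {

    static String toFlinkException(String glueException) {
        switch (glueException) {
            case "AlreadyExistsException":
                return "DatabaseAlreadyExistException";
            case "InvalidInputException":
            case "ResourceNumberLimitExceededException":
            case "InternalServiceException":
            case "OperationTimeoutException":
            case "GlueEncryptionException":
            case "ConcurrentModificationException":
                return "CatalogException";
            default:
                // Unknown Glue failures also surface as the generic catalog error.
                return "CatalogException";
        }
    }
}
```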
3. Flink CatalogBaseTable Mapping With Glue Table
Mapping Flink CatalogBaseTable with the Glue table is necessary for translating the Flink table to Glue table and vice versa. Below shows the high level mapping of different attributes from Flink Catalog table with attributes of Glue table.
Flink Catalog Table Fields | Glue Catalog Table Fields | Misc Notes |
---|---|---|
tableSchema | StorageDescriptor | |
partitionKeys | PartitionKeys | |
options | Parameters | |
comment | Description | |
tableName | Name | |
n/a | TableType | Set to Table when the object is a table, and to View when it is a view |
GlueId | Field is configurable from table definition as part of option | |
CreatedBy | Field is configurable from table definition as part of option | |
StorageDescriptor Documentation
Flink TableSchema | Glue StorageDescriptor | Misc Notes |
---|---|---|
primaryKey | n/a | Here we will use Parameters from StorageDescriptor with key `primaryKeys` and store list of column names.Column definition in Glue will have special identifier as primaryKey corresponding to the columns which are primaryKey for the table. This information will be used while mapping GlueTable to CatalogTable. |
watermarkSpecs - rowTimeAttribute - watermarkExpressionString - watermarkExpressionOutputType | n/a | Here we will use Parameters from StorageDescriptor with key watermarkSpecs |
columns - name - type | Columns | |
Location | Field is configurable from table definition as part of option. | |
InputFormat | ||
OutputFormat | ||
Compressed | ||
Parameters |
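Since StorageDescriptor has no native notion of primary keys, the table above stores them in Parameters under the key `primaryKeys`. A minimal round-trip sketch of that encoding (the separator and helper names are assumptions, not the final design):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the Parameters-based primary key encoding
// described above: the primary key column names are joined into one
// value stored under the "primaryKeys" key.
class PrimaryKeyParameters {
    static final String PRIMARY_KEYS = "primaryKeys";
    static final String SEPARATOR = ",";

    /** Writes the primary key column names into the StorageDescriptor parameters. */
    static void writePrimaryKeys(Map<String, String> parameters, List<String> columns) {
        parameters.put(PRIMARY_KEYS, String.join(SEPARATOR, columns));
    }

    /** Reads the primary key column names back when mapping a Glue table to a CatalogTable. */
    static List<String> readPrimaryKeys(Map<String, String> parameters) {
        String value = parameters.get(PRIMARY_KEYS);
        if (value == null || value.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(value.split(SEPARATOR));
    }
}
```

The watermarkSpecs entry would follow the same pattern, with a richer serialization for the expression and its output type.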
4. Flink CatalogView Mapping with Glue View
Glue doesn't have a separate VIEW definition. It provides view support through the Glue table, using a special attribute called TableType in the table definition. Everything defined in point (3) above holds true for views, with the special marker TableType set to VIEW.
5. Flink CatalogFunction mapping with Glue Function
Flink CatalogFunction | Glue Function | Misc Notes |
---|---|---|
className | ClassName | |
functionLanguage | n/a | As of now, Flink supports three function languages. The Glue Function object doesn't provide any attribute for the function language, so the FunctionLanguage identifier will be made part of the function's ClassName using a delimiter |
resourceUris - resourceType - uri | ResourceUris | Like Flink, Glue also supports three resource types: FILE, JAR, ARCHIVE |
FunctionName | ||
DatabaseName | DatabaseName |
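The functionLanguage workaround described above can be sketched as a small encode/decode pair. The delimiter ":" and the helper names are assumptions for illustration; the final design may choose a different scheme:

```java
// Hypothetical sketch: since Glue's Function object has no language
// attribute, the function language is prefixed to the ClassName with a
// delimiter and stripped off again when reading the function back.
class GlueFunctionName {
    static final String DELIMITER = ":";

    /** Encodes the language into the stored class name, e.g. "JAVA:com.example.MyUdf". */
    static String encode(String language, String className) {
        return language + DELIMITER + className;
    }

    /** Splits a stored class name back into { language, original class name }. */
    static String[] decode(String glueClassName) {
        int idx = glueClassName.indexOf(DELIMITER);
        return new String[] {
            glueClassName.substring(0, idx),  // language, e.g. JAVA / SCALA / PYTHON
            glueClassName.substring(idx + 1)  // original class name
        };
    }
}
```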
Test Plan
We plan to add the GlueCatalog feature in flink-connector-aws. It will contain unit tests covering all functionality supported by the catalog.
We will add E2E tests for the entire Glue catalog in flink-connector-aws-e2e-tests to verify its functionality in integration with different connectors.