Status

Motivation

The AWS Glue Data Catalog is a persistent metadata store in the AWS cloud. It acts as a uniform repository where disparate systems can store and find metadata to keep track of data in data silos, and it consolidates major data integration capabilities into a single service. A native GlueCatalog in Apache Flink would extend Flink's integration with the wide range of diverse sources that Glue supports. It would also let users build Flink applications across different clusters that share centralized metadata through a single catalog repository.

Given the wide adoption of the Glue Data Catalog among AWS users, there is high demand in the community for a GlueCatalog [here].

Proposal

We propose to add a GlueCatalog implementation, a persistent catalog provided out of the box by Flink.

Note: GlueCatalog will be part of flink-connector-aws.

Using the Catalog


SQL

CREATE CATALOG glue_catalog WITH ('type' = 'glue', …);
USE CATALOG glue_catalog;

JAVA

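import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
// GlueCatalog is the class proposed in this FLIP; its final package is not yet defined.

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name = "glue";
String defaultDatabase = "mydatabase";

// register a GlueCatalog under the catalog name "glue"
GlueCatalog catalog = new GlueCatalog(name, defaultDatabase);
tableEnv.registerCatalog(name, catalog);

// set the GlueCatalog as the current catalog of the session
tableEnv.useCatalog(name);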

Python

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import GlueCatalog  # proposed in this FLIP

settings = EnvironmentSettings.in_batch_mode()
t_env = TableEnvironment.create(settings)

catalog_name = "glue"
default_database = "mydatabase"

glue_catalog = GlueCatalog(catalog_name, default_database)
t_env.register_catalog(catalog_name, glue_catalog)

# set the GlueCatalog as the current catalog of the session
t_env.use_catalog(catalog_name)

Public Interfaces

  • GlueCatalog: public implementation of the Catalog interface.
  • GlueCatalogFactory: concrete class implementing CatalogFactory.
  • GlueCatalogFactoryOptions: class responsible for defining the options required to create a GlueCatalog.


Design

GlueCatalog will be a read-write catalog that supports the following six categories of operations:

Database 
  • listDatabases 
  • getDatabase 
  • databaseExists 
  • createDatabase 
  • dropDatabase 
  • alterDatabase 
Table
  • getTable 
  • listTables 
  • tableExists 
  • createTable 
  • dropTable 
  • renameTable
  • alterTable 
View
  • listViews 
Function
  • listFunctions 
  • getFunction 
  • functionExists 
  • createFunction 
  • alterFunction 
  • dropFunction 
Partition
  • listPartitions 
  • listPartitionsByFilter 
  • getPartition 
  • partitionExists 
  • createPartition 
  • dropPartition 
  • alterPartition 
Statistics
  • getTableStatistics 
  • getTableColumnStatistics 
  • getPartitionStatistics 
  • bulkGetPartitionStatistics 
  • getPartitionColumnStatistics 
  • bulkGetPartitionColumnStatistics 
  • alterTableStatistics 
  • alterTableColumnStatistics 
  • alterPartitionStatistics 
  • alterPartitionColumnStatistics 


Configurations


The following configuration options are mandatory for creating a GlueCatalog:


Option | Description
glue.id | The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue uses the caller's AWS account ID by default.
glue.account-id | The account ID used in a Glue resource ARN.
glue.endpoint | An alternative endpoint of the Glue service for GlueCatalog to access.
aws.region | Default AWS region.
aws.credentials.provider | Type of AWS credentials provider used to authenticate requests to Glue.


The following optional configuration options can be passed when creating a GlueCatalog:

Option | Description
table.input.format | Input format for the data in a Glue table.
table.output.format | Output format for the data in a Glue table.
http-client.type | HTTP client type used by GlueClient.
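As an illustration of how the mandatory options above could be checked before a catalog is created, here is a minimal plain-Java sketch. The class GlueCatalogOptionsCheck and its methods are hypothetical; inside Flink this kind of validation would more likely be delegated to the catalog factory helper in FactoryUtil, which is left out here so the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical check of the mandatory GlueCatalog options listed above. */
final class GlueCatalogOptionsCheck {

    /** Options this FLIP lists as mandatory, in declaration order. */
    static final List<String> REQUIRED_OPTIONS = Collections.unmodifiableList(Arrays.asList(
            "glue.id", "glue.account-id", "glue.endpoint", "aws.region", "aws.credentials.provider"));

    private GlueCatalogOptionsCheck() {}

    /** Returns the mandatory options that are absent or blank, in declaration order. */
    static List<String> missingRequired(Map<String, String> options) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_OPTIONS) {
            String value = options.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails with a single message that lists every missing mandatory option. */
    static void validate(Map<String, String> options) {
        List<String> missing = missingRequired(options);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required GlueCatalog options: " + missing);
        }
    }
}
```

Reporting all missing keys at once, rather than stopping at the first, saves users a round trip when they assemble a CREATE CATALOG statement.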


Flink Glue Metaspace Mapping


AWS Glue supports namespaces, and a namespace can contain multiple databases. Users can set the namespace with the catalog option base-namespace. By default, databases are created directly under the catalog rather than under a namespace.



Flink Catalog Metaspace Structure | AWS Glue Metaspace Structure
catalog name (defined in Flink only) | n/a
database name | database name
table name | table name


Flink Glue DataType Mapping


The Glue client response represents table information as a nested structure. Every table contains a StorageDescriptor with details on each column of the table, and the data type of a column is stored in the type attribute of its FieldSchema. For more details on the table structure in AWS Glue, see the AWS Glue documentation.

Sample response for a Table in AWS Glue

{
  "StorageDescriptor": {
    "cols": {
      "FieldSchema": [
        {
          "name": "primary-1",
          "type": "CHAR",
          "comment": ""
        },
        {
          "name": "second",
          "type": "STRING",
          "comment": ""
        }
      ]
    },
    "location": "s3://aws-logs-111122223333-us-east-1",
    "inputFormat": "",
    "outputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
    "compressed": "false",
    "numBuckets": "0",
    "SerDeInfo": {
      "name": "",
      "serializationLib": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
      "parameters": {
        "separatorChar": "|"
      }
    },
    "bucketCols": [],
    "sortCols": [],
    "parameters": {},
    "SkewedInfo": {},
    "storedAsSubDirectories": "false"
  },
  "parameters": {
    "classification": "csv"
  }
}

The data type information must be extracted from the Glue response and mapped to a Flink data type, and vice versa.

The table below converts a Flink type to a type string that can be displayed in Glue. This conversion is used for informational purposes only in the Glue catalog.

Flink DataType | Glue Data Type Information
CharType | char
VarCharType | string
BooleanType | boolean
BinaryType | fixed[%d]
VarBinaryType | binary
DecimalType | decimal(<precision>, <scale>)
TinyIntType | tinyint
SmallIntType | smallint
IntType | int
BigIntType | bigint
FloatType | float
DoubleType | double
DateType | date
TimeType | time
TimestampType | timestamp
LocalZonedTimestampType | timestamptz
ArrayType | array<elementType>
MultisetType | map<elementType, int>
MapType | map<keyType, valueType>
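The conversion in the table above is a recursive walk over the type tree. The sketch below is hypothetical: SimpleType is a minimal stand-in for Flink's LogicalType (its root strings follow Flink's LogicalTypeRoot names) so that the example runs without Flink, and types missing from the table, such as ROW, are rejected.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical Flink-to-Glue type-string conversion following the table above. */
final class GlueTypeStrings {

    /** Minimal stand-in for Flink's LogicalType; numeric fields a type does not use stay 0. */
    static final class SimpleType {
        final String root;
        final int length;
        final int precision;
        final int scale;
        final List<SimpleType> children;

        SimpleType(String root, int length, int precision, int scale, SimpleType... children) {
            this.root = root;
            this.length = length;
            this.precision = precision;
            this.scale = scale;
            this.children = Arrays.asList(children);
        }

        static SimpleType of(String root, SimpleType... children) {
            return new SimpleType(root, 0, 0, 0, children);
        }
    }

    private GlueTypeStrings() {}

    /** Renders the informational Glue type string for a Flink type. */
    static String toGlueType(SimpleType type) {
        switch (type.root) {
            case "CHAR": return "char";
            case "VARCHAR": return "string";
            case "BOOLEAN": return "boolean";
            case "BINARY": return String.format("fixed[%d]", type.length);
            case "VARBINARY": return "binary";
            case "DECIMAL": return String.format("decimal(%d, %d)", type.precision, type.scale);
            case "TINYINT": return "tinyint";
            case "SMALLINT": return "smallint";
            case "INTEGER": return "int";
            case "BIGINT": return "bigint";
            case "FLOAT": return "float";
            case "DOUBLE": return "double";
            case "DATE": return "date";
            case "TIME_WITHOUT_TIME_ZONE": return "time";
            case "TIMESTAMP_WITHOUT_TIME_ZONE": return "timestamp";
            case "TIMESTAMP_WITH_LOCAL_TIME_ZONE": return "timestamptz";
            case "ARRAY": return "array<" + toGlueType(type.children.get(0)) + ">";
            // A multiset is rendered as a map from each element to its count.
            case "MULTISET": return "map<" + toGlueType(type.children.get(0)) + ", int>";
            case "MAP": return "map<" + toGlueType(type.children.get(0)) + ", "
                    + toGlueType(type.children.get(1)) + ">";
            default: throw new UnsupportedOperationException("No Glue type string for " + type.root);
        }
    }
}
```

For example, a MAP of INTEGER to ARRAY of BIGINT renders as map<int, array<bigint>>. Because the string is informational only, the reverse direction would still need the full Flink type to be stored elsewhere.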

High Level Implementation Detail

  1. GlueClient

    GlueClient internally creates an SdkHttpClient, a synchronous HTTP client that talks to the AWS Glue metastore. Two SdkHttpClient implementations will be supported, selected by the catalog option http-client.type: ApacheHttpClient, which is the default, and UrlConnectionHttpClient, the leanest synchronous client.
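Resolving http-client.type can be sketched as a small enum with a default. This is a hypothetical sketch: the option values "apache" and "urlconnection" are assumptions, since the FLIP does not fix them, and the enum only decides which client would be used rather than constructing one.

```java
import java.util.Locale;

/** Hypothetical resolution of the http-client.type catalog option. */
enum GlueHttpClientType {
    /** Default; would be backed by the AWS SDK's ApacheHttpClient. */
    APACHE("apache"),
    /** Leanest synchronous option; would be backed by UrlConnectionHttpClient. */
    URL_CONNECTION("urlconnection");

    private final String optionValue;

    GlueHttpClientType(String optionValue) {
        this.optionValue = optionValue;
    }

    /** Maps the raw option value (assumed spellings) to a client type, defaulting to APACHE when unset. */
    static GlueHttpClientType fromOption(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return APACHE;
        }
        String normalized = rawValue.trim().toLowerCase(Locale.ROOT);
        for (GlueHttpClientType type : values()) {
            if (type.optionValue.equals(normalized)) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unsupported http-client.type: " + rawValue);
    }
}
```

In the catalog itself, the resolved value would choose between ApacheHttpClient.builder() and UrlConnectionHttpClient.builder() from the AWS SDK for Java v2 and pass the result to GlueClient.builder().httpClientBuilder(...); that wiring is omitted because it needs the SDK on the classpath.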

  2. Flink CatalogDatabase Mapping With Glue Database

    Glue Database API Documentation
Flink Catalog Database | Glue Database
properties (Map<String, String>) | Parameters (K/V pair)
comment (String) | description (String)
n/a | LocationUri
n/a | GlueId
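The database mapping above amounts to copying two fields in each direction. In this hypothetical sketch, GlueDatabase is a stand-in for the Glue fields involved (in the AWS SDK for Java v2 they would be set through DatabaseInput.builder().name(...).description(...).parameters(...)). LocationUri and GlueId have no Flink counterpart and are left out, and mapping a missing description to an empty comment is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mapping between a Flink catalog database and the Glue database fields. */
final class GlueDatabaseMapping {

    /** Stand-in for the Glue database fields touched by the mapping. */
    static final class GlueDatabase {
        final String name;
        final String description;
        final Map<String, String> parameters;

        GlueDatabase(String name, String description, Map<String, String> parameters) {
            this.name = name;
            this.description = description;
            this.parameters = Collections.unmodifiableMap(new HashMap<>(parameters));
        }
    }

    private GlueDatabaseMapping() {}

    /** Flink properties become Glue Parameters; the Flink comment becomes the Glue description. */
    static GlueDatabase toGlue(String databaseName, Map<String, String> properties, String comment) {
        Map<String, String> params = properties == null ? Collections.<String, String>emptyMap() : properties;
        return new GlueDatabase(databaseName, comment, params);
    }

    /** Glue Parameters back to Flink properties, copied so callers may modify them. */
    static Map<String, String> propertiesOf(GlueDatabase glueDatabase) {
        return new HashMap<>(glueDatabase.parameters);
    }

    /** Glue description back to the Flink comment; a missing description becomes an empty comment. */
    static String commentOf(GlueDatabase glueDatabase) {
        return glueDatabase.description == null ? "" : glueDatabase.description;
    }
}
```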


To create a database in Glue, we will use the CreateDatabaseRequest provided by the Glue API.

glueClient.createDatabase(<create_database_request_object>) 

Request and response structure of CreateDatabase in Glue

  • Request
  1. GlueId – Glue id string, not less than 1 or more than 255 bytes long, matching the Single-line string pattern.
    The ID of the Data Catalog in which to create the database. If none is provided, the AWS account ID is used by default.

  2. DatabaseInput – Required: A DatabaseInput object.
    The metadata for the database.

  3. Tags – A map array of key-value pairs, not more than 50 pairs.
    Each key is a UTF-8 string, not less than 1 or more than 128 bytes long.
    Each value is a UTF-8 string, not more than 256 bytes long.
    The tags you assign to the database.

  • Response

No response parameters.
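The Glue service enforces the request limits listed above itself. Checking them on the client before calling glueClient.createDatabase(...) is an optional design choice, sketched here with a hypothetical class. The single-line pattern is simplified to "no line breaks", which is an approximation of the service's actual pattern.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical client-side checks for the CreateDatabase request limits listed above. */
final class CreateDatabaseRequestLimits {

    static final int MAX_GLUE_ID_BYTES = 255;
    static final int MAX_TAGS = 50;
    static final int MAX_TAG_KEY_BYTES = 128;
    static final int MAX_TAG_VALUE_BYTES = 256;

    private CreateDatabaseRequestLimits() {}

    private static int utf8Bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    /** glueId may be null (Glue then uses the caller's account ID); tags may be null. */
    static void check(String glueId, Map<String, String> tags) {
        if (glueId != null) {
            int len = utf8Bytes(glueId);
            if (len < 1 || len > MAX_GLUE_ID_BYTES) {
                throw new IllegalArgumentException("GlueId must be 1 to 255 bytes long, got " + len);
            }
            // Simplified stand-in for the service's single-line string pattern.
            if (glueId.indexOf('\n') >= 0 || glueId.indexOf('\r') >= 0) {
                throw new IllegalArgumentException("GlueId must be a single-line string");
            }
        }
        if (tags == null) {
            return;
        }
        if (tags.size() > MAX_TAGS) {
            throw new IllegalArgumentException("At most 50 tags are allowed, got " + tags.size());
        }
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            int keyLen = utf8Bytes(tag.getKey());
            if (keyLen < 1 || keyLen > MAX_TAG_KEY_BYTES) {
                throw new IllegalArgumentException("Tag key must be 1 to 128 bytes: " + tag.getKey());
            }
            if (utf8Bytes(tag.getValue()) > MAX_TAG_VALUE_BYTES) {
                throw new IllegalArgumentException("Tag value exceeds 256 bytes for key " + tag.getKey());
            }
        }
    }
}
```

Lengths are measured in UTF-8 bytes rather than characters, matching how the limits are stated.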



Flink Glue Database Exception Mapping

Glue CreateDatabase Exception | Flink Exception
InvalidInputException | CatalogException
AlreadyExistsException | DatabaseAlreadyExistException
ResourceNumberLimitExceededException | CatalogException
InternalServiceException | CatalogException
OperationTimeoutException | CatalogException
GlueEncryptionException | CatalogException
ConcurrentModificationException | CatalogException
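The exception table above can be read as a small translation function. This sketch is hypothetical: it identifies the Glue error by the simple class name of the AWS SDK exception (for example AlreadyExistsException), and it uses local stand-ins for Flink's CatalogException and DatabaseAlreadyExistException so it compiles without Flink. A real GlueCatalog would more likely catch the SDK exception types directly and throw the Flink ones. The method returns the exception instead of throwing it so the mapping is easy to test.

```java
/** Hypothetical translation of Glue CreateDatabase errors into Flink catalog exceptions. */
final class GlueExceptionTranslation {

    /** Local stand-in for Flink's unchecked CatalogException. */
    static final class CatalogException extends RuntimeException {
        CatalogException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Local stand-in for Flink's checked DatabaseAlreadyExistException. */
    static final class DatabaseAlreadyExistException extends Exception {
        DatabaseAlreadyExistException(String catalogName, String databaseName, Throwable cause) {
            super("Database " + databaseName + " already exists in catalog " + catalogName, cause);
        }
    }

    private GlueExceptionTranslation() {}

    /** Maps a Glue error name to the Flink exception from the table above. */
    static Exception translateCreateDatabaseError(
            String glueErrorName, String catalogName, String databaseName, Throwable cause) {
        if ("AlreadyExistsException".equals(glueErrorName)) {
            return new DatabaseAlreadyExistException(catalogName, databaseName, cause);
        }
        // Every other error in the table (InvalidInput, ResourceNumberLimitExceeded, InternalService,
        // OperationTimeout, GlueEncryption, ConcurrentModification) surfaces as CatalogException.
        return new CatalogException(
                "Failed to create database " + databaseName + " in Glue: " + glueErrorName, cause);
    }
}
```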


3. Flink CatalogBaseTable Mapping With Glue Table

Mapping a Flink CatalogBaseTable to a Glue table is necessary to translate a Flink table into a Glue table and vice versa. The table below shows the high-level mapping between attributes of a Flink catalog table and attributes of a Glue table.

Glue Table API Documentation

Flink Catalog Table Fields | Glue Catalog Table Fields | Misc Notes
tableSchema | StorageDescriptor |
partitionKeys | PartitionKeys |
options | Parameters |
comment | Description |
tableName | Name |
n/a | TableType | Set to Table for a table and to View for a view.
n/a | GlueId | Configurable from the table definition as an option.
n/a | CreatedBy | Configurable from the table definition as an option.

StorageDescriptor Documentation

Flink TableSchema | Glue StorageDescriptor | Misc Notes
primaryKey | n/a | Stored in the StorageDescriptor Parameters under the key `primaryKeys` as a list of column names. The Glue column definitions of the primary key columns also carry a special primaryKey identifier; this information is used when mapping a Glue table back to a CatalogTable.
watermarkSpecs (rowTimeAttribute, watermarkExpressionString, watermarkExpressionOutputType) | n/a | Stored in the StorageDescriptor Parameters under the key watermarkSpecs.
columns (name, type) | Columns (Name, Type, Comment, Parameters) |
n/a | Location | Configurable from the table definition as an option.
n/a | InputFormat |
n/a | OutputFormat |
n/a | Compressed |
n/a | Parameters |
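Storing the primary key under the `primaryKeys` parameter needs an agreed list encoding. The FLIP does not specify one, so this hypothetical sketch assumes a comma-separated list and therefore that column names contain no commas. watermarkSpecs could be stored under its own key in the same way, but serializing watermark expressions is not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical storage of a Flink primary key in StorageDescriptor Parameters. */
final class PrimaryKeyParameters {

    /** Parameter key named in the FLIP. */
    static final String PRIMARY_KEYS = "primaryKeys";
    /** Assumed list delimiter; column names must not contain it. */
    static final String DELIMITER = ",";

    private PrimaryKeyParameters() {}

    /** Returns a copy of the parameters with the key columns added; unchanged if there is no key. */
    static Map<String, String> withPrimaryKey(Map<String, String> parameters, List<String> keyColumns) {
        Map<String, String> result = new HashMap<>(parameters);
        if (!keyColumns.isEmpty()) {
            result.put(PRIMARY_KEYS, String.join(DELIMITER, keyColumns));
        }
        return result;
    }

    /** Reads the key columns back when mapping a Glue table to a CatalogTable. */
    static List<String> primaryKeyOf(Map<String, String> parameters) {
        String raw = parameters.get(PRIMARY_KEYS);
        if (raw == null || raw.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> columns = new ArrayList<>();
        for (String column : raw.split(DELIMITER)) {
            columns.add(column.trim());
        }
        return columns;
    }
}
```

With the sample table above, a key on primary-1 and second would be stored as "primary-1,second".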


4. Flink CatalogView mapping with Glue View

Glue does not have a separate view definition. Instead, it supports views through Glue tables, marked by the TableType attribute in the table definition.

Everything described in point 3 above also holds for views, with the table kind marked as VIEW.
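Because a view is just a Glue table with a different TableType, the distinction reduces to a two-way mapping. This hypothetical sketch mirrors the TABLE and VIEW values of Flink's CatalogBaseTable.TableKind with a local enum. The exact TableType strings are assumptions, since the FLIP only describes them as Table and View, and any TableType other than VIEW is read back as a table.

```java
/** Hypothetical mapping between Flink's table kind and Glue's TableType attribute. */
final class GlueTableTypes {

    /** Mirrors the TABLE and VIEW values of Flink's CatalogBaseTable.TableKind. */
    enum TableKind { TABLE, VIEW }

    // Assumed TableType strings written to Glue.
    static final String GLUE_TABLE = "TABLE";
    static final String GLUE_VIEW = "VIEW";

    private GlueTableTypes() {}

    static String toGlueTableType(TableKind kind) {
        return kind == TableKind.VIEW ? GLUE_VIEW : GLUE_TABLE;
    }

    /** Any TableType other than VIEW, including a missing one, is read back as a table. */
    static TableKind fromGlueTableType(String tableType) {
        return GLUE_VIEW.equalsIgnoreCase(tableType) ? TableKind.VIEW : TableKind.TABLE;
    }
}
```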


5. Flink CatalogFunction mapping with Glue Function


Glue Function Documentation

Flink CatalogFunction | Glue Function | Misc Notes
className | ClassName |
functionLanguage | n/a | Flink currently supports three function languages: JAVA, SCALA, and PYTHON. The Glue Function object has no attribute for the function language, so the language identifier will be encoded in the Glue ClassName using the delimiter `:`.
resourceUris (resourceType, uri) | ResourceUris (ResourceType, Uri) | Like Flink, Glue supports three resource types: FILE, JAR, and ARCHIVE.
n/a | FunctionName |
DatabaseName | DatabaseName |
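Encoding the function language into the Glue ClassName with the `:` delimiter can be sketched as follows. The "LANGUAGE:className" layout (language first) is an assumption, since the FLIP names only the delimiter. The local enum mirrors Flink's FunctionLanguage values, and treating a ClassName without a delimiter as a JAVA function is also an assumption, aimed at functions registered in Glue by other tools.

```java
import java.util.Locale;

/** Hypothetical encoding of the Flink function language into the Glue ClassName. */
final class GlueFunctionClassName {

    /** Mirrors Flink's FunctionLanguage values. */
    enum FunctionLanguage { JAVA, SCALA, PYTHON }

    static final char DELIMITER = ':';

    /** Result of splitting a Glue ClassName. */
    static final class Decoded {
        final FunctionLanguage language;
        final String className;

        Decoded(FunctionLanguage language, String className) {
            this.language = language;
            this.className = className;
        }
    }

    private GlueFunctionClassName() {}

    /** Produces the assumed "LANGUAGE:className" layout. */
    static String encode(FunctionLanguage language, String className) {
        return language.name() + DELIMITER + className;
    }

    /** Splits on the first ':'; a ClassName without the delimiter is assumed to be JAVA. */
    static Decoded decode(String glueClassName) {
        int idx = glueClassName.indexOf(DELIMITER);
        if (idx < 0) {
            return new Decoded(FunctionLanguage.JAVA, glueClassName);
        }
        String prefix = glueClassName.substring(0, idx).toUpperCase(Locale.ROOT);
        // valueOf throws IllegalArgumentException for an unknown language prefix.
        return new Decoded(FunctionLanguage.valueOf(prefix), glueClassName.substring(idx + 1));
    }
}
```

Splitting on the first delimiter is safe for Java, Scala, and Python function identifiers, none of which contain a colon.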

Test Plan

We plan to add GlueCatalog to flink-connector-aws, together with unit tests covering all functionality supported by the catalog.
We will also add end-to-end tests for GlueCatalog in flink-connector-aws-e2e-tests to verify its integration with different connectors.