Status
Current state: Released
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Working with semi-structured data has long been a foundational Lakehouse scenario. While JSON has traditionally served as the primary storage format for such data, storing it as serialized strings introduces significant inefficiencies. Extracting an individual field from a JSON string requires deserializing the entire payload, which incurs computational overhead for data elements that are never used. This cost becomes particularly significant when partial data access patterns are common.
Spark and Parquet address this challenge through the introduction of the Variant type – a compact binary representation enabling efficient path-based querying of semi-structured content without full deserialization[1]. It optimizes both storage and computation for semi-structured workloads.
Paimon has recently added support for the Variant type[2], so Flink should support it in the Table/SQL API as well. Native Variant support in Flink's Table/SQL API would enable efficient processing of semi-structured data and allow Flink to leverage Paimon's storage-layer optimizations, improving performance and resource utilization for semi-structured data pipelines.
In the future, we can leverage Variant shredding[3] to enable projection pushdown and filter pushdown, further optimizing reads of Variant values. This is out of the scope of this FLIP.
Public Interfaces
Introduce Variant
Introduce to the flink-core module a Variant interface that represents semi-structured data, and a VariantBuilder interface to build a Variant.
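package org.apache.flink.types.variant;
import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
/** A Variant represents semi-structured data. */
public interface Variant extends Serializable {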
/** Returns true if the variant is a primitive typed value, such as INT, DOUBLE, STRING, etc. */
boolean isPrimitive();
/** Returns true if this variant is an Array, false otherwise. */
boolean isArray();
/** Returns true if this variant is an Object, false otherwise. */
boolean isObject();
/** Returns true if this variant is a null value, false otherwise. */
boolean isNull();
/** Get the type of variant. */
Type getType();
/**
* Get the scalar value of variant as boolean, if the variant type is {@link Type#BOOLEAN}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#BOOLEAN}.
*/
boolean getBoolean() throws VariantTypeException;
/**
* Get the scalar value of variant as byte, if the variant type is {@link Type#TINYINT}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#TINYINT}.
*/
byte getByte() throws VariantTypeException;
/**
* Get the scalar value of variant as short, if the variant type is {@link Type#SMALLINT}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#SMALLINT}.
*/
short getShort() throws VariantTypeException;
/**
* Get the scalar value of variant as int, if the variant type is {@link Type#INT}.
*
* @throws VariantTypeException if this variant is not a scalar value or is not {@link
* Type#INT}.
*/
int getInt() throws VariantTypeException;
/**
* Get the scalar value of variant as long, if the variant type is {@link Type#BIGINT}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#BIGINT}.
*/
long getLong() throws VariantTypeException;
/**
* Get the scalar value of variant as float, if the variant type is {@link Type#FLOAT}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#FLOAT}.
*/
float getFloat() throws VariantTypeException;
/**
* Get the scalar value of variant as BigDecimal, if the variant type is {@link Type#DECIMAL}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#DECIMAL}.
*/
BigDecimal getDecimal() throws VariantTypeException;
/**
* Get the scalar value of variant as double, if the variant type is {@link Type#DOUBLE}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#DOUBLE}.
*/
double getDouble() throws VariantTypeException;
/**
* Get the scalar value of variant as string, if the variant type is {@link Type#STRING}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#STRING}.
*/
String getString() throws VariantTypeException;
/**
* Get the scalar value of variant as LocalDate, if the variant type is {@link Type#DATE}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#DATE}.
*/
LocalDate getDate() throws VariantTypeException;
/**
* Get the scalar value of variant as LocalDateTime, if the variant type is {@link
* Type#TIMESTAMP}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#TIMESTAMP}.
*/
LocalDateTime getTimestamp() throws VariantTypeException;
/**
* Get the scalar value of variant as Instant, if the variant type is {@link
* Type#TIMESTAMP_LTZ}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#TIMESTAMP_LTZ}.
*/
Instant getInstant() throws VariantTypeException;
/**
* Get the scalar value of variant as byte array, if the variant type is {@link Type#BINARY}.
*
* @throws VariantTypeException If this variant is not a scalar value or is not {@link
* Type#BINARY}.
*/
byte[] getBytes() throws VariantTypeException;
/**
* Get the scalar value of variant.
*
* @throws VariantTypeException If this variant is not a scalar value.
*/
Object get() throws VariantTypeException;
/**
* Get the scalar value of variant, cast to the type {@code T} expected by the caller.
*
* @throws VariantTypeException If this variant is not a scalar value.
*/
<T> T getAs() throws VariantTypeException;
/**
* Access the value of the specified element of an array variant. If the index is out of range,
* null is returned.
*
* <p>NOTE: if the element value has been explicitly set as <code>null</code> (which is
* different from removal!), a variant for which {@link Variant#isNull()} returns true is
* returned, not null.
*
* @throws VariantTypeException If this variant is not an array.
*/
Variant getElement(int index) throws VariantTypeException;
/**
* Access the value of the specified field of an object variant. If there is no field with the
* specified name, null is returned.
*
* <p>NOTE: if the property value has been explicitly set as <code>null</code>, a variant
* for which {@link Variant#isNull()} returns true is returned, not null.
*
* @throws VariantTypeException If this variant is not an object.
*/
Variant getField(String fieldName) throws VariantTypeException;
/** Converts the variant to a JSON string. */
String toJson();
/** The type of variant. */
enum Type {
OBJECT,
ARRAY,
NULL,
BOOLEAN,
TINYINT,
SMALLINT,
INT,
BIGINT,
FLOAT,
DOUBLE,
DECIMAL,
STRING,
DATE,
TIMESTAMP,
TIMESTAMP_LTZ,
BINARY
}
}
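The accessor contract above distinguishes a missing field or out-of-range index (Java null) from a value explicitly set to null (a variant whose isNull() returns true), and throws VariantTypeException on a kind mismatch. The small in-memory sketch below illustrates that contract. ToyVariant and ToyVariantTypeException are hypothetical stand-ins, not part of the proposed API; they only model INT and STRING scalars, and the zero-based array index is an assumption.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for Variant, illustrating the accessor contract only. */
public final class ToyVariant {

    enum Type { OBJECT, ARRAY, NULL, INT, STRING }

    /** Hypothetical stand-in for VariantTypeException. */
    static final class ToyVariantTypeException extends RuntimeException {
        ToyVariantTypeException(String message) {
            super(message);
        }
    }

    private final Type type;
    // Map<String, ToyVariant> for OBJECT, List<ToyVariant> for ARRAY, Integer/String for scalars.
    private final Object value;

    private ToyVariant(Type type, Object value) {
        this.type = type;
        this.value = value;
    }

    static ToyVariant ofNull() { return new ToyVariant(Type.NULL, null); }

    static ToyVariant of(int i) { return new ToyVariant(Type.INT, i); }

    static ToyVariant of(String s) { return new ToyVariant(Type.STRING, s); }

    static ToyVariant array(ToyVariant... elements) {
        return new ToyVariant(Type.ARRAY, Arrays.asList(elements));
    }

    static ToyVariant object(Map<String, ToyVariant> fields) {
        return new ToyVariant(Type.OBJECT, new LinkedHashMap<>(fields));
    }

    boolean isNull() { return type == Type.NULL; }

    int getInt() {
        if (type != Type.INT) {
            throw new ToyVariantTypeException("Not an INT: " + type);
        }
        return (Integer) value;
    }

    /** Out-of-range index: Java null. Element explicitly set to null: a variant with isNull() true. */
    @SuppressWarnings("unchecked")
    ToyVariant getElement(int index) {
        if (type != Type.ARRAY) {
            throw new ToyVariantTypeException("Not an ARRAY: " + type);
        }
        List<ToyVariant> elements = (List<ToyVariant>) value;
        return index >= 0 && index < elements.size() ? elements.get(index) : null;
    }

    /** Missing field: Java null. Field explicitly set to null: a variant with isNull() true. */
    @SuppressWarnings("unchecked")
    ToyVariant getField(String name) {
        if (type != Type.OBJECT) {
            throw new ToyVariantTypeException("Not an OBJECT: " + type);
        }
        return ((Map<String, ToyVariant>) value).get(name);
    }
}
```

With this model, getField("missing") on an object returns null, while a field built with ofNull() returns a variant whose isNull() is true; calling getField on an array throws.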
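package org.apache.flink.types.variant;
import org.apache.flink.annotation.PublicEvolving;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;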
/** Builder for variants. */
@PublicEvolving
public interface VariantBuilder {
/** Create a variant from a byte. */
Variant of(byte b);
/** Create a variant from a short. */
Variant of(short s);
/** Create a variant from an int. */
Variant of(int i);
/** Create a variant from a long. */
Variant of(long l);
/** Create a variant from a string. */
Variant of(String s);
/** Create a variant from a double. */
Variant of(Double d);
/** Create a variant from a float. */
Variant of(Float f);
/** Create a variant from a byte array. */
Variant of(byte[] bytes);
/** Create a variant from a boolean. */
Variant of(boolean b);
/** Create a variant from a BigDecimal. */
Variant of(BigDecimal bigDecimal);
/** Create a variant from an Instant. */
Variant of(Instant instant);
/** Create a variant from a LocalDate. */
Variant of(LocalDate localDate);
/** Create a variant from a LocalDateTime. */
Variant of(LocalDateTime localDateTime);
/** Create a variant of null. */
Variant ofNull();
/** Get the builder of a variant object. */
VariantObjectBuilder object();
/** Get the builder of a variant object, specifying whether duplicate keys are allowed. */
VariantObjectBuilder object(boolean allowDuplicateKeys);
/** Get the builder for a variant array. */
VariantArrayBuilder array();
/** Builder for a variant object. */
interface VariantObjectBuilder {
/** Add a field to the object. */
VariantObjectBuilder add(String key, Variant value);
/** Build the variant object. */
Variant build();
}
/** Builder for a variant array. */
interface VariantArrayBuilder {
/** Add a value to the array. */
VariantArrayBuilder add(Variant value);
/** Build the variant array. */
Variant build();
}
}
package org.apache.flink.types.variant;
import org.apache.flink.annotation.PublicEvolving;
/** Exception thrown when a variant value is not of the expected type. */
@PublicEvolving
public class VariantTypeException extends RuntimeException {
private static final long serialVersionUID = 1L;
public VariantTypeException(String message) {
super(message);
}
}
Introduce the BinaryVariant implementation of Variant, which follows the same binary layout as the Parquet Variant Encoding[1].
/**
* A data structure that represents a semi-structured variant with two binary values: value
* and metadata. The value encodes types and values, but not field names. The metadata currently
* contains a version flag and a list of field names. We can extend/modify the detailed binary
* format given the version flag.
*
* @see <a href="https://github.com/apache/parquet-format/blob/master/VariantEncoding.md">Variant
* Binary Encoding</a> for the detailed layout of the data structure.
*/
@PublicEvolving
public final class BinaryVariant implements Variant {
...
}
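Decoding a value starts from its first byte. Per the Parquet Variant Encoding[1], the two low bits hold basic_type (0 primitive, 1 short string, 2 object, 3 array) and the upper six bits hold value_header, which is the primitive type id for primitives and the byte length for short strings. For example, an int32 42 is encoded as the header (5 << 2) | 0 = 0x14 followed by 42 as four little-endian bytes. The hypothetical VariantHeaderSketch below decodes only a handful of scalar encodings to illustrate this layout; it is not the BinaryVariant implementation and ignores objects, arrays, decimals, dates, and timestamps.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for a few scalar encodings of the Parquet Variant Encoding. */
public final class VariantHeaderSketch {

    // basic_type values stored in the 2 low bits of the first value byte.
    static final int PRIMITIVE = 0;
    static final int SHORT_STRING = 1;
    static final int OBJECT = 2;
    static final int ARRAY = 3;

    static Object decodeScalar(byte[] value) {
        int header = value[0] & 0xFF;
        int basicType = header & 0x3;
        int valueHeader = header >>> 2; // upper 6 bits
        // Primitive payloads are little-endian and start right after the header byte.
        ByteBuffer body = ByteBuffer.wrap(value, 1, value.length - 1).order(ByteOrder.LITTLE_ENDIAN);
        switch (basicType) {
            case SHORT_STRING:
                // value_header is the UTF-8 byte length (0..63).
                return new String(value, 1, valueHeader, StandardCharsets.UTF_8);
            case PRIMITIVE:
                switch (valueHeader) { // value_header is the primitive type id
                    case 0: return null;
                    case 1: return Boolean.TRUE;
                    case 2: return Boolean.FALSE;
                    case 3: return body.get();      // int8
                    case 4: return body.getShort(); // int16
                    case 5: return body.getInt();   // int32
                    case 6: return body.getLong();  // int64
                    case 7: return body.getDouble();
                    case 16: { // long string: 4-byte length, then UTF-8 bytes
                        byte[] utf8 = new byte[body.getInt()];
                        body.get(utf8);
                        return new String(utf8, StandardCharsets.UTF_8);
                    }
                    default:
                        throw new UnsupportedOperationException("Primitive type id " + valueHeader);
                }
            default: // OBJECT or ARRAY
                throw new UnsupportedOperationException("Not a scalar, basic_type " + basicType);
        }
    }
}
```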
Introduce the VariantUtils class to parse a JSON string into a Variant.
@PublicEvolving
public class VariantUtils {
/**
* Parse a JSON string into a {@link Variant}.
*
* @param json The input json string.
*/
public static Variant parseJson(String json) throws IOException {
...
}
/**
* Parse a JSON string into a {@link Variant}.
*
* @param json The input json string.
* @param allowDuplicateKeys If set to false, parsing variant from JSON will throw an error if
* there are duplicate keys in the input JSON string. When set to true, the parser will keep
* the last occurrence of all fields with the same key.
*/
public static Variant parseJson(String json, boolean allowDuplicateKeys) throws IOException {
...
}
}
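The allowDuplicateKeys flag only affects how a key that repeats within one JSON object is handled. A minimal sketch of that policy, assuming the parser sees each object's fields as key/value pairs in input order; DuplicateKeyPolicy is hypothetical, and IllegalArgumentException stands in for whatever error the real parser raises.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the duplicate-key policy described for VariantUtils#parseJson. */
public final class DuplicateKeyPolicy {

    /**
     * Collects the key/value pairs of one JSON object in input order. Throws if a key repeats and
     * allowDuplicateKeys is false; otherwise the last occurrence of each key wins.
     */
    static Map<String, Object> collectFields(
            List<Map.Entry<String, Object>> fields, boolean allowDuplicateKeys) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : fields) {
            if (!allowDuplicateKeys && result.containsKey(field.getKey())) {
                throw new IllegalArgumentException("Duplicate key: " + field.getKey());
            }
            // put() overwrites any earlier value for the same key.
            result.put(field.getKey(), field.getValue());
        }
        return result;
    }
}
```

For the input {"a": 1, "a": 2}, this keeps a = 2 when duplicates are allowed and rejects the object otherwise.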
Introduce the Variant type to both the DataStream and Table/SQL APIs
- Introduce the type information for the DataStream API
/**
* Type information for primitive types (int, long, double, byte, ...), String, Date, Void,
* BigInteger, BigDecimal, and Variant.
*/
@Public
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
...
public static final BasicTypeInfo<Variant> VARIANT_TYPE_INFO =
new BasicTypeInfo<Variant>(
Variant.class, new Class<?>[] {}, VariantSerializer.INSTANCE, null);
}
- Introduce the VARIANT data type for the Table/SQL API in the flink-table-common module
@PublicEvolving
public final class DataTypes {
...
/**
* Data type of semi-structured data.
*
* <p>The type supports storing any semi-structured data, including ARRAY, MAP, and scalar
* types. VARIANT can only store MAP types with keys of type STRING.
*
* @see VariantType
*/
public static DataType VARIANT() {
return new AtomicDataType(new VariantType());
}
}
package org.apache.flink.table.types.logical;
/**
* Data type of semi-structured data.
*
* <p>The type supports storing any semi-structured data, including ARRAY, MAP, and scalar types.
* VARIANT can only store MAP types with keys of type STRING.
*/
@PublicEvolving
public class VariantType extends LogicalType {
...
}
@PublicEvolving
public enum LogicalTypeRoot {
...
VARIANT(LogicalTypeFamily.EXTENSION);
}
Get variant in RowData
@PublicEvolving
public interface RowData {
...
/** Returns the variant value at the given position. */
Variant getVariant(int pos);
}
Convert between a JSON string and a variant
Introduce the built-in functions PARSE_JSON and TRY_PARSE_JSON to convert a JSON string to a variant.
SQL Function | Table Function | Description |
PARSE_JSON(json_string[, allow_duplicate_keys]) | STRING.parseJson([allow_duplicate_keys]) | Parses a JSON string into a Variant. If the JSON string is invalid, an error is thrown. To return NULL instead of an error, use TRY_PARSE_JSON. If allow_duplicate_keys is set to false, parsing a variant from JSON throws an error if there are duplicate keys in the input JSON string. If set to true, the parser keeps the last occurrence of all fields with the same key. |
TRY_PARSE_JSON(json_string[, allow_duplicate_keys]) | STRING.tryParseJson([allow_duplicate_keys]) | Parses a JSON string into a Variant if possible. If the JSON string is invalid, returns NULL. If allow_duplicate_keys is set to false, parsing a variant from JSON throws an error if there are duplicate keys in the input JSON string. If set to true, the parser keeps the last occurrence of all fields with the same key. |
- Extend the existing JSON_STRING built-in function to convert a Variant to a JSON string.
Introduce built-in functions VARIANT_VALUE and VARIANT_QUERY to extract values from a variant
SQL Function | Table Function | Description |
VARIANT_VALUE(variant, path RETURNING type [ { NULL | ERROR | DEFAULT } ON EMPTY ] [ { NULL | ERROR | DEFAULT } ON ERROR ]) | VARIANT.variantValue(STRING path [, type, onEmpty, defaultOnEmpty, onError, defaultOnError]) | Extracts a scalar from a variant as the given type. This function queries the variant for the given path expression and returns the value if the value at that path is a scalar. Non-scalar values cannot be returned. For empty path expressions or errors, a behavior can be defined to either return null, raise an error, or return a defined default value instead. When omitted, the default is NULL ON EMPTY or NULL ON ERROR, respectively. The default value may be a literal or an expression. If the default value itself raises an error, it falls through to the error behavior for ON EMPTY, and raises an error for ON ERROR. If the path contains special characters such as spaces, use ['property'] or ["property"] to select the specified property in a parent object, with single or double quotes around the property name. Because the path argument of VARIANT_VALUE in SQL is already a single-quoted string literal, the single quotes around the property name must be escaped, such as VARIANT_VALUE(PARSE_JSON('{"a b": "true"}'), '$.[''a b'']'). |
VARIANT_QUERY(variant, path RETURNING type [ { WITHOUT | WITH CONDITIONAL | WITH UNCONDITIONAL } [ ARRAY ] WRAPPER ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON EMPTY ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON ERROR ]) | VARIANT.variantQuery(path, returnType [, variantQueryWrapper, variantQueryOnEmptyOrError, variantQueryOnEmptyOrError ]) | Extracts values as type from a variant. The wrappingBehavior determines whether the extracted value should be wrapped into an array, and whether to do so unconditionally or only if the value itself isn’t an array already. onEmpty and onError determine the behavior in case the path expression is empty, or in case an error was raised, respectively. By default, in both cases null is returned. Other choices are to use an empty array, an empty object, or to raise an error. |
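Path handling in VARIANT_VALUE involves two layers of quoting: the SQL string literal, and the quoted property name inside the path. The hypothetical VariantPathSketch below first undoes the SQL-level '' escaping, then splits a small path subset ($, .name, ['name'], ["name"], [index]) into segments. It is an illustration under these assumptions, not Flink's path parser; for example, it does not support ']' inside a quoted property name, and array indexes are returned as strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical tokenizer for a small subset of the path syntax used by VARIANT_VALUE. */
public final class VariantPathSketch {

    /** Undoes SQL literal escaping: strips the outer quotes and turns '' into '. */
    static String unescapeSqlLiteral(String literal) {
        return literal.substring(1, literal.length() - 1).replace("''", "'");
    }

    /** Splits "$.a.key", "$.['a b']", or "$.arr[0]" into segments; indexes stay as strings. */
    static List<String> tokenize(String path) {
        if (!path.startsWith("$")) {
            throw new IllegalArgumentException("Path must start with $: " + path);
        }
        List<String> segments = new ArrayList<>();
        int i = 1;
        while (i < path.length()) {
            char c = path.charAt(i);
            if (c == '.') {
                i++; // separator; a bracket may follow, as in "$.['a b']"
            } else if (c == '[') {
                int close = path.indexOf(']', i); // assumes no ']' inside a quoted name
                if (close < 0) {
                    throw new IllegalArgumentException("Unclosed '[' at " + i);
                }
                String inner = path.substring(i + 1, close);
                boolean quoted = inner.length() >= 2
                        && (inner.charAt(0) == '\'' || inner.charAt(0) == '"')
                        && inner.charAt(inner.length() - 1) == inner.charAt(0);
                segments.add(quoted ? inner.substring(1, inner.length() - 1) : inner);
                i = close + 1;
            } else {
                int end = i;
                while (end < path.length() && path.charAt(end) != '.' && path.charAt(end) != '[') {
                    end++;
                }
                segments.add(path.substring(i, end));
                i = end;
            }
        }
        return segments;
    }
}
```

The SQL literal '$.[''a b'']' thus becomes the path $.['a b'], which selects the single property "a b".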
Introduce built-in functions to access a field or an element in a Variant
SQL Function | Table Function | Description |
variant '[' INT ']' | VARIANT.at(INT) | If the VARIANT is an ARRAY value, returns a VARIANT whose value is the element at the specified index. Otherwise, NULL is returned. |
variant '[' STRING ']' | VARIANT.at(STRING) | If the VARIANT is a MAP value that has an element with this key, a VARIANT holding the associated value is returned. Otherwise, NULL is returned. |
variant.field | VARIANT.at(field) | If the VARIANT is a MAP value that has an element with this key, a VARIANT holding the associated value is returned. Otherwise, NULL is returned. |
Casting
- A primitive value Variant can be cast to the corresponding type. An exception will be thrown in case of a type error if `CAST` is used, and null will be returned if `TRY_CAST` is used.
- An array variant can be cast to ARRAY<T> type, and each element will be cast to type T. An exception will be thrown in case of a type error if `CAST` is used, and null will be returned if `TRY_CAST` is used.
- An object variant can be cast to ROW or STRUCTURED type. An exception will be thrown in case of a type error if `CAST` is used, and null will be returned if `TRY_CAST` is used.
The following types can be cast to/from Variant: CHAR/VARCHAR/STRING, BINARY/VARBINARY/BYTES, BOOLEAN, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, ARRAY, ROW, STRUCTURED.
String representation of Variant
The string representation of a variant is a JSON string.
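A sketch of how a variant tree maps to its JSON string representation, using plain Java values as a hypothetical stand-in for Variant: Map for objects, List for arrays, and String, Number, Boolean, or null for scalars. It applies RFC 8259 string escaping. How DATE, TIMESTAMP, BINARY, or DECIMAL values are rendered is not covered here, and the key order simply follows the input map, which may differ from a binary variant's field order.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: renders a variant-like tree of plain Java values as a JSON string. */
public final class VariantJsonSketch {

    static String toJson(Object value) {
        StringBuilder sb = new StringBuilder();
        write(value, sb);
        return sb.toString();
    }

    private static void write(Object value, StringBuilder sb) {
        if (value == null) {
            sb.append("null");
        } else if (value instanceof String) {
            writeString((String) value, sb);
        } else if (value instanceof Number || value instanceof Boolean) {
            sb.append(value); // assumes finite numbers; NaN and Infinity have no JSON form
        } else if (value instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> field : ((Map<?, ?>) value).entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                writeString(String.valueOf(field.getKey()), sb); // object keys are strings
                sb.append(':');
                write(field.getValue(), sb);
            }
            sb.append('}');
        } else if (value instanceof List) {
            sb.append('[');
            boolean first = true;
            for (Object element : (List<?>) value) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                write(element, sb);
            }
            sb.append(']');
        } else {
            throw new IllegalArgumentException("Unsupported value: " + value.getClass());
        }
    }

    /** Quotes a string, escaping the double quote, the backslash, and control characters. */
    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
```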
Support VARIANT type in Parquet format
Support the VARIANT type in the Parquet format, so that a VARIANT-typed column can be written to and read from a Parquet file.
Example
Here is a simple example of a streaming lakehouse scenario in which we use the MySQL CDC connector to ingest semi-structured data from MySQL into Paimon, process the semi-structured data with Variant, and write the results to StarRocks, an OLAP system.
-- Data Ingestion from MySQL to Paimon
CREATE TABLE t1 (
id INTEGER,
v STRING -- a json string
) WITH (
'connector' = 'mysql-cdc',
...
);
CREATE TABLE paimon_catalog.ods.t1 (
id INTEGER,
v VARIANT
) WITH (
'connector' = 'paimon',
...
);
INSERT INTO paimon_catalog.ods.t1 SELECT id, PARSE_JSON(v) FROM t1
;
-- Process with Variant and write the Variant as a JSON string to StarRocks
CREATE TABLE ads_t1 (
id INTEGER,
v STRING,
v1 INTEGER,
v2 INTEGER,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
...
);
INSERT INTO ads_t1 SELECT
t.id,
JSON_STRING(t.v),
r.v1,
r.v2
FROM paimon_catalog.ods.t1 AS t
LEFT JOIN paimon_catalog.ods.r AS r
ON VARIANT_VALUE(t.v, '$.a.key') = r.key
;
Proposed Changes
- Implement the VariantSerializer to serialize and deserialize a BinaryVariant object.
- Bump the version of Apache Calcite to 1.39.0, where the new type VARIANT is introduced[4].
- Implement the getVariant method for all RowData implementations, including BinaryRowData.
- Update the Parquet format reader and writer to read and write the Variant type.
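One plausible wire layout for a BinaryVariant in a type serializer is to write its two byte arrays (value and metadata), each with a length prefix. The sketch below assumes that layout; VariantWireSketch and Encoded are hypothetical, and Flink's actual VariantSerializer may encode things differently. It is written against java.io.DataOutput and DataInput, which Flink's DataOutputView and DataInputView extend.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical length-prefixed layout for a binary variant's value and metadata arrays. */
public final class VariantWireSketch {

    /** Minimal stand-in for BinaryVariant: only the two encoded byte arrays. */
    static final class Encoded {
        final byte[] value;
        final byte[] metadata;

        Encoded(byte[] value, byte[] metadata) {
            this.value = value;
            this.metadata = metadata;
        }
    }

    static void serialize(Encoded variant, DataOutput out) throws IOException {
        out.writeInt(variant.value.length);
        out.write(variant.value);
        out.writeInt(variant.metadata.length);
        out.write(variant.metadata);
    }

    static Encoded deserialize(DataInput in) throws IOException {
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        byte[] metadata = new byte[in.readInt()];
        in.readFully(metadata);
        return new Encoded(value, metadata);
    }

    /** Serializes into memory and reads the result back, as a round-trip self-check. */
    static Encoded roundTrip(Encoded variant) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            serialize(variant, out);
            out.flush();
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, an int32 value 42 (bytes 0x14 2A 00 00 00) paired with the metadata bytes 0x01 00 00 (a version-1 metadata block with an empty dictionary) occupies 4 + 5 + 4 + 3 = 16 bytes in this layout.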
Compatibility, Deprecation, and Migration Plan
This FLIP adds a new data type to Flink and is fully backward compatible.
Test Plan
The change will be covered with unit and integration tests.
Future Work
Support variant shredding[3] to allow projection pushdown and filter pushdown on JSON fields
With shredding, certain fields can be extracted into separate columns when writing to a Parquet file. Projection pushdown and filter pushdown can then be applied to those extracted fields when reading a shredded variant.
This feature is not trivial and can be supported incrementally after this FLIP. Therefore, we will start another FLIP to discuss how to support variant shredding based on the work of this FLIP.
Rejected Alternatives
n/a
[1] https://github.com/apache/parquet-format/blob/master/VariantEncoding.md
[2] https://github.com/apache/paimon/issues/4471
[3] https://github.com/apache/parquet-format/blob/master/VariantShredding.md