Copycat needs a runtime data API to represent the data it is importing or exporting from Kafka. Connectors use this API to either convert data from a source system into a format for Copycat to store in Kafka or to take data loaded from Kafka by Copycat and convert it for storage in a sink system. The serialization of the data is handled by the framework using this API, so it abstracts away the details of serialization from connector developers and allows a single connector implementation to work with multiple serialization formats.

Here is how data is currently processed on the import path:

Important Note: This image is no longer accurate. The conversion and serialization functionality are combined into a single Converter interface in the final APIs.

The export path is equivalent but in reverse. Note that Converter and Serializer are separated to enable reuse of existing Serializer implementations and because converting directly from the Copycat data API to serialized form, although possibly faster and more efficient, requires significantly more implementation effort than reusing existing serialization libraries.

This document explores what features are needed in this API and what will be needed to be compatible with a variety of serialization libraries and source/sink systems. JSON, Avro, Thrift, and Protocol Buffers are used as examples that should be representative enough to ensure the API can work with a variety of data.

Data Types

There is a tension in selecting the set of types to include in Copycat's API:

  • Try to be a superset of all types available in all serialization libraries and source/sink. This ensures that converting from any serialization format to Copycat will not lose any information. However, conversions from Copycat to a serialized form may lose information as types need to be cast and this results in a large, complex API. Additionally, differences between how different systems represent complex types (e.g. datetime) may be difficult or impossible to unify into a standard representation in Copycat.
  • Use only the common subset of types. This ensures converting from Copycat to the serialization format never loses information. However, converting from any serialization format to Copycat can lose data do to type casts.

Here's an overview of what's supported in a few different serialization formats. Note that JSON is a bit different because it doesn't have a schema, so some field types can be represented but the type is not necessarily known (e.g., the number type covers floats and ints).

 JSONAvroThriftProtocol Buffers
booleanYYYY
byte  Y 
unsigned int16    

signed int16

  Y 
unsigned int32   Y (with encoding variants)
signed int32 YYY (with encoding variants)
unsigned int64   Y (with encoding variants)
signed int64 YYY (with encoding variants)
float Y Y
double YYY
number typeY   
stringYYYY
bytes (variable) YYY
bytes (fixed size) Y  
struct/record/objectYYYY
list/arrayYYYY (via repeated modifier)

set (unordered)

  Y 
map w/ string keys YYY (optional, based on structs)
map w/ primitive keys  YY (optional, based on structs)
enum YYY
union Y Y (oneof)
null type Y  
     
optional modifier Y (via union with null type) Y
repeated modifier   Y
default values YYY
aliases Y  
schema names YYY
namespaces/packages YYY
bare primitive typesYY  

Schemas

Copycat will need some representation of data's schema to be provided with the data in order to reason about it. This is a strict requirement for a lot of connectors that write data to structured data store, where schema changes are potentially expensive (e.g. databases, HDFS for Hive). Additionally, this is a requirement for any serialization format that needs a schema/IDL spec description of the message format to work properly. Unlike the schema formats in many of the serialization libraries, Copycat schemas have a different set of requirements:

  • It must be easy to construct them dynamically. The main way we expect to construct them is dynamically: source connectors generate a schema by reading from the source system (e.g. get table schema for a database), and sink connectors will load them before reading the data from input topics.
  • Copycat schemas do not need to be saved/parsed directly (they are always converted to/from a schema for the serializer being used). There is no need to support loading them from a file since it generally isn't possible to write down the schema for a connector ahead of time.
  • In order to support interaction with schemaless systems, Copycat needs a way to express this format (e.g. lack of schema, or a catch-all schema)
    • Using this schema will hurt compatibility/usefulness of some connectors (e.g. there is very little a JDBC sink can do with unstructured data aside from store it as a blob in a single column). However, it is better to support some workflows that can still handle this data (e.g. something like MongoDB to Kafka to Elasticsearch) than to limit to systems that provide good schemas.

Schema Versions and Projection

When two schemas are compatible, it is possible to convert a message with one schema to a message using the other schema. It is important for some connectors to be able to project between different schemas that may appear in the same set of data. For example, if a topic contains data generated by an application and the application added a field to the message type and ran a rolling upgrade, then there will be a period where there are mixed messaged formats. Adding a field is backwards compatible. When a connector consumes this data, if it is to deliver it to structured data storage (e.g. a database table), it might make the table schema change in the database the first time it encounters the new format. However, it may still encounter messages with the old format. It needs to be able to project these to the new format in order to add them to the database table since it is now only compatible with the new schema. Alternatively, the connector may need to ensure it only projects to the latest schema (or latest schema it has seen). For example, an HDFS connector may need to roll to a new file and write all data in a file with a single schema, which means once it has seen version X+1, it should always project version X messages to X+1.

Note that this means that schemas must have some concept of ordering.

We could handle projection in 3 places:

  1. Deserializer: Make the deserializers implement projection automatically. This adds to the complexity of deserializers and requires them all to implement similar code. On the other hand, this allows reuse of existing code if the deserializer already supports this operation (e.g. Avro, Protobufs), allows for format-specific compatibility rules (see, e.g., Protobufs rules for compatible type changes). This would require adding a flag that toggles between at least a couple of different useful modes:

    1. Always return the writer-schema version. This preserves the original stream of data exactly.
    2. Always project to the latest schema that has been seen for the topic so far. This converts interleaved schema versions (e.g. due to rolling upgrade of distributed service: versions 1, 1, 2, 1, 2, 1, 2, 2) into one that is monotonically increasing (1, 1, 2, 2, 2, 2, 2, 2).
    3. Always project to the latest schema known. If a schema registry is available and Copycat is not running continuously, this can be useful for converting the entire backlog of data to the most recent format.
  2. Copycat Framework: We could implement projection ourselves within Copycat and just provide it as a method used by connectors. This adds to Copycat's complexity and requires clear explanation of compatibility rules in the data API spec. This requires schemas to include some ordering information (to be included by Converter during deserialization).
  3. Connector: If connectors need this functionality, they could implement it themselves. Further, they may do this without using two Copycat schemas, instead just coercing the data to the format of the output system. This is similar to (1) in that many connectors duplicate effort. This requires schemas to include some ordering information (to be included by Converter during deserialization).

Proposal

Data Types

Trying to handle all data types or limiting only to types supported by all serialization libraries both have drawbacks. For primitive types, we can support the following:

  • boolean
  • signed int32
  • signed int64
  • string (UTF-8)
  • bytes (variable length)
  • float
  • double
  • enum - these are encoded as ints, but maintaining the names may be important and for some cases may be the preferred format to use in sink connectors (i.e. write the enum string instead of int value when writing to a system where the schema will be lost).

Limiting to these types keeps things relatively simple while covering the most important types. Where conversions are necessary, they only make the serialization less efficient, they do not lose any data (e.g. byte -> int32 or fixed-size byte[] -> variable-length byte[]).

  • Arrays/lists - include items field containing subschema specifying format of each list element
    • Sets in Thrift can degrade to lists. This loses the uniqueness requirement and imposes ordering.
  • Object - include fields array containing subschemas defining field name -> schema.
    • List is ordered, which can be used to generate unique ids for each entry. However, some serialization libraries need to take care in assigning IDs to remain compatible if reader and writer schemas might differ. (If deserialization always happens using the original schema and projection is handled by the Copycat framework, this ID issue is not a problem since schema resolution can use field names.)
    • boolean optional field indicates that the field may be omitted
    • default field indicates a value to use if one isn't specified
    • Must consider optional and default together to determine some properties (e.g. is the field nullable?)
  • Map - support any primitive -> any type mapping. Specify keys and values fields containing subschemas.
    • If necessary, serialization layer can represent this as a list of tuples. However, it also needs to be able to recover the fact that it was a map so deserialization can convert it to the right type

It is also permitted to add namespace and name fields with any schema.

In order to be easily compatible with a wider variety of serialization libraries and to encourage backward compatibility, the schema for data passed to copycat as a CopycatRecord's key or value or for offsets must always be of type object. In other words, simple primitive types are not supported directly. This need not add overhead to serialization (e.g. Avro serializes records with no header, so you only pay the cost of fields; a single field record is equivalent to the plain primitive type schema).

 

Finally, each schema may contain a version field that can be either an integer or string. Normal integer ordering or lexicographic ordering determines which schema is newer.

 

Notable omissions:

  • Null type - indicated via optional fields
  • Unions - you can get the equivalent with an object containing optional fields
  • Repeated modifier - this is equivalent to lists
  • Aliases - schemas copycat deals with are mostly generated automatically so aliases aren't really useful. They also introduce additional complexity in compatibility.
  • Complex data types like dates/times, geographic coordinates, UUIDs, or other specialized data types that some systems might support. These fall into a couple of different categories:
    • Difficult to address comprehensively in a way that will be broadly compatible and satisfies different requirements (e.g. dates/times with all the complexity of timezones, leap days/seconds, different levels of granularity, etc).
    • Too niche to take on the burden of supporting them generally (e.g., geographic coords). Leaving the representation up to the connector is ok.
    • Only adds semantics to an existing type (e.g., UUIDs are just byte[16]).

 

Schemas

Schemas only have to exist as in-memory data structures, but to give a simple example we'll use a JSON-like representation here:

{
  "namespace": "org.apache.kafka",
  "name": "sample",
  "version": 2,
  "type": "object"
  "fields": [
    { "name": "bool_field", "type": "boolean", "optional": true }, // optional and no default == nullable
    { "name" : "int_field", "type": "int32", "optional": true, "default": 0 }, // optional with default == non-nullable
    { "name" : "int64_field", "type": "int64", "optional": false },
    { "name" : "string_field", "type": "string", "default": "default-string-value" },

    { "name" : "list_field", "type": "list", "items": { "type": "int32" }, "optional": false },
    { "name" : "map_field", "type": "map", "keys": { "type": "int32" }, "values": { "type": "string" }
  ]
}

To make it easy to dynamically construct schemas, we'll want a builder API that looks something like this:

ObjectSchemaBuilder builder = new SchemaBuilder("org.apache.kafka", "sample", version=2).object();
Schema = builder.fields()
  .field("bool_field").boolean().optional()
  .field("int_field").int32.optional().default(0)
  .field("int64_field").int64().required()
  .field("list_field").list().itemBuilder().int32().endItem().required()
  .field("map_field").map().keys().int32().endKeys().values().string().endValues()
  .build();

Java API Types

All the primitive types, Lists, and Maps can use their obvious Java counterparts. All of these types have implicit schemas, with types not represented by schemas (e.g. short) being promoted automatically to the nearest larger type.

enum and object will require specialized implementations. These should function similarly to the DynamicMessage/DynamicMessage.Builder class in Protobufs or GenericRecord/GenericRecordBuilder in Avro. These can share a common interface that provides a Schema getSchema() method.

 

Serializer Implementation Sketches

All implementations of serializer share a common concern: tracking schemas across multiple hops. For example, if we use Copycat to import data from a SQL database into Kafka, then a downstream consumer (whether Copycat or some other system) needs the schema in order to be able to read and understand the structure of this data. This can be accomplished by:

  • Transmitting the schema with every message (high overhead)
  • Centrally register schemas and tag each message with an ID. Readers can look up schemas on demand.
  • Manually specifying a fixed schema for each connector (e.g. allow this to be passed via connector config and make the deserializer interface accept a schema string)

This isn't an issue for many applications because they don't work with dynamic schemas. Normally they compile their applications using a specific schema (or auto-generated code for that schema) because they are only handling that one type of data. They can avoid any central registration service because it is assumed both the reader and writer have a copy of the schema (or at least compatible schemas).

Copycat is different because connectors need to be able to handle data with a variety of schemas. For example, an HDFS sink connector that puts data into Hive tables may require input data to be a flat record (or it may flatten the data into table form), but it should be able to handle any schema with the appropriate structure. This means the schema cannot be available at compile time and must be determined dynamically.

JSON

  • No built-in schema system. Use a simple serialization of Copycat's schema class or the appropriate subset of JSON Schema.
  • Use envelope format to include schemas with each message:

    { 
      "schema": { "type": ...},
      "payload": { ... }
    }
  • Provide a mode to toggle the envelope vs. using only the payload.

    • This a) allows compatibility for loading existing JSON data (which will not have an attached schema) and writing the data to topics when that topic will only be consumed by applications that do not need the schema, and b) avoids the overhead of including the schema with every message.

    • Reading this data in a sink connector requires attaching a schema, but there isn't one. We'll need to be able to define a catch-all schema (or imply that by omitting a schema)

  • Converter does straightforward translation to the JSON library format. For example, a Jackson-based implementation would generate a JsonNode representation.
  • Serializer does trivial conversion to byte[] with JSON library.
  • Note that we could, of course, use a schema registry instead of using this custom format. One reason for this design is to provide a built-in implementation that doesn't require any additional services that aren't included with Kafka.

Avro

  • Use one of the existing Avro (or generic) schema registries/manual schema specification. This is already incorporated into some existing serializers (e.g. Confluent's)
  • Converter translates between Copycat types and Avro types (primitive types, GenericRecord, Map, Collection). This implementation is straightforward because Avro has good built-in support for handling schemas dynamically (GenericRecord).

Thrift

  • Requires schema registry/manual schema specification. Thrift is highly dependent on not changing field IDs between different versions of the same schema. This means it may need to be possible to look up previous versions of schemas to ensure compatibility (if reader/writer schemas might not match).
  • Converter can be a nop. Thrift doesn't have an intermediate format that supports dynamic schemas. (Alternatively, one could build a TBase implementation similar to Avro's GenericRecord)
  • Serializer implementation will need to be custom, but can reuse TProtocol implementations. Thrift doesn't have built-in support for parsing schemas, but third-party libraries have implemented this (e.g., this one from Facebook). By combining these, the serializer should be a straightforward implementation.

Protocol Buffers

  • Requires schema registry/manual schema specification. Protobufs is highly dependent on not changing field IDs between different versions of the same schema. This means it may need to be possible to look up previous versions of schemas to ensure compatibility (if reader/writer schemas might not match).
  • Converter translates between copycat types and Descriptor/DynamicMessage.

 

  • No labels