Status

Current state: Accepted

Discussion thread: here

JIRA: here

Released: <Cassandra Sidecar Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

CDC logs generated by Cassandra are difficult to consume because each node produces a potentially incomplete log that overlaps the log generated by several other nodes. Currently there is no off-the-shelf solution people can deploy to efficiently and simply consume the logs.

By creating a simple-to-operate solution we can make CDC more accessible and ease the burden on software that leverages CDC, which currently has to redundantly implement much of the functionality needed to consume CDC logs.

Scope

Using Sidecar to integrate Cassandra with Kafka to stream CDC mutations, while maintaining configured consistency levels and availability.

Audience

Cassandra users, admins, operators.

Goals

The goal of this CEP is to utilize Cassandra’s existing CDC feature to:

  • Provide Kafka integration to stream CDC mutations to Kafka topics.
  • Honor configured consistency levels for the CDC mutations streamed to Kafka.
  • Maintain Cassandra/Sidecar high availability guarantees.
  • Keep CDC publication isolated from Cassandra writes.
  • Provide an official solution for de-duplicating CDC streams between replicas.

Non-Goals

  • Building CDC implementations for the array of streaming ecosystems (Spark Streaming, Flink, etc.).
  • Providing a linearizable CDC stream.
  • SSTable imports (bulk writes, imports, or any writes outside of the normal CQL path) will not be supported. Only writes that go through the commit log are supported.
  • Providing before and after states; we target providing only the mutation as it appears in the commit log. This avoids a read-before-write pattern which would have major implications for performance and scalability on the database.

Approach

The foundation of CDC will be a core module comprising (a sketch follows the list):

  • Code to read and deserialize CommitLog segments from a generic InputStream, filter mutations by a token range and buffer the mutations in memory.
  • The core CDC logic for reading many CommitLog segments across one or more replicas, de-duplicating mutations and publishing when sufficient replica copies are received.
  • A CDC State object for storing, serializing and deserializing the state to a binary format.
  • Classes representing Cassandra CDC mutation events and Cassandra features: CdcMessage, range tombstones, individual CQL columns, their types and the mutation values.
  • A pluggable type converter for deserializing raw Cassandra ByteBuffers and converting them into an alternative format (e.g. Avro), mapping Cassandra data types to their equivalents.
  • Finally, a Cdc interface that can be implemented to plug in the sources for reading the commit log bytes. This is the publicly-exposed API required to implement CDC.
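As an illustration only, the sketch below shows how these pluggable pieces might fit together. The interface and method signatures are hypothetical and not the final API; only the Cdc and CdcMessage names are taken from this proposal.

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the pluggable CDC surface described above; signatures are illustrative only.
interface Cdc
{
    // Supply the raw commit log bytes for a node, e.g. from local disk or a Sidecar HTTP endpoint.
    CompletableFuture<InputStream> openCommitLog(String nodeId, long segmentId, int offset);

    // Called once a mutation has been observed on enough replicas to satisfy the consistency level.
    void publish(CdcMessage message);
}

// Placeholder for the mutation event class named in this proposal; fields omitted in this sketch.
class CdcMessage {}

// Hypothetical pluggable converter from raw Cassandra values to an output format such as Avro.
interface TypeConverter<T>
{
    T convert(String cqlType, ByteBuffer rawValue);
}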

The core of the CDC execution occurs in micro-batches. To prevent excess resource usage and to handle “burst” scenarios, a maximum number of commit log segments is read per micro-batch. To prevent “busy-polling”, a minimum “tick time” is maintained: if the micro-batch completes faster than the tick time, the next task is scheduled to maintain the cadence.
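For illustration only, the cadence described above could look roughly like the following; the class, field values and method names are assumptions, not the actual implementation.

import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: run a bounded micro-batch, then schedule the next one so that at least
// the minimum tick time elapses between batch starts, avoiding busy-polling.
class MicroBatchScheduler
{
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Duration tick = Duration.ofSeconds(10);   // hypothetical minimum tick time
    private final int maxSegmentsPerBatch = 32;             // hypothetical burst limit

    void scheduleNext()
    {
        long startNanos = System.nanoTime();
        readAndPublish(maxSegmentsPerBatch);   // read at most maxSegmentsPerBatch commit log segments
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        long delayMs = Math.max(0, tick.toMillis() - elapsedMs);
        executor.schedule(this::scheduleNext, delayMs, TimeUnit.MILLISECONDS);
    }

    private void readAndPublish(int maxSegments) { /* de-duplicate and publish; omitted in this sketch */ }
}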

CDC operates as a stateful streaming process and so requires storing a small amount of state to operate. The state consists of two parts (a sketch follows the list):

  1. A commit log ‘marker’ per Cassandra node. This is the offset last successfully read in previous micro-batches, enabling CDC to be restarted and resume from the previous position without replaying mutations that are known to have been successfully published. The state contains, per Cassandra node, the segmentId (64-bit long) of the highest commit log segment read and the position offset (32-bit int) into that segment.
  2. A hash (e.g. MD5; cryptographic strength is not required) of every CDC mutation previously read with insufficient replica copies to satisfy the consistency level. If insufficient replica copies are received in a micro-batch, the hash is stored so that subsequent micro-batches can publish the event once an additional copy is read.
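For illustration, the two parts of the state might be modelled roughly as follows; the class and field names are hypothetical, not the actual implementation or serialized format.

import java.math.BigInteger;
import java.util.Map;
import java.util.Set;

// Illustrative shape of the per-shard CDC state; not the real class or binary format.
class CdcStateSketch
{
    // Part 1: per Cassandra node, the highest commit log segment read and the offset into it.
    record Marker(long segmentId, int position) {}
    Map<String, Marker> markersByNode;

    // Part 2: hashes of mutations read with insufficient replica copies, awaiting further copies.
    Set<String> pendingMutationHashes;   // e.g. hex-encoded MD5 digests

    // The token range this state was produced for, so states can be merged after topology changes.
    BigInteger rangeStart;
    BigInteger rangeEnd;
}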

The CDC state can be serialized into a compressed binary blob and persisted to any binary store. The persistence layer is left pluggable to allow the user to choose a datastore of choice. The CDC state is also associated with a token range at the time of execution. A CDC state can be merged with other state objects such that the token range history is preserved. This is important in the event of cluster topology changes (expansions, shrinks, host replacements, etc.) where a CDC shard restarts with a new token range and must resume from more than one CDC state object.

Each CDC state object is immutable once written; the persisted state is simply overwritten, and the last write wins.

In normal operation the state should not exceed kilobytes in size for a single CDC shard, so it can easily be persisted in a database or binary store. The state has a configurable limit on the number of entries to prevent it growing excessively in failure scenarios. The state is held in memory in the CdcConsumer and only read from the persistent store to recover after a restart, crash or bounce.
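A minimal sketch of what the pluggable persistence layer described above could look like, assuming hypothetical names:

import java.util.Optional;

// Hypothetical pluggable persistence for the compressed CDC state blob; last write wins.
interface CdcStatePersister
{
    // Overwrite the persisted state for a CDC job/shard.
    void persist(String jobId, byte[] compressedState);

    // Only consulted on restart, crash or bounce; normal operation keeps the state in memory.
    Optional<byte[]> read(String jobId);
}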

We leverage CASSANDRA-17001 and CASSANDRA-17666 to prevent potential writing issues when CDC is enabled.

Apache Cassandra Sidecar Implementation

We also propose implementing a fully-fledged CDC solution in the Apache Cassandra Sidecar project so that users can enable CDC with only configuration.

Building a CDC solution into the Sidecar comes with many benefits:

  • Sidecar is an official part of the existing Cassandra eco-system.
  • Sidecar runs co-located with Cassandra instances and so scales with the cluster size.
  • Sidecar can access the underlying Cassandra database to store CDC configuration and the CDC state in a special table.
  • Running in the Sidecar does not require provisioning additional external resources.

CDC in the Sidecar operates as follows:

  • Each Sidecar assumes ownership of a portion of the Cassandra token range, which can default to (but is not limited to) the ranges of the co-located Cassandra node(s).
  • Sidecar streams CDC logs from replicas for the token range, either reading from local disk or through a Sidecar http API that exposes the commit log segment bytes.
  • Sidecar reads the commit logs and deserializes the mutations from all available replicas.
  • Sidecar uses the core CDC module to de-duplicate mutations and check for the specified consistency level (e.g. LOCAL_QUORUM) copies before publishing.
  • Sidecar converts mutations into a consumable data format (e.g. Avro), mapping the Cassandra data types to appropriate equivalents.
  • Sidecar publishes the CDC events to the transport layer (e.g. Kafka).
  • Sidecar stores the CDC state back to a special table in Cassandra so the state is persisted and replicated.
  • APIs in Sidecar allow us to enable/disable and configure CDC, i.e. setting the transport layer endpoint, topic, and data format.
  • Sidecar can monitor for cluster topology changes (e.g. expansions, shrinks, host replacements) and restart with a new token range. It can detect down replicas and fail over accordingly.

Exactly-once delivery is known to be a hard problem in distributed systems; for this reason we make a best effort to de-duplicate mutations, but there will be some scenarios where mutations are published more than once to downstream consumers. CDC in the Sidecar operates on the principle of ‘at-least-once’ delivery. Duplicates can occur, for example, if there is a timeout or failure when publishing mutations to the transport mechanism (e.g. Apache Kafka), or if the Sidecar process crashes after publishing mutations but before persisting the CDC state; the Sidecar will resume from the previous CDC state and may re-publish mutations.
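A simplified sketch of the ordering that yields at-least-once delivery (illustrative only; the interfaces and names below are assumptions):

// Illustrative only: mutations are published before the state recording that publish is persisted,
// so a crash between the two steps causes re-publication after restart (at-least-once delivery).
interface Transport { void publish(byte[] mutation); }
interface StateStore { void persist(byte[] state); }

class MicroBatchCompletion
{
    void complete(Transport transport, StateStore store, byte[][] mutations, byte[] newState)
    {
        for (byte[] mutation : mutations)
            transport.publish(mutation);   // may time out or fail; nothing is recorded as done yet
        // A crash here leaves the previous state on record, so these mutations are re-published on restart.
        store.persist(newState);
    }
}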


CQL Tables

The Sidecar requires two additional CQL tables in Cassandra to store the configuration and CDC State.

CDC Configuration

The following table permits storing arbitrary configuration for CDC or auxiliary services (e.g. service="cdc", service="kafka", service="schema_store").

CREATE TABLE IF NOT EXISTS sidecar_internal.configs (
service text,
config map<text, text>,
PRIMARY KEY (service));

CDC State

The CDC State object can be stored in a CQL table like this:

CREATE TABLE IF NOT EXISTS sidecar_internal.cdc_state (
job_id text,
split smallint,
start varint,
end varint,
state blob,
PRIMARY KEY ((job_id, split), start, end)
) WITH CLUSTERING ORDER BY (start ASC, end ASC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '1', 'compaction_window_unit': 'DAYS', 'enabled': 'true'}
AND default_time_to_live = 2592000
AND gc_grace_seconds = 0;

The columns are used for the following purposes:

  • job_id is an arbitrary string to namespace the CDC job state; it can be changed to restart the CDC job from a blank state.
  • split is a short integer used to shard the token range by a fixed value so that the CDC state is sharded across partitions.
  • start and end are the bounds of the closed-open token range covered by the CDC state at the time it executed.
  • state is the compressed binary blob of the CDC state.

The schema is specifically designed to permit token range queries so that CDC can resume from arbitrary token ranges, for example after an expansion or shrink.
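For illustration, resuming a shard might fetch the state rows for each (job_id, split) partition and keep those overlapping the new token range. The query and client-side overlap check below (using the Java driver) are assumptions, not part of this CEP.

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Illustrative only: read persisted CDC state for one partition and select ranges that
// overlap the shard's new closed-open token range [shardStart, shardEnd).
class CdcStateLoader
{
    static void loadOverlapping(CqlSession session, String jobId, short split,
                                BigInteger shardStart, BigInteger shardEnd)
    {
        for (Row row : session.execute(
                "SELECT start, end, state FROM sidecar_internal.cdc_state WHERE job_id = ? AND split = ?",
                jobId, split))
        {
            BigInteger start = row.getBigInteger("start");
            BigInteger end = row.getBigInteger("end");
            if (start.compareTo(shardEnd) < 0 && end.compareTo(shardStart) > 0)
            {
                ByteBuffer blob = row.getByteBuffer("state");   // compressed CDC state to decompress and merge
                // ... merge into the resumed CDC state
            }
        }
    }
}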

Given the data is overwrite-only, the table properties TimeWindowCompactionStrategy, default_time_to_live and gc_grace_seconds are used to reduce the compaction load on the Cassandra cluster. Together they allow expired data to be dropped efficiently while only compacting the most recent writes.


Configuration APIs

We introduce a new API to manage service configurations.

To retrieve a configuration:

GET http://localhost:65505/api/v1/services/<SERVICE_NAME>/config


To create/update a configuration:

PUT http://localhost:65505/api/v1/services/<SERVICE_NAME>/config
{
  "config": {
    "key": "value"
  }
}

To delete a configuration (disable CDC):

DELETE http://localhost:65505/api/v1/services/<SERVICE_NAME>/config
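For illustration only, enabling a service configuration through the API above could look like the following; the port and payload are placeholders taken from the examples above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client call against the configuration API sketched above.
public class ConfigureCdcExample
{
    public static void main(String[] args) throws Exception
    {
        String body = "{ \"config\": { \"key\": \"value\" } }";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:65505/api/v1/services/cdc/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}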

Sources and Sinks

Conceptually, this CEP proposes two different building blocks for the Cassandra ecosystem. CDC is a source of data, and the Kafka topic it publishes to is a data sink or consumer. These two concepts can be generalized into a generic framework of sources (data providers) and sinks (data consumers), with CDC being the first implemented source and Kafka the first implemented sink.
Other sources will potentially be able to reuse the existing sinks, and new sinks can be added in the future. Examples of sources that could be implemented later are: metrics, logs, events...

CQL configuration

Besides having direct APIs on the Sidecar to configure CDC and its Kafka connectivity, this CEP proposes to leverage what CEP-38 introduces by expanding its management syntax, which could replace configuration through the direct APIs.
Creating a Sink:

    CREATE DATA_SINK [sink_name] WITH [uri]

An example of a URI could be:

 kafka://kafkacluster:port?param1=value1&param2=value2...

Dropping a sink:

 DROP DATA_SINK [sink_name]

Listing sinks:

 DESCRIBE DATA_SINKS

Creating a source:

 CREATE DATA_SOURCE cdc ON TABLE [table_name] WITH [sink_name]

Dropping a source:

 DROP DATA_SOURCE cdc ON TABLE [table_name]

Listing data sources:

 DESCRIBE DATA_SOURCES

Managing data sources and sinks requires database admin permissions.

Client Consumption

While the most common usage may be to publish CDC events to Apache Kafka in Apache Avro binary format, we believe it is necessary to keep the transport mechanism and the data format open and pluggable to permit alternate binary data formats and transport layers. We still plan to provide implementations for the proposed Kafka and Avro interfaces.

Schema Distribution

An additional challenge with CDC is managing schemas and schema changes. Consumers must be able to detect and handle schema changes (new columns, dropped columns, etc.) without breaking the serialization format. The solution we propose is a pluggable integration with a Schema Store, such that any change to the Cassandra CQL table regenerates the binary data schema and publishes it to the Schema Store with a deterministically generated schemaId. The schemaId can be generated using, for example, an MD5 of the CQL table schema text so that it can be derived deterministically from the table schema and changes when alterations are made.
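For example, a deterministic schemaId could be derived as follows (a sketch; the exact id format is an assumption):

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

// Illustrative only: derive a deterministic schemaId from the CQL table schema text, so the same
// schema always produces the same id and any ALTER produces a new one.
public final class SchemaIdExample
{
    public static String schemaId(String cqlTableSchemaText) throws Exception
    {
        byte[] digest = MessageDigest.getInstance("MD5")
                                     .digest(cqlTableSchemaText.getBytes(StandardCharsets.UTF_8));
        return String.format("%032x", new BigInteger(1, digest));
    }
}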

The schemaId will be sent in the header of every CDC message so that consumers can lookup the matching schema in the Schema Store to deserialize the message.

There are many schema store implementations that address the schema distribution problem, e.g. https://docs.confluent.io/platform/current/schema-registry/index.html. We again plan to provide a pluggable interface to allow integration with whichever schema store is appropriate for the user.

We must assume that the underlying Schema Store implementation may not be highly available and so failures to publish a new schema should not block the main CDC stream. This relies on the schemaId being generated deterministically such that schemas can be re-synced when the Schema Store becomes available.

Large Blob

Kafka limits the size of messages that can be posted to a topic. Cassandra doesn’t enforce size limits by default (some limits can be enforced once the CEP-42 patch is contributed, but they won’t be enabled by default), which can pose an issue when trying to stream large mutations through Kafka.
If needed, this can be overcome with a Kafka Serializer-based approach that can be plugged into the CDC configuration. This [POST THE EXAMPLE PATCH] is an example of how such a serializer would work.
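As a rough sketch of how such a serializer could work (this is not the referenced example patch; the class name, config key and offload mechanism are assumptions), payloads above a threshold could be offloaded to an external store and replaced with a small reference for consumers to resolve:

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical size-aware serializer: oversized payloads are offloaded and replaced by a reference.
public class LargeBlobSerializer implements Serializer<byte[]>
{
    private long maxBytes = 1_000_000;   // assumed threshold, kept below the topic's message size limit

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        Object limit = configs.get("cdc.large.blob.max.bytes");   // hypothetical config key
        if (limit != null)
            maxBytes = Long.parseLong(limit.toString());
    }

    @Override
    public byte[] serialize(String topic, byte[] data)
    {
        if (data == null || data.length <= maxBytes)
            return data;
        return storeExternallyAndReturnReference(data);
    }

    private byte[] storeExternallyAndReturnReference(byte[] data)
    {
        // Left to the pluggable implementation, e.g. write to an object store and return a pointer.
        throw new UnsupportedOperationException("not implemented in this sketch");
    }
}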

On Repair

If insufficient copies of a mutation are read, CDC will store an MD5 of the mutation until either sufficient replica copies are read or a configurable time threshold is reached, after which the mutation is dropped. In that case the mutation may be lost and never published.

In the event that a mutation is successfully written to fewer than QUORUM replicas, it may eventually be replicated to other nodes through anti-entropy mechanisms. This can be mitigated by enabling the cdc_on_repair_enabled flag or by relying on clients to retry aggressively in the event of timeouts or transient failures.

Monitoring

The extended CDC feature should provide relevant metrics through the existing general metric reporting solution, including:

  • Mutation Publish Time.
  • Mutation events per second.
  • CommitLog read time and bytes per second.
  • Consistency guarantees and dropped mutations.
  • Errors and alerts.

Data Types and Conversion

We are proposing a pluggable solution for mutation serialization, using Avro as an example implementation. With that in mind, there must be a conversion between Cassandra types and Avro types (an illustrative example follows the table):


Cassandra Type    Avro Type
ascii             string
bigint            int
blob              bytes
boolean           boolean
counter           Not Supported
date              int
decimal           long
double            long
float             long
frozen            bytes
inet              string
int               int
list              array
map               map
set               array
smallint          int
text              string
time              long
timestamp         long
timeuuid          string
tinyint           int
tuple             map
uuid              string
varchar           string
varint            int
duration          long


We support UDTs as long as they use supported types.

We support both frozen and unfrozen collections. However, unfrozen lists require a database read to be usable because the index information in their mutations is not usable by downstream consumers.

We don’t support counters.
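For illustration, the mapping above could be applied with the Avro SchemaBuilder API; the table and column names below are hypothetical, and the real converter is pluggable and generated from the CQL schema.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Illustrative only: an Avro record for a hypothetical table (id text, enabled boolean, count int),
// following the type mapping above.
public final class AvroSchemaExample
{
    public static Schema example()
    {
        return SchemaBuilder.record("example_table")
                            .namespace("cdc.example_keyspace")
                            .fields()
                            .requiredString("id")         // Cassandra text    -> Avro string
                            .requiredBoolean("enabled")   // Cassandra boolean -> Avro boolean
                            .requiredInt("count")         // Cassandra int     -> Avro int
                            .endRecord();
    }
}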

Linearizable Guarantees

We are currently proposing this new feature without linearizable guarantees. We hope in the future that Accord can be leveraged to achieve a linearizable CDC stream.

Limitations

  • User-provided timestamps, i.e. USING TIMESTAMP. The CDC publisher sorts each batch of CDC mutations by timestamp, including user-provided timestamps. When user-provided timestamps are out of order and the mutations end up in two distinct batches, mutations with a larger timestamp may be sent before mutations with a lower timestamp. The timestamp value is sent along with the change data, and it is up to the CDC consumer to reconcile.
  • We set a maximum size for the cdc_raw directory, where Cassandra hardlinks the CDC commit log segments, and delete the oldest segments to keep disk usage below this threshold. This prevents excessive CDC disk space usage and potential cluster impact. If the threshold is too low for the cluster's write throughput, log segments may be deleted before CDC can process them. To avoid this 1) ensure the buffer space is sufficient for the cluster's write throughput and 2) monitor CDC metrics to track how long log segments are retained before deletion is necessary.

Compatibility, Deprecation, and Migration Plan

  • This is an additive feature, so no deprecation plan is needed.
  • Cassandra 4.0 and newer versions are supported.

Test Plan

  • Unit tests
  • Integration tests
    • Mixed version clusters and upgrade using in-JVM dtests

Rejected Alternatives

We considered externalizing CDC to other technologies such as Apache Spark Streaming or Apache Flink, but this comes with multiple drawbacks:

  • Users would need to set up an additional cluster and provision extra resources for this purpose.
  • CDC operates as a stateful streaming service and maintaining state is not simple in stateless architectures like Spark Streaming.
  • It ties the feature to that specific technology.
  • The CDC module in the Analytics library can still be re-used for this purpose should others wish to implement in other technologies.

