Important note! |
---|
The KafkaDataContext class in Apache MetaModel is marked with @InterfaceStability.Unstable and the support for Kafka as such is considered experimental. |
As of Apache MetaModel 5.1.0, there is a connector for Apache Kafka that lets you "query" Kafka topics as if they were tables. Although Kafka topics are not actually table-like structures, each topic is represented in MetaModel as a table whose columns are `partition`, `offset`, `key`, `value` and `timestamp`. When you query a table/topic, MetaModel creates a new temporary consumer of that topic and returns the records from the topic that match the query. Kafka can be used in a variety of ways, including scenarios where records are automatically compacted, purged or duplicated, so the result of a MetaModel query against Kafka "just" represents whatever the Kafka consumer receives, given the configuration of the topic.
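To make the topic-as-table analogy concrete, here is a stdlib-only sketch that models consumed Kafka records as rows, applies a WHERE-like filter and projects the key and value columns. `TopicAsTableSketch`, `KafkaRecord` and `selectKeyValue` are hypothetical names for illustration only; they are not part of Apache MetaModel's API, and the real connector reads from an actual Kafka consumer rather than an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, stdlib-only model of a Kafka topic presented as a table.
// None of these names belong to Apache MetaModel's API.
public class TopicAsTableSketch {

    // One consumed Kafka record; its five fields correspond to the five columns.
    static final class KafkaRecord {
        final int partition;
        final long offset;
        final String key;
        final String value;
        final long timestamp;

        KafkaRecord(int partition, long offset, String key, String value, long timestamp) {
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Applies a WHERE-like predicate to whatever the (simulated) consumer returned
    // and keeps only the key and value, similar to select("key", "value").
    static List<String[]> selectKeyValue(List<KafkaRecord> consumed, Predicate<KafkaRecord> where) {
        List<String[]> rows = new ArrayList<>();
        for (KafkaRecord r : consumed) {
            if (where.test(r)) {
                rows.add(new String[] {r.key, r.value});
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Records as a consumer might receive them; anything already compacted
        // or purged on the broker simply never appears in this list.
        List<KafkaRecord> consumed = Arrays.asList(
            new KafkaRecord(0, 0L, "a", "1", 1000L),
            new KafkaRecord(1, 0L, "b", "2", 1001L),
            new KafkaRecord(0, 1L, "c", "3", 1002L));

        for (String[] row : selectKeyValue(consumed, r -> r.partition == 0)) {
            System.out.println("key: " + row[0] + ", value: " + row[1]);
        }
    }
}
```

Because the filter only ever sees what was consumed, records that Kafka has already removed cannot show up in the result, which mirrors the caveat above.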
Usage
Here's a simple example of how to set up an Apache Kafka based DataContext with Apache MetaModel:
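```java
import java.util.Arrays;

import org.apache.metamodel.DataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.kafka.KafkaDataContext;

String bootstrapServers = "localhost:9092"; // example broker address; adjust to your cluster
// Key and value are both read as String; the topics listed become the available tables
DataContext dataContext = new KafkaDataContext<>(
        String.class, String.class, bootstrapServers, Arrays.asList("Topic1", "Topic2"));

DataSet ds = dataContext.query().from("Topic1").select("key", "value").execute();
while (ds.next()) {
    Row row = ds.getRow();
    System.out.println("key: " + row.getValue(0) + ", value: " + row.getValue(1));
}
ds.close();
```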
The following fields are exposed as columns for each topic:
Name | Data type | Description | Special querying remarks |
---|---|---|---|
key | Configurable * | The Kafka record's key | |
value | Configurable * | The Kafka record's value | |
partition | int | The Kafka record's partition number | Queries with WHERE partition = n or WHERE partition IN (...) are optimized |
offset | long | The Kafka record's offset | Queries with WHERE offset > n or WHERE offset >= n are optimized |
timestamp | long | The Kafka record's timestamp | |
* = Determined by the key and value classes passed to the KafkaDataContext constructor (String.class and String.class in the example above).
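The table does not spell out what "optimized" means. One plausible interpretation, sketched below purely as an assumption, is that predicates on partition and offset are pushed down to the consumer: only the listed partitions are read, and each one starts at the lowest offset the predicate allows instead of at the beginning. `OffsetPushdownSketch` and `startOffsets` are hypothetical names; this is a stdlib-only model of the idea, not MetaModel's implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: turning partition/offset predicates into per-partition
// start offsets. This is NOT Apache MetaModel's actual implementation.
public class OffsetPushdownSketch {

    /**
     * @param allPartitions   partitions that exist for the topic
     * @param partitionFilter partitions from WHERE partition = n / IN (...), or null if absent
     * @param minOffset       offset bound from WHERE offset > n / >= n, or null if absent
     * @param inclusive       true for offset >= n, false for offset > n
     * @return map of partition to the first offset worth reading
     */
    static Map<Integer, Long> startOffsets(Set<Integer> allPartitions,
                                           Set<Integer> partitionFilter,
                                           Long minOffset,
                                           boolean inclusive) {
        Map<Integer, Long> plan = new TreeMap<>();
        for (int p : allPartitions) {
            // Partitions excluded by the partition predicate are never read.
            if (partitionFilter != null && !partitionFilter.contains(p)) {
                continue;
            }
            // Without an offset predicate, read from the start (0 stands in for "earliest").
            long start = 0L;
            if (minOffset != null) {
                start = inclusive ? minOffset : minOffset + 1;
            }
            plan.put(p, start);
        }
        return plan;
    }

    public static void main(String[] args) {
        // Models WHERE partition IN (0, 2) AND offset > 9 on a topic with partitions 0..2.
        Map<Integer, Long> plan = startOffsets(
            new TreeSet<>(Arrays.asList(0, 1, 2)),
            new HashSet<>(Arrays.asList(0, 2)),
            9L, false);
        System.out.println(plan); // prints {0=10, 2=10}
    }
}
```

The point of such a plan is that partition 1 is skipped entirely and offsets 0 to 9 are never fetched, rather than being read and discarded by a client-side filter.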