| Important note! |
|---|
| The `KafkaDataContext` class in Apache MetaModel is marked with `@InterfaceStability.Unstable`, and support for Kafka as such is considered experimental. |
Starting with Apache MetaModel 5.1.0, there is a connector for Apache Kafka that lets you "query" Kafka topics as if they were tables. Although Kafka topics are not actually table-like structures, each topic is represented in MetaModel as a table whose columns are partition, offset, key, value and timestamp. When you query a table (topic), MetaModel creates a new temporary consumer of that topic and returns the records from the topic that match the query. Because Kafka can be used in many ways, including scenarios where records are automatically compacted, purged or duplicated, the results of a MetaModel query against Kafka simply reflect whatever the Kafka consumer receives under the topic's configuration.
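To make the topic-as-table mapping concrete, here is a minimal plain-Java sketch that uses neither MetaModel nor the Kafka client. The `KafkaRow` record and its `get` and `project` methods are hypothetical names invented for illustration; they only mirror the five columns listed above and show how selecting `key` and `value` picks those fields out of each record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Kafka record seen as a table row (NOT a MetaModel class).
record KafkaRow<K, V>(int partition, long offset, K key, V value, long timestamp) {

    // Returns the value of one named column, using the column names MetaModel exposes.
    Object get(String column) {
        return switch (column) {
            case "partition" -> partition;
            case "offset" -> offset;
            case "key" -> key;
            case "value" -> value;
            case "timestamp" -> timestamp;
            default -> throw new IllegalArgumentException("Unknown column: " + column);
        };
    }

    // Projects the requested columns in order, like SELECT col1, col2, ...
    // An ArrayList is used because Kafka keys and values may be null.
    List<Object> project(String... columns) {
        List<Object> result = new ArrayList<>();
        for (String column : columns) {
            result.add(get(column));
        }
        return result;
    }
}
```

For example, `new KafkaRow<>(0, 42L, "k1", "v1", 1700000000000L).project("key", "value")` yields the list `["k1", "v1"]`, which corresponds to one row of the `select("key", "value")` query in the usage example.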
Usage
Here's a simple example of how to set up an Apache Kafka based DataContext with Apache MetaModel:
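```java
import java.util.Arrays;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.kafka.KafkaDataContext;

// Example broker address; replace with your own Kafka bootstrap servers
String bootstrapServers = "localhost:9092";
// Keys and values of both topics are deserialized as Strings
DataContext dataContext = new KafkaDataContext<>(
        String.class, String.class, bootstrapServers,
        Arrays.asList("Topic1", "Topic2"));
DataSet ds = dataContext.query().from("Topic1").select("key", "value").execute();
while (ds.next()) {
    Row row = ds.getRow();
    System.out.println("key: " + row.getValue(0) + ", value: " + row.getValue(1));
}
ds.close();
```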
The following fields are exposed as columns for each topic:
| Name | Data type | Description | Special querying remarks |
|---|---|---|---|
| key | Configurable * | The Kafka record's key | |
| value | Configurable * | The Kafka record's value | |
| partition | int | The Kafka record's partition number | Queries with `WHERE partition = n` or `WHERE partition IN (...)` are optimized. |
| offset | long | The Kafka record's offset | Queries with `WHERE offset > n` or `WHERE offset >= n` are optimized. |
| timestamp | long | The Kafka record's timestamp | |
* = Determined by the key and value classes passed when constructing the `KafkaDataContext` object (for example `String.class` in the usage example).
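The table only states that these predicates are optimized, not how. One plausible way to picture it is that a `partition` filter limits which partitions the temporary consumer reads, and an `offset > n` or `offset >= n` filter becomes the position the consumer starts reading from in each partition. The plain-Java sketch below illustrates that idea; it is a hypothetical illustration, not MetaModel's actual implementation, and the `ScanPlan` class and `startOffsets` method are invented names.

```java
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: turn partition/offset predicates into per-partition start offsets.
final class ScanPlan {

    /**
     * @param allPartitions   all partition numbers of the topic
     * @param partitionFilter partitions from WHERE partition = n / IN (...), or null if absent
     * @param minOffset       bound n from WHERE offset > n / >= n, or null if absent
     * @param inclusive       true for offset >= n, false for offset > n
     * @return partition number mapped to the first offset to read, sorted by partition
     */
    static Map<Integer, Long> startOffsets(Collection<Integer> allPartitions,
                                           Collection<Integer> partitionFilter,
                                           Long minOffset, boolean inclusive) {
        Map<Integer, Long> plan = new TreeMap<>();
        for (int partition : allPartitions) {
            // Partitions excluded by the partition predicate are never read.
            if (partitionFilter != null && !partitionFilter.contains(partition)) {
                continue;
            }
            // Simplification: with no offset predicate, start at offset 0. A real topic's
            // earliest available offset may be higher because of retention or compaction.
            long start = 0L;
            if (minOffset != null) {
                // offset > n starts reading at n + 1; offset >= n starts at n.
                start = inclusive ? minOffset : minOffset + 1;
            }
            plan.put(partition, start);
        }
        return plan;
    }
}
```

For instance, on a topic with partitions 0, 1 and 2, a query with `WHERE partition IN (1, 2)` and `WHERE offset > 100` would, in this sketch, read only partitions 1 and 2, each starting at offset 101. Any remaining conditions in the WHERE clause would still have to be checked against each record as it is read.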