Kafka Component
Available as of Camel 2.13
The kafka: component is used for communicating with Apache Kafka message broker.
Maven users will need to add the following dependency to their pom.xml
for this component.
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-kafka</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency>
Camel 2.17 or newer
Scala is no longer used, as we use the kafka java client.
Camel 2.16 or older
And then the Scala libraries of choice. camel-kafka does not include that dependency, but assumes it is provided. For example to use Scala 2.10.4 add:
<dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.4</version> </dependency>
URI format
kafka:server:port[?options]
Options (Camel 2.16 or older)
Property | Default | Description |
---|---|---|
zookeeperHost |
| The zookeeper host to use |
zookeeperPort | 2181 | The zookeeper port to use |
zookeeperConnect | Camel 2.13.3/2.14.1: If in use, then zookeeperHost/zookeeperPort is not used. | |
topic |
| The topic to use |
groupId | ||
partitioner | ||
consumerStreams | 10 | |
clientId | ||
zookeeperSessionTimeoutMs | ||
zookeeperConnectionTimeoutMs | ||
zookeeperSyncTimeMs | ||
consumersCount | 1 | Camel 2.15.0: The number of consumers that connect to kafka server |
batchSize | 100 | Camel 2.15.0: The batchSize that the BatchingConsumerTask processes once, deprecated since 2.17.1, removed |
barrierAwaitTimeoutMs | 10000 | Camel 2.15.0: If the BatchingConsumerTask processes exchange exceed the batchSize, it will wait for barrierAwaitTimeoutMs, deprecated since 2.17.1, removed since 2.18.0. |
bridgeEndpoint | false | Camel 2.16.0: If bridgeEndpoint is true, the producer will ignore the topic header setting of the message. |
You can append query options to the URI in the following format, ?option=value&option=value&...
Producer Options (Camel 2.16 or older)
Property | Default | Description |
---|---|---|
producerType | sync (Taken from native KafkaProducer class) | sync - send message/batch immediately, and wait until response is received async - queue the message/batch to send. There is a thread per broker (Kafka node) which polls from this queue upon queueBufferingMaxMs or batchNumMessages |
compressionCodec | ||
compressedTopics | ||
messageSendMaxRetries | ||
retryBackoffMs | ||
topicMetadataRefreshIntervalMs | ||
sendBufferBytes | ||
requestRequiredAcks | ||
requestTimeoutMs | ||
queueBufferingMaxMs | ||
queueBufferingMaxMessages | ||
queueEnqueueTimeoutMs | ||
batchNumMessages | ||
serializerClass | ||
keySerializerClass |
Consumer Options (Camel 2.16 or older)
Property | Default | Description |
---|---|---|
consumerId |
| |
socketTimeoutMs | ||
socketReceiveBufferBytes | ||
fetchMessageMaxBytes | ||
autoCommitEnable | ||
autoCommitIntervalMs | ||
queuedMaxMessages | ||
rebalanceMaxRetries | ||
fetchMinBytes | ||
fetchWaitMaxMs | ||
rebalanceBackoffMs | ||
refreshLeaderBackoffMs | ||
autoOffsetReset | ||
consumerTimeoutMs |
Options (Camel 2.17 or newer)
Property | Default | Description |
---|---|---|
topic | Topic to use. From the consumer side you can specify also a comma separated list of topics. | |
groupId | ||
consumerStreams | 10 | |
clientId | ||
consumersCount | 1 | The number of consumers that connect to kafka server |
batchSize | 100 | Commit Size if auto commit is false |
bridgeEndpoint | false | If the bridgeEndpoint is true, the producer will ignore the topic header setting of the message. |
Producer Options (Camel 2.17 or newer)
Property | Default & Description Reference |
---|---|
serializerClass | http://kafka.apache.org/documentation.html#producerconfigs serializerClass : org.apache.kafka.common.serialization.StringSerializer keySerializerClass : org.apache.kafka.common.serialization.StringSerializer partitioner : org.apache.kafka.clients.producer.internals.DefaultPartitioner
|
keySerializerClass | |
requestRequiredAcks | |
bufferMemorySize | |
compressionCodec | |
retries | |
sslKeyPassword | |
sslKeystoreLocation | |
sslKeystorePassword | |
sslTruststoreLocation | |
sslTruststorePassword | |
producerBatchSize | |
clientId | |
connectionMaxIdleMs | |
lingerMs | |
maxBlockMs | |
maxRequestSize | |
partitioner | |
receiveBufferBytes | |
requestTimeoutMs | |
saslKerberosServiceName | |
saslMechanism (from Camel 2.18) | |
securityProtocol | |
sendBufferBytes | |
sslEnabledProtocols | |
sslKeystoreType | |
sslProtocol | |
sslProvider | |
sslTruststoreType | |
maxInFlightRequest | |
metadataMaxAgeMs | |
metricReporters | |
noOfMetricsSample | |
metricsSampleWindowMs | |
reconnectBackoffMs | |
retryBackoffMs | |
kerberosInitCmd | |
kerberosBeforeReloginMinTime | |
kerberosRenewJitter | |
kerberosRenewWindowFactor | |
sslCipherSuites | |
sslEndpointAlgorithm | |
sslKeymanagerAlgorithm | |
sslTrustmanagerAlgorithm |
Consumer Options (Camel 2.17 or newer)
Property | Default & Description Reference |
---|---|
| http://kafka.apache.org/documentation.html#newconsumerconfigs keyDeserializer : org.apache.kafka.common.serialization.StringDeserializer valueDeserializer : org.apache.kafka.common.serialization.StringDeserializer partitionAssignor : org.apache.kafka.clients.consumer.RangeAssignor
|
keyDeserializer | |
valueDeserializer | |
fetchMinBytes | |
groupId | |
heartbeatIntervalMs | |
maxPartitionFetchBytes | |
sessionTimeoutMs | |
sslKeyPassword | |
sslKeystoreLocation | |
sslKeystorePassword | |
sslTruststoreLocation | |
sslTruststorePassword | |
autoOffsetReset | |
connectionMaxIdleMs | |
autoCommitEnable | |
partitionAssignor | |
receiveBufferBytes | |
consumerRequestTimeoutMs | |
saslKerberosServiceName | |
saslMechanism (from Camel 2.18) | |
securityProtocol | |
sendBufferBytes | |
sslEnabledProtocols | |
sslKeystoreType | |
sslProtocol | |
sslProvider | |
sslTruststoreType | |
autoCommitIntervalMs | |
checkCrcs | |
clientId | |
fetchWaitMaxMs | |
metadataMaxAgeMs | |
metricReporters | |
noOfMetricsSample | |
metricsSampleWindowMs | |
reconnectBackoffMs | |
retryBackoffMs | |
kerberosInitCmd | |
kerberosBeforeReloginMinTime | |
kerberosRenewJitter | |
kerberosRenewWindowFactor | |
sslCipherSuites | |
sslEndpointAlgorithm | |
sslKeymanagerAlgorithm | |
sslTrustmanagerAlgorithm |
Samples
Camel 2.16 or older
Consuming messages:
from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");
Producing messages:
See unit tests of camel-kafka for more examples
Camel 2.17 or newer
Consuming messages:
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String messageKey = ""; if (exchange.getIn() != null) { Message message = exchange.getIn(); Integer partitionId = (Integer) message .getHeader(KafkaConstants.PARTITION); String topicName = (String) message .getHeader(KafkaConstants.TOPIC); if (message.getHeader(KafkaConstants.KEY) != null) messageKey = (String) message .getHeader(KafkaConstants.KEY); Object data = message.getBody(); System.out.println("topicName :: " + topicName + " partitionId :: " + partitionId + " messageKey :: " + messageKey + " message :: " + data + "\n"); } } }).to("log:input");
Producing messages:
from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class); exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); exchange.getIn().setHeader(KafkaConstants.KEY, "1"); } }).to("kafka:localhost:9092?topic=test");
Using the Kafka idempotent repository (Available from Camel 2.19)
The camel-kafka
library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions; as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.
Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of pollDurationMs
in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
A KafkaIdempotentRepository
has the following properties:
Property | Description |
---|---|
topic | The name of the Kafka topic to use to broadcast changes. (required) |
bootstrapServers | The bootstrap.servers property on the internal Kafka producer and consumer. Use this as shorthand if not setting consumerConfig and producerConfig . If used, this component will apply sensible default configurations for the producer and consumer. |
producerConfig | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides bootstrapServers , so must define the Kafka bootstrap.servers property itself |
consumerConfig | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides bootstrapServers , so must define the Kafka bootstrap.servers property itself |
maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000). |
pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately; this value will affect how far behind other peers in the cluster are, which are updating their caches from the topic, relative to the idempotent consumer instance issued the cache action message. The default value of this is 100 ms. If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. |
The repository can be instantiated by defining the topic and bootstrapServers
, or the producerConfig
and consumerConfig
property sets can be explicitly defined to enable features such as SSL/SASL.
To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext
aware.
Sample usage is as follows:
KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); SimpleRegistry registry = new SimpleRegistry(); registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext CamelContext context = new CamelContext(registry); // later in RouteBuilder... from("direct:performInsert") .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo") // once-only insert into database .end()
In XML:
<!-- simple --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="bootstrapServers" value="localhost:9091"/> </bean> <!-- complex --> <bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository"> <property name="topic" value="idempotent-db-inserts"/> <property name="maxCacheSize" value="10000"/> <property name="consumerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> <property name="producerConfig"> <props> <prop key="bootstrap.servers">localhost:9091</prop> </props> </property> </bean>
Endpoints
Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and Endpoints are usually referred to in the DSL via their URIs.
From an Endpoint you can use the following methods
- createProducer() will create a Producer for sending message exchanges to the endpoint
- createConsumer() implements the Event Driven Consumer pattern for consuming message exchanges from the endpoint via a Processor when creating a Consumer
- createPollingConsumer() implements the Polling Consumer pattern for consuming message exchanges from the endpoint via a PollingConsumer