Kafka Component

Available as of Camel 2.13

The kafka: component is used for communicating with the Apache Kafka message broker.

Maven users will need to add the following dependency to their pom.xml for this component.

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>

Camel 2.17 or newer

Scala is no longer needed, as the component uses the Kafka Java client.

Camel 2.16 or older

You also need to add the Scala library of your choice. camel-kafka does not include that dependency but assumes it is provided. For example, to use Scala 2.10.4, add:

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.10.4</version>
    </dependency>

URI format

kafka:server:port[?options]


Options (Camel 2.16 or older)

Property                      Default  Description
zookeeperHost                          The ZooKeeper host to use
zookeeperPort                 2181     The ZooKeeper port to use
zookeeperConnect                       Camel 2.13.3/2.14.1: If set, zookeeperHost and zookeeperPort are not used.
topic                                  The topic to use
groupId
partitioner
consumerStreams               10
clientId
zookeeperSessionTimeoutMs
zookeeperConnectionTimeoutMs
zookeeperSyncTimeMs
consumersCount                1        Camel 2.15.0: The number of consumers that connect to the Kafka server
batchSize                     100      Camel 2.15.0: The batch size that the BatchingConsumerTask processes at once. Deprecated since 2.17.1, removed since 2.18.0.
barrierAwaitTimeoutMs         10000    Camel 2.15.0: If the exchanges processed by the BatchingConsumerTask exceed batchSize, it waits for barrierAwaitTimeoutMs. Deprecated since 2.17.1, removed since 2.18.0.
bridgeEndpoint                false    Camel 2.16.0: If bridgeEndpoint is true, the producer ignores the topic header of the message.

You can append query options to the URI in the following format: ?option=value&option=value&...
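As a sketch of that format, the hypothetical helper below (KafkaUriBuilder is not part of Camel) joins options onto a kafka: endpoint URI, using ? before the first option and & between the rest. It URL-encodes each value with java.net.URLEncoder, on the assumption that Camel decodes percent-encoded option values; plain host and topic names pass through unchanged.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: assembles "kafka:server:port?option=value&option=value". */
class KafkaUriBuilder {

    static String build(String brokers, Map<String, String> options) {
        if (options.isEmpty()) {
            return "kafka:" + brokers;
        }
        // "?" starts the query string, "&" separates further option=value pairs
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> e : options.entrySet()) {
            // Encode values so characters such as '&' or '=' cannot break the query string
            query.add(e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8));
        }
        return "kafka:" + brokers + query;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("topic", "test");
        options.put("zookeeperHost", "localhost");
        options.put("zookeeperPort", "2181");
        options.put("groupId", "group1");
        // prints kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1
        System.out.println(build("localhost:9092", options));
    }
}
```

A LinkedHashMap is used so the options appear in the order they were added, which keeps generated URIs stable and easy to compare in logs.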

Producer Options (Camel 2.16 or older)

Property                        Default  Description
producerType                    sync     sync: send the message/batch immediately and wait until a response is received.
                                         async: queue the message/batch for sending. A thread per broker (Kafka node) polls this queue and sends when queueBufferingMaxMs has elapsed or batchNumMessages messages have been queued.
                                         The default is taken from the native KafkaProducer class.
compressionCodec
compressedTopics
messageSendMaxRetries
retryBackoffMs
topicMetadataRefreshIntervalMs
sendBufferBytes
requestRequiredAcks
requestTimeoutMs
queueBufferingMaxMs
queueBufferingMaxMessages
queueEnqueueTimeoutMs
batchNumMessages
serializerClass
keySerializerClass
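The async flush rule described for producerType can be modelled in plain Java. The sketch below is a hypothetical, simplified model (AsyncBatchModel is not a Kafka or Camel class): it sends a batch once batchNumMessages messages are queued, or once the oldest queued message has waited queueBufferingMaxMs. Time is passed in explicitly to keep the model deterministic, and measuring the wait from the oldest queued message is a simplification of the real producer's timing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the old async producer's flush rule: a batch is sent when it
 * holds batchNumMessages messages, or when the oldest queued message has waited
 * queueBufferingMaxMs. Not Kafka's implementation.
 */
class AsyncBatchModel {
    private final int batchNumMessages;
    private final long queueBufferingMaxMs;
    private final List<String> queue = new ArrayList<>();
    private final List<List<String>> sentBatches = new ArrayList<>();
    private long firstEnqueuedAt = -1;

    AsyncBatchModel(int batchNumMessages, long queueBufferingMaxMs) {
        this.batchNumMessages = batchNumMessages;
        this.queueBufferingMaxMs = queueBufferingMaxMs;
    }

    /** Queue a message at the given (simulated) time in milliseconds. */
    void send(String message, long nowMs) {
        tick(nowMs); // a time-based flush may be due before this message is queued
        if (queue.isEmpty()) {
            firstEnqueuedAt = nowMs;
        }
        queue.add(message);
        if (queue.size() >= batchNumMessages) {
            flush(); // size threshold reached
        }
    }

    /** Stands in for the sender thread: flushes if the oldest message waited long enough. */
    void tick(long nowMs) {
        if (!queue.isEmpty() && nowMs - firstEnqueuedAt >= queueBufferingMaxMs) {
            flush();
        }
    }

    private void flush() {
        sentBatches.add(new ArrayList<>(queue));
        queue.clear();
        firstEnqueuedAt = -1;
    }

    List<List<String>> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        AsyncBatchModel producer = new AsyncBatchModel(3, 100);
        producer.send("a", 0);
        producer.send("b", 10);
        producer.send("c", 20);   // third message: size-triggered flush
        producer.send("d", 30);
        producer.tick(130);       // "d" has waited 100 ms: time-triggered flush
        System.out.println(producer.sentBatches()); // [[a, b, c], [d]]
    }
}
```

The trade-off the two settings express is visible here: a larger batchNumMessages or queueBufferingMaxMs means fewer, bigger sends at the cost of higher latency for individual messages.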

Consumer Options (Camel 2.16 or older)

Property                  Default  Description
consumerId
socketTimeoutMs
socketReceiveBufferBytes
fetchMessageMaxBytes
autoCommitEnable
autoCommitIntervalMs
queuedMaxMessages
rebalanceMaxRetries
fetchMinBytes
fetchWaitMaxMs
rebalanceBackoffMs
refreshLeaderBackoffMs
autoOffsetReset
consumerTimeoutMs

Options (Camel 2.17 or newer)

Property         Default  Description
topic                     Topic to use. On the consumer side you can also specify a comma-separated list of topics.
groupId
consumerStreams  10
clientId
consumersCount   1        The number of consumers that connect to the Kafka server
batchSize        100      Commit size if auto commit is false
bridgeEndpoint   false    If bridgeEndpoint is true, the producer ignores the topic header of the message.
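To illustrate batchSize as a commit size when auto commit is disabled, the hypothetical BatchCommitTracker below counts processed records and, every batchSize records, snapshots the offsets to commit for each partition. It follows the Kafka convention that a committed offset is the next offset to read (last processed offset + 1). It does not call the Kafka client and is not Camel's implementation; partitions are plain ints rather than TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of manual commits every batchSize records. */
class BatchCommitTracker {
    private final int batchSize;
    private final Map<Integer, Long> pending = new HashMap<>(); // partition -> offset to commit
    private final List<Map<Integer, Long>> commits = new ArrayList<>();
    private int processedSinceCommit = 0;

    BatchCommitTracker(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Record that the message at (partition, offset) was processed. */
    void processed(int partition, long offset) {
        // Kafka convention: the committed offset is the next offset to read
        pending.merge(partition, offset + 1, Long::max);
        if (++processedSinceCommit >= batchSize) {
            commits.add(new HashMap<>(pending)); // a real consumer would commit here
            pending.clear();
            processedSinceCommit = 0;
        }
    }

    List<Map<Integer, Long>> commits() {
        return commits;
    }

    public static void main(String[] args) {
        BatchCommitTracker tracker = new BatchCommitTracker(3);
        tracker.processed(0, 41);
        tracker.processed(1, 7);
        tracker.processed(0, 42); // third record: commit
        tracker.processed(1, 8);  // pending until the next full batch
        System.out.println(tracker.commits()); // [{0=43, 1=8}]
    }
}
```

Records processed after the last commit (offset 8 on partition 1 above) are not yet committed, so after a restart they would be delivered again; a larger batchSize means fewer commits but more possible redelivery.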


Producer Options (Camel 2.17 or newer)

Defaults and descriptions for these properties are given in the Kafka producer configuration reference: http://kafka.apache.org/documentation.html#producerconfigs

The following defaults apply:

serializerClass : org.apache.kafka.common.serialization.StringSerializer
keySerializerClass : org.apache.kafka.common.serialization.StringSerializer
partitioner : org.apache.kafka.clients.producer.internals.DefaultPartitioner

Property
serializerClass
keySerializerClass
requestRequiredAcks
bufferMemorySize
compressionCodec
retries
sslKeyPassword
sslKeystoreLocation
sslKeystorePassword
sslTruststoreLocation
sslTruststorePassword
producerBatchSize
clientId
connectionMaxIdleMs
lingerMs
maxBlockMs
maxRequestSize
partitioner
receiveBufferBytes
requestTimeoutMs
saslKerberosServiceName
saslMechanism (from Camel 2.18)
securityProtocol
sendBufferBytes
sslEnabledProtocols
sslKeystoreType
sslProtocol
sslProvider
sslTruststoreType
maxInFlightRequest
metadataMaxAgeMs
metricReporters
noOfMetricsSample
metricsSampleWindowMs
reconnectBackoffMs
retryBackoffMs
kerberosInitCmd
kerberosBeforeReloginMinTime
kerberosRenewJitter
kerberosRenewWindowFactor
sslCipherSuites
sslEndpointAlgorithm
sslKeymanagerAlgorithm
sslTrustmanagerAlgorithm
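The default partitioner chooses a partition from the record key. The minimal sketch below shows the principle (hash the key, clear the sign bit, take the result modulo the partition count). Kafka's DefaultPartitioner hashes the serialized key bytes with murmur2; this sketch substitutes String.hashCode(), so its partition numbers will not match Kafka's, and records without a key, which Kafka handles differently, are not modelled. KeyedPartitionSketch is a hypothetical name.

```java
/**
 * Hypothetical illustration of keyed partition selection. String.hashCode() stands in
 * for Kafka's murmur2 hash, so results differ from the real DefaultPartitioner.
 */
class KeyedPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask the sign bit (as Kafka's toPositive does) so the modulo is never negative;
        // Math.abs would fail for Integer.MIN_VALUE
        int positiveHash = key.hashCode() & 0x7fffffff;
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("1", 4)); // prints 1 ("1".hashCode() is 49)
        // The same key always maps to the same partition, preserving per-key ordering
        System.out.println(partitionFor("customer-42", 4) == partitionFor("customer-42", 4));
    }
}
```

Because the mapping depends on the partition count, adding partitions to a topic changes which partition a given key lands on.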

Consumer Options (Camel 2.17 or newer)

Defaults and descriptions for these properties are given in the Kafka new consumer configuration reference: http://kafka.apache.org/documentation.html#newconsumerconfigs

The following defaults apply:

keyDeserializer : org.apache.kafka.common.serialization.StringDeserializer
valueDeserializer : org.apache.kafka.common.serialization.StringDeserializer
partitionAssignor : org.apache.kafka.clients.consumer.RangeAssignor

Property
keyDeserializer
valueDeserializer
fetchMinBytes
groupId
heartbeatIntervalMs
maxPartitionFetchBytes
sessionTimeoutMs
sslKeyPassword
sslKeystoreLocation
sslKeystorePassword
sslTruststoreLocation
sslTruststorePassword
autoOffsetReset
connectionMaxIdleMs
autoCommitEnable
partitionAssignor
receiveBufferBytes
consumerRequestTimeoutMs
saslKerberosServiceName
saslMechanism (from Camel 2.18)
securityProtocol
sendBufferBytes
sslEnabledProtocols
sslKeystoreType
sslProtocol
sslProvider
sslTruststoreType
autoCommitIntervalMs
checkCrcs
clientId
fetchWaitMaxMs
metadataMaxAgeMs
metricReporters
noOfMetricsSample
metricsSampleWindowMs
reconnectBackoffMs
retryBackoffMs
kerberosInitCmd
kerberosBeforeReloginMinTime
kerberosRenewJitter
kerberosRenewWindowFactor
sslCipherSuites
sslEndpointAlgorithm
sslKeymanagerAlgorithm
sslTrustmanagerAlgorithm
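The default partitionAssignor, RangeAssignor, gives each consumer in a group a contiguous range of each topic's partitions. The sketch below re-creates that per-topic logic for illustration only (RangeAssignmentSketch and the consumer ids are hypothetical): consumers are sorted, each receives numPartitions / consumers partitions, and the first numPartitions % consumers consumers receive one extra.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical re-creation of range assignment for a single topic. */
class RangeAssignmentSketch {

    static Map<String, List<Integer>> assign(List<String> consumerIds, int numPartitions) {
        List<String> consumers = new ArrayList<>(new TreeSet<>(consumerIds)); // sorted, de-duplicated
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (consumers.isEmpty()) {
            return assignment;
        }
        int perConsumer = numPartitions / consumers.size();
        int withExtra = numPartitions % consumers.size(); // these first consumers get one more
        for (int i = 0; i < consumers.size(); i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 consumers in one group, 7 partitions
        System.out.println(assign(List.of("c2", "c1", "c3"), 7));
        // {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}
    }
}
```

When a group has more consumers than partitions (for example a high consumersCount), the surplus consumers in this scheme receive an empty range and stay idle.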

Samples

Camel 2.16 or older

Consuming messages:

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1").to("log:input");

Producing messages:

See the camel-kafka unit tests for examples.

Camel 2.17 or newer

Consuming messages:

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.kafka.KafkaConstants;

// inside RouteBuilder.configure()
from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Message message = exchange.getIn();
            // Record metadata set as headers by the Kafka consumer
            Integer partitionId = (Integer) message.getHeader(KafkaConstants.PARTITION);
            String topicName = (String) message.getHeader(KafkaConstants.TOPIC);
            // The record key is optional; fall back to an empty string
            String messageKey = "";
            if (message.getHeader(KafkaConstants.KEY) != null) {
                messageKey = (String) message.getHeader(KafkaConstants.KEY);
            }
            Object data = message.getBody();

            System.out.println("topicName :: " + topicName
                    + " partitionId :: " + partitionId
                    + " messageKey :: " + messageKey
                    + " message :: " + data + "\n");
        }
    }).to("log:input");


Producing messages:

// inside RouteBuilder.configure()
from("direct:start").process(new Processor() {
    @Override
    public void process(Exchange exchange) throws Exception {
        exchange.getIn().setBody("Test Message from Camel Kafka Component Final", String.class);
        // Partition to send the record to
        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
        // Key of the Kafka record
        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
    }
}).to("kafka:localhost:9092?topic=test");

Using the Kafka idempotent repository (Available from Camel 2.19)

The camel-kafka library provides a Kafka topic-based idempotent repository. This repository broadcasts all changes to idempotent state (add/remove) to a Kafka topic, and populates a local in-memory cache in each repository's process instance through event sourcing.
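The event-sourcing idea can be sketched with a plain Java list standing in for the Kafka topic. In this hypothetical model (EventSourcedKeys, Action and Event are illustrative names, not the repository's classes or message format), every add or remove is an event, and any instance rebuilds its cache by replaying the log from the beginning. The sketch assumes a single, totally ordered log.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model of event sourcing: every add/remove is appended to a log
 * (standing in for the Kafka topic), and any instance can rebuild its cache by
 * replaying that log from the beginning.
 */
class EventSourcedKeys {

    enum Action { ADD, REMOVE }

    static final class Event {
        final Action action;
        final String key;

        Event(Action action, String key) {
            this.action = action;
            this.key = key;
        }
    }

    /** Replays events in log order; later events override earlier ones. */
    static Set<String> replay(List<Event> log) {
        Set<String> cache = new LinkedHashSet<>();
        for (Event e : log) {
            if (e.action == Action.ADD) {
                cache.add(e.key);
            } else {
                cache.remove(e.key);
            }
        }
        return cache;
    }

    public static void main(String[] args) {
        List<Event> topic = List.of(
                new Event(Action.ADD, "id-1"),
                new Event(Action.ADD, "id-2"),
                new Event(Action.REMOVE, "id-1"),
                new Event(Action.ADD, "id-3"));
        System.out.println(replay(topic)); // [id-2, id-3]
    }
}
```

Because the state is derived only from the log, a new or restarted instance reaches the same cache contents as its peers once it has replayed the same events.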

The topic used must be unique per idempotent repository instance. The mechanism places no requirements on the number of topic partitions, as the repository consumes from all partitions at the same time. It also places no requirements on the replication factor of the topic.

Each repository instance that uses the topic (typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic, each controls its own offset.

On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of pollDurationMs in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
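The warm-up rule above can be modelled as a loop. In this hypothetical sketch (WarmUpSketch is an illustrative name), recordsPerPoll supplies the record count of each successive poll, and each poll is assumed to take exactly pollDurationMs; the real repository uses a Kafka consumer and a wall clock. The method returns true once a poll returns 0 records, and false if the time limit (30 seconds in the text above) is reached first.

```java
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical model of the start-up rule: poll repeatedly; the cache is warm once a
 * poll returns 0 records. Give up after maxStartupMs. Time is simulated by assuming
 * each poll takes exactly pollDurationMs.
 */
class WarmUpSketch {

    static boolean warmUp(Iterator<Integer> recordsPerPoll, long pollDurationMs, long maxStartupMs) {
        long elapsedMs = 0;
        while (elapsedMs < maxStartupMs && recordsPerPoll.hasNext()) {
            int records = recordsPerPoll.next(); // records returned by one poll
            elapsedMs += pollDurationMs;
            if (records == 0) {
                return true; // caught up with the end of the topic
            }
        }
        return false; // timed out: the cache may still be behind the topic
    }

    public static void main(String[] args) {
        // Replaying a backlog: 500, 500 and 120 records, then an empty poll
        System.out.println(warmUp(List.of(500, 500, 120, 0).iterator(), 100, 30_000)); // true
    }
}
```

With the default pollDurationMs of 100 ms, the 30-second limit corresponds to roughly 300 polls in this model before start-up proceeds with a possibly incomplete cache.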

A KafkaIdempotentRepository has the following properties:

Property          Description
topic             The name of the Kafka topic to use to broadcast changes. (required)
bootstrapServers  The bootstrap.servers property of the internal Kafka producer and consumer. Use this as shorthand if not setting consumerConfig and producerConfig. If used, this component applies sensible default configurations for the producer and consumer.
producerConfig    Sets the properties used by the Kafka producer that broadcasts changes. Overrides bootstrapServers, so it must define the Kafka bootstrap.servers property itself.
consumerConfig    Sets the properties used by the Kafka consumer that populates the cache from the topic. Overrides bootstrapServers, so it must define the Kafka bootstrap.servers property itself.
maxCacheSize      How many of the most recently used keys are stored in memory (default 1000).
pollDurationMs    The poll duration of the Kafka consumer. The local cache is updated immediately; this value affects how far the other peers in the cluster, which update their caches from the topic, lag behind the idempotent consumer instance that issued the cache action message.
                  The default is 100 ms. If you set this value explicitly, be aware of the tradeoff between remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers.
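maxCacheSize bounds the in-memory cache to the most recently used keys. One common way to get that behavior from the JDK is an access-ordered LinkedHashMap that evicts its eldest entry, as sketched below. This is an illustration only; KafkaIdempotentRepository may use a different LRU implementation internally, and LruKeyCache is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative LRU key cache bounded by maxCacheSize. A LinkedHashMap in access order
 * evicts the least recently used entry once the size limit is exceeded.
 */
class LruKeyCache<K> extends LinkedHashMap<K, Boolean> {
    private final int maxCacheSize;

    LruKeyCache(int maxCacheSize) {
        super(16, 0.75f, true); // true = iterate in access order, not insertion order
        this.maxCacheSize = maxCacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, Boolean> eldest) {
        return size() > maxCacheSize; // evict once the bound is exceeded
    }

    public static void main(String[] args) {
        LruKeyCache<String> cache = new LruKeyCache<>(2);
        cache.put("a", Boolean.TRUE);
        cache.put("b", Boolean.TRUE);
        cache.get("a");               // "a" becomes the most recently used key
        cache.put("c", Boolean.TRUE); // evicts "b", the least recently used key
        System.out.println(cache.keySet()); // [a, c]
    }
}
```

A key evicted from such a bounded cache is no longer recognized as a duplicate, so maxCacheSize should comfortably exceed the number of distinct keys expected within the window in which duplicates can arrive.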

The repository can be instantiated by defining the topic and bootstrapServers, or the producerConfig and consumerConfig property sets can be explicitly defined to enable features such as SSL/SASL.

To be used, this repository must be placed in the Camel registry, either manually or by registering it as a bean in Spring/Blueprint, because it is CamelContext-aware.

Sample usage is as follows:

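import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");

SimpleRegistry registry = new SimpleRegistry();
// must be registered in the registry, to enable access to the CamelContext
registry.put("insertDbIdemRepo", kafkaIdempotentRepository);
// CamelContext is an interface; DefaultCamelContext is the concrete implementation
CamelContext context = new DefaultCamelContext(registry);

// later in RouteBuilder...
from("direct:performInsert")
    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
        // once-only insert into database
    .end();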

In XML:

<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>


Endpoints

Camel supports the Message Endpoint pattern using the Endpoint interface. Endpoints are usually created by a Component and are usually referred to in the DSL via their URIs.

From an Endpoint you can use the following methods

See Also
