...
Code Block |
---|
from("direct:start").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class); exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); exchange.getIn().setHeader(KafkaConstants.KEY, "1"); } }).to("kafka:localhost:9092?topic=test"); |
Using the Kafka idempotent repository (Available from Camel 2.19)
The camel-kafka
library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
...
On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of pollDurationMs
in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext
aware.
A KafkaIdempotentRepository
has the following properties:
...
The repository can be instantiated by defining the topic and bootstrapServers
, or the producerConfig
and consumerConfig
property sets can be explicitly defined to enable features such as SSL/SASL.
To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext
aware.
Sample usage is as follows:
...
Code Block | ||
---|---|---|
| ||
<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="bootstrapServers" value="localhost:9091"/>
</bean>
<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
<property name="topic" value="idempotent-db-inserts"/>
<property name="maxCacheSize" value="10000"/>
<property name="consumerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
<property name="producerConfig">
<props>
<prop key="bootstrap.servers">localhost:9091</prop>
</props>
</property>
</bean>
|
...
Include Page | ||||
---|---|---|---|---|
|