Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
from("direct:start").process(new Processor() {
					@Override
					public void process(Exchange exchange) throws Exception {
						exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
						exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
						exchange.getIn().setHeader(KafkaConstants.KEY, "1");
					}
				}).to("kafka:localhost:9092?topic=test");

Using the Kafka idempotent repository (Available from Camel 2.19)

The camel-kafka library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.

...

On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of pollDurationMs in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.

To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext aware.

A KafkaIdempotentRepository has the following properties:

...

The repository can be instantiated by defining the topic and bootstrapServers, or the producerConfig and consumerConfig property sets can be explicitly defined to enable features such as SSL/SASL.

To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is CamelContext aware.

Sample usage is as follows:

...

Code Block
languagexml
<!-- simple -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="bootstrapServers" value="localhost:9091"/>
</bean>

<!-- complex -->
<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
  <property name="topic" value="idempotent-db-inserts"/>
  <property name="maxCacheSize" value="10000"/>
  <property name="consumerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
  <property name="producerConfig">
    <props>
      <prop key="bootstrap.servers">localhost:9091</prop>
    </props>
  </property>
</bean>



...

 

Include Page
Endpoint
Endpoint