...
Camel supports implementing the Polling Consumer from the EIP patterns using the PollingConsumer interface which can be created via the Endpoint.createPollingConsumer() method.
In Java code you can do:
```java
// Look up the endpoint, create a polling consumer, and block until a message arrives
Endpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive();
```
The ConsumerTemplate (discussed below) is also available.
...
EventDrivenPollingConsumer Options
The EventDrivenPollingConsumer (the default implementation) supports the following options:
Note that some Camel components have their own implementation of PollingConsumer and therefore do not support the options above.
You can configure these options in endpoint URIs, as shown below:
```java
// Set the queue size in the endpoint URI, then wait up to 5000 ms for a message
Endpoint endpoint = context.getEndpoint("file:inbox?pollingConsumerQueueSize=50");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive(5000);
```
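The queue-size option suggests a bounded buffer sitting between the event-driven side and the code that calls receive. The sketch below illustrates that idea using only the JDK. It is an analogy under stated assumptions, not Camel's implementation: the class BoundedReceiveSketch and its onMessage method are hypothetical names, and only the receive/receiveNoWait naming is borrowed from PollingConsumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-alone analogy only: this is NOT Camel's EventDrivenPollingConsumer.
class BoundedReceiveSketch {
    private final BlockingQueue<String> queue;

    BoundedReceiveSketch(int queueSize) {
        // Plays the role that pollingConsumerQueueSize plays in the endpoint URI
        this.queue = new ArrayBlockingQueue<>(queueSize);
    }

    // Called by the event-driven side; returns false when the buffer is full.
    boolean onMessage(String body) {
        return queue.offer(body);
    }

    // Like receive(timeout): wait up to timeoutMillis, then give up with null.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    // Like a non-blocking receive: return at once, null if nothing is buffered.
    String receiveNoWait() {
        return queue.poll();
    }
}
```

With a queue size of 2, a third onMessage call is rejected until one of the buffered messages has been received.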
ConsumerTemplate
The ConsumerTemplate is a template much like Spring's JmsTemplate or JdbcTemplate, supporting the Polling Consumer EIP. With the template you can consume Exchanges from an Endpoint. The template supports the three operations listed above, but it also includes convenient methods for returning the body, such as receiveBody.
The example from above using ConsumerTemplate is:
```java
Exchange exchange = consumerTemplate.receive("activemq:my.queue");
```
...
```java
String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);
```
You get hold of a ConsumerTemplate from the CamelContext with the createConsumerTemplate operation:
```java
ConsumerTemplate consumer = context.createConsumerTemplate();
```
...
With the Spring DSL we can declare the consumer in the CamelContext with the consumerTemplate tag, just like the ProducerTemplate. The example below illustrates this:
Wiki Markup |
---|
{snippet:id=e1|lang=xml|url=camel/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringConsumerTemplateTest-context.xml} |
We can then use the ConsumerTemplate in our Java class. The code below is part of a unit test, but it shows how the consumer and producer can work together.
Wiki Markup |
---|
{snippet:id=e1|lang=java|url=camel/components/camel-spring/src/test/java/org/apache/camel/spring/SpringConsumerTemplateTest.java} |
Timer Based Polling Consumer
In this sample we use a Timer to schedule a route that starts every 5 seconds and invokes our bean MyCoolBean, where we implement the business logic for the Polling Consumer. Here we want to consume all messages from a JMS queue, process each message, and send it to the next queue.
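The "consume everything that is waiting on each tick" logic can be modelled without any messaging infrastructure. The sketch below is a hypothetical stand-in for what such a bean does: plain in-memory queues replace the JMS endpoints, the upper-casing step is a placeholder for real processing, and DrainOnTickSketch is an illustrative name, not part of Camel.

```java
import java.util.Queue;

// Hypothetical stand-in for the bean's logic; plain queues replace the JMS endpoints.
class DrainOnTickSketch {
    // Invoked once per timer tick: consume everything currently waiting,
    // process each message, and forward it. Returns the number handled.
    static int drain(Queue<String> inbox, Queue<String> outbox) {
        int handled = 0;
        String body;
        while ((body = inbox.poll()) != null) { // null means the inbox is empty
            outbox.add(body.toUpperCase());     // placeholder "processing" step
            handled++;
        }
        return handled;
    }
}
```

The loop stops as soon as the inbox is empty, so a tick that finds no messages returns immediately and the bean simply waits for the next timer event.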
...
Since this is such a common pattern, polling components can extend the ScheduledPollConsumer base class, which makes the pattern simpler to implement. There is also the Quartz Component, which provides scheduled delivery of messages using the Quartz enterprise scheduler.
...
ScheduledPollConsumer Options
The ScheduledPollConsumer supports the following options:
...
Using backoff to Let the Scheduler Be Less Aggressive
Available as of Camel 2.12
By default, the scheduled Polling Consumer is static: it uses the same poll frequency whether or not there are messages to pick up.
From Camel 2.12 onwards, you can configure the scheduled Polling Consumer to be more dynamic by using backoff. This allows the scheduler to skip N polls when it becomes idle, or when there have been X errors in a row. See the backoffXXX options in the table above for details.
For example, to let an FTP consumer back off if it has been idle for a while, you can do:
```java
// Poll every 5 seconds; after 5 idle polls in a row, back off with a multiplier of 6
from("ftp://myserver?username=foo&password=secret&delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5")
    .to("bean:processFile");
```
In this example, the FTP consumer polls for new FTP files every 5 seconds. But if it has been idle for 5 attempts in a row, it backs off using a multiplier of 6, which means it now polls every 5 x 6 = 30 seconds instead. When the consumer eventually picks up a file, the back-off resets and the consumer goes back to polling every 5 seconds.
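The arithmetic in this example can be written down as a small function. This is a simplified model of the behaviour as described here, not Camel's scheduler code; the class BackoffModel and its method are hypothetical, and the parameter names simply echo the URI options.

```java
// Simplified model of the arithmetic described above; NOT Camel's scheduler code.
class BackoffModel {
    // Returns the effective number of seconds between polls, given how many
    // polls in a row have found nothing to pick up.
    static int effectiveDelaySeconds(int delaySeconds, int backoffMultiplier,
                                     int backoffIdleThreshold, int consecutiveIdlePolls) {
        boolean backingOff = consecutiveIdlePolls >= backoffIdleThreshold;
        return backingOff ? delaySeconds * backoffMultiplier : delaySeconds;
    }
}
```

With delay=5, backoffMultiplier=6 and backoffIdleThreshold=5, four idle polls still give 5 seconds, the fifth idle poll gives 30 seconds, and a successful poll (idle count back to zero) returns the interval to 5 seconds.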
Camel logs at DEBUG level using org.apache.camel.impl.ScheduledPollConsumer when back-off kicks in.
About error handling and scheduled polling consumers
ScheduledPollConsumer is schedule based, and its run method is invoked periodically based on the schedule settings. But errors can also occur while a poll is being executed. For instance, if Camel polls a file on a network share and this network resource is not available, a java.io.IOException could occur. As this error happens before any Exchange has been created and prepared for routing, the regular Error handling in Camel does not apply. So what does the consumer do then? The exception is propagated back to the run method, where it is handled. By default, Camel logs the exception at WARN level and then ignores it. By the next scheduled poll the error may have been resolved, so the endpoint can be polled successfully.
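The "log and ignore, then try again at the next schedule" behaviour can be illustrated with a guarded poll. This is a minimal sketch using java.util.logging, not Camel's run method; GuardedPollSketch is a hypothetical name, and the -1 return value is just a convention chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative only: shows the log-and-ignore idea, NOT Camel's actual run method.
class GuardedPollSketch {
    private static final Logger LOG = Logger.getLogger(GuardedPollSketch.class.getName());

    // Runs one scheduled poll. Returns the number of messages polled,
    // or -1 if the poll failed and the failure was logged and ignored.
    static int runOnce(Callable<Integer> poll) {
        try {
            return poll.call();
        } catch (Exception e) {
            // No Exchange exists yet, so there is nothing to route to an error handler
            LOG.log(Level.WARNING, "Poll failed; will try again at the next schedule", e);
            return -1;
        }
    }
}
```

Because the exception never escapes runOnce, a scheduler invoking it periodically keeps running even when a single poll fails.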
...
Available as of Camel 2.12:
The SPI interface org.apache.camel.spi.ScheduledPollConsumerScheduler allows you to implement a custom scheduler to control when the Polling Consumer runs. The default implementation is based on the JDK's ScheduledExecutorService with a single thread in the thread pool. There are CRON-based implementations in the Quartz2 and Spring components.
For an example of developing and using a custom scheduler, see the unit test org.apache.camel.component.file.FileConsumerCustomSchedulerTest in the camel-core source code.
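To give a feel for what a scheduler built on a single-threaded ScheduledExecutorService looks like, here is a JDK-only sketch. It does not implement the ScheduledPollConsumerScheduler interface; the class SingleThreadSchedulerSketch, the runPolls method, and the choice of a fixed delay between runs are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// JDK-only illustration; does NOT implement Camel's ScheduledPollConsumerScheduler.
class SingleThreadSchedulerSketch {
    // Runs pollTask repeatedly with a fixed delay until it has run `runs` times
    // or the timeout expires. Returns true if all runs completed in time.
    static boolean runPolls(Runnable pollTask, int runs, long delayMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                pollTask.run();
                remaining.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        } finally {
            scheduler.shutdownNow(); // stop scheduling further polls
        }
    }
}
```

With a single thread, polls never overlap: the next run is only scheduled after the previous one has finished.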
...
Error Handling When Using PollingConsumerPollStrategy
org.apache.camel.PollingConsumerPollStrategy is a pluggable strategy that you can configure on the ScheduledPollConsumer. The default implementation, org.apache.camel.impl.DefaultPollingConsumerPollStrategy, logs the exception at WARN level and then ignores it.
...
- begin
void begin(Consumer consumer, Endpoint endpoint)
- begin (Camel 2.3)
boolean begin(Consumer consumer, Endpoint endpoint)
- commit
void commit(Consumer consumer, Endpoint endpoint)
- commit (Camel 2.6)
void commit(Consumer consumer, Endpoint endpoint, int polledMessages)
- rollback
boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception
From Camel 2.3 onwards, the begin method returns a boolean which indicates whether or not to skip polling. So you can implement your custom logic and return false if you do not want to poll this time.
From Camel 2.6 onwards, the commit method has an additional parameter containing the number of messages that were actually polled. For example, if no messages were polled, the value is zero, and you can react accordingly.
The most interesting method is rollback, as it allows you to handle the exception and decide what to do.
For instance, if we want to provide a retry feature for a scheduled consumer, we can implement the PollingConsumerPollStrategy interface and put the retry logic in the rollback method. Let's retry up to 3 times:
```java
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
    if (retryCounter < 3) {
        // return true to tell Camel that it should retry the poll immediately
        return true;
    }
    // okay we give up do not retry anymore
    return false;
}
```
Notice that we are given the Consumer as a parameter. We could use this to restart the consumer, as we can invoke stop and start:
```java
// error occurred lets restart the consumer, that could maybe resolve the issue
consumer.stop();
consumer.start();
```
Note: if you implement the begin operation, make sure to avoid throwing exceptions, as in that case the poll operation is not invoked and Camel invokes rollback directly.
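How begin, commit and rollback fit together in one scheduled run can be sketched without any Camel types. The model below is a simplification for illustration: the nested Strategy interface is a hypothetical, pared-down mirror of the callbacks (no Consumer or Endpoint parameters), and it assumes the retry counter starts at 0, which may differ from what Camel actually passes.

```java
import java.util.concurrent.Callable;

// Simplified model of one scheduled run; NOT Camel's ScheduledPollConsumer.
class PollLifecycleSketch {
    // Hypothetical, pared-down mirror of the strategy callbacks (no Camel types).
    interface Strategy {
        boolean begin();
        void commit(int polledMessages);
        boolean rollback(int retryCounter, Exception cause);
    }

    // Returns the number of poll attempts made during one scheduled run.
    static int runOnce(Strategy strategy, Callable<Integer> poll) {
        int attempts = 0;
        int retryCounter = 0; // assumption: counting starts at 0
        while (true) {
            try {
                if (!strategy.begin()) {
                    return attempts;          // begin() chose to skip this poll
                }
                attempts++;
                strategy.commit(poll.call()); // success: report how many were polled
                return attempts;
            } catch (Exception e) {
                // true means "retry immediately", false means "give up for now".
                // An exception thrown by begin() also lands here, skipping the poll.
                if (!strategy.rollback(retryCounter++, e)) {
                    return attempts;
                }
            }
        }
    }
}
```

Under these assumptions, a strategy whose rollback returns retryCounter < 3 allows three immediate retries after the first failure, so a poll that always fails is attempted four times before the run gives up.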
Configuring an Endpoint to Use PollingConsumerPollStrategy
To configure an Endpoint to use a custom PollingConsumerPollStrategy, use the option pollStrategy. For example, in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll:
```java
from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox");
```
...