Child pages
  • Kestrel

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Wiki Markup
h2. Kestrel Component

The Kestrel component allows messages to be sent to a [Kestrel|https://github.com/robey/kestrel] queue, or messages to be consumed from a Kestrel queue.  This component uses the [spymemcached|http://code.google.com/p/spymemcached/] client for memcached protocol communication with Kestrel servers.

h3. URI format

{code}
kestrel://[addresslist/]queuename[?options]
{code}

Where *queuename* is the name of the queue on Kestrel.  The *addresslist* part of the URI may include one or more {{host:port}} pairs.  For example, to connect to the queue {{foo}} on {{kserver01:22133}}, use:

{code}
kestrel://kserver01:22133/foo
{code}

If the addresslist is omitted, {{localhost:22133}} is assumed, i.e.:

{code}
kestrel://foo
{code}

Likewise, if a port is omitted from a {{host:port}} pair in addresslist, the default port 22133 is assumed, i.e.:

{code}
kestrel://kserver01/foo
{code}

Here is an example of a Kestrel endpoint URI used for producing to a clustered queue:

{code}
kestrel://kserver01:22133,kserver02:22133,kserver03:22133/massive
{code}

Here is an example of a Kestrel endpoint URI used for consuming concurrently from a queue:

{code}
kestrel://kserver03:22133/massive?concurrentConsumers=25&waitTimeMs=500
{code}

h3. Options

You can configure properties on each Kestrel endpoint individually by specifying them in the {{?parameters}} portion of the endpoint URI.  Any {{?parameters}} that are omitted will default to what is configured on the KestrelComponent's base KestrelConfiguration.  The following properties may be set on KestrelConfiguration and/or each individual endpoint:

{div:class=confluenceTableSmall}
|| Option || Default Value || Description ||
| {{concurrentConsumers}} | {{1}} | Specifies the number of concurrent consumer threads. |
| {{waitTimeMs}} | {{100}} | Specifies the {{/t=...}} wait time passed to Kestrel on GET requests. |
{div}

*NOTE:* If *waitTimeMs* is set to zero (or negative), the {{/t=...}} specifier does *not* get passed to the server on GET requests.  When a queue is empty, the GET call returns immediately with no value.  In order to prevent "tight looping" in the polling phase, this component will do a {{Thread.sleep(100)}} whenever nothing is returned from the GET request (only when nothing is returned).  You are *highly encouraged* to configure a positive non-zero value for waitTimeMs.

h3. Configuring the Kestrel component using Spring XML

The simplest form of explicit configuration is as follows:

{code:xml}
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="kestrel" class="org.apache.camel.component.kestrel.KestrelComponent"/>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

</beans>
{code}

That will enable the Kestrel component with all default settings, i.e. it will use {{localhost:22133}}, 100ms wait time, and a single non-concurrent consumer by default.

To use specific options in the base configuration (which supplies configuration to endpoints whose {{?properties}} are not specified), you can set up a KestrelConfiguration POJO as follows:

{code:xml}
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <bean id="kestrelConfiguration" class="org.apache.camel.component.kestrel.KestrelConfiguration">
    <property name="addresses" value="kestrel01:22133"/>
    <property name="waitTimeMs" value="100"/>
    <property name="concurrentConsumers" value="1"/>
  </bean>

  <bean id="kestrel" class="org.apache.camel.component.kestrel.KestrelComponent">
    <property name="configuration" ref="kestrelConfiguration"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring">
  </camelContext>

</beans>
{code}

h3. Usage Examples

h4. Example 1: Consuming

{code}
from("kestrel://kserver02:22133/massive?concurrentConsumers=10&waitTimeMs=500")
  .bean("myConsumer", "onMessage");
{code}

{code}
public class MyConsumer {
    public void onMessage(String message) {
        ...
    }
}
{code}

h4. Example 2: Producing

{code}
public class MyProducer {
    @EndpointInject(uri = "kestrel://kserver01:22133,kserver02:22133/myqueue")
    ProducerTemplate producerTemplate;

    public void produceSomething() {
        producerTemplate.sendBody("Hello, world.");
    }
}
{code}

h4. Example 3: Spring XML Configuration

{code:xml}
  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
      <from uri="kestrel://ks01:22133/sequential?concurrentConsumers=1&waitTimeMs=500"/>
      <to uri="log:something"/>
    </route>
    <route>
      <from uri="direct:start"/>
      <to uri="kestrel://ks02:22133/stuff"/>
    </route>
  </camelContext>
{code}

{code}
public class MyBean {
    public void onMessage(String message) {
        ...
    }
}
{code}

h3. Dependencies

The Kestrel component has the following dependencies:
- {{spymemcached}} 2.5 (or greater)

h4. spymemcached
You *must* have the {{spymemcached}} jar on your classpath.  Here is a snippet you can use in your pom.xml:
{code}
<dependency>
  <groupId>spy</groupId>
  <artifactId>memcached</artifactId>
  <version>2.5</version>
</dependency>
{code}
Alternatively, you can [download the jar|http://code.google.com/p/spymemcached/downloads/list] directly.

{warning:title=Limitations}
*NOTE:* The spymemcached client library does *not* work properly with kestrel when JVM assertions are enabled.  There is a known issue with spymemcached when assertions are enabled and a requested key contains the {{/t=...}} extension (i.e. if you're using the {{waitTimeMs}} option on an endpoint URI, which is highly encouraged).

Fortunately, JVM assertions are *disabled by default*, unless you [explicitly enable them|http://download.oracle.com/javase/1.4.2/docs/guide/lang/assert.html], so this should not present a problem under normal circumstances.

Something to note is that Maven's Surefire test plugin *enables* assertions.  If you're using this component in a Maven test environment, you may need to set {{enableAssertions}} to {{false}}.  Please refer to the [surefire:test reference|http://maven.apache.org/plugins/maven-surefire-plugin/test-mojo.html] for details.
{warning}


{include:Endpoint See Also}