Kinesis Component

Available as of Camel 2.17

The Kinesis component supports receiving messages from and sending messages to Amazon Kinesis service.

Prerequisites

You must have a valid Amazon Web Services developer account, and be signed up to use Amazon Kinesis. More information are available at AWS Kinesis

URI Format

aws-kinesis://stream-name[?options]

The stream needs to be created prior to it being used.
You can append query options to the URI in the following format, ?options=value&option2=value&...

URI Options

Name

Default Value

Context

Description

amazonKinesisClient

null

Shared

Reference to a com.amazonaws.services.kinesis.AmazonKinesisClient in the Registry.

maxMessagesPerPoll

100

Consumer

Maximum results that will be returned in each poll to the AWS API, Given that the shard iterator is unique to the consumer, changing it shouldn't effect other consumers.

iteratorType

TRIM_HORIZON

Consumer

One of TRIM_HORIZON, LATEST, AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER. See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html for descriptions of these two iterator types.

sequenceNumbernullConsumerThe sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER

Required Kinesis component options

You have to provide the amazonKinesisClient in the Registry with proxies and relevant credentials configured.

Batch Consumer

This component implements the Batch Consumer.

This allows you for instance to know how many messages exists in this batch and for instance let the Aggregator aggregate this number of messages.

Usage

Message headers set by the Kinesis consumer

Header

Type

Description

CamelAwsKinesisSequenceNumber

String

The sequence number of the record. This is represented as a String as it size is not defined by the API. If it is to be used as a numerical type then use

CamelAwsKinesisApproximateArrivalTimestamp

String

The time AWS assigned as the arrival time of the record.

CamelAwsKinesisPartitionKey

String

Identifies which shard in the stream the data record is assigned to.

Message headers set by the Kinesis producer

Header

Type

Description

CamelAwsKinesisSequenceNumber

String

The sequence number of the record. This is represented as a String as it size is not defined by the API. If it is to be used as a numerical type then use

CamelAwsKinesisShardId

String

Indicates where the data was stored.

AmazonKinesis configuration

You will need to create an instance of AmazonKinesisClient and bind it to the registry

ClientConfiguration clientConfiguration = new ClientConfiguration();
clientConfiguration.setProxyHost("http://myProxyHost");
clientConfiguration.setProxyPort(8080);

Region region = Region.getRegion(Regions.fromName(region));
region.createClient(AmazonKinesisClient.class, null, clientConfiguration);
// the 'null' here is the AWSCredentialsProvider which defaults to an instance of DefaultAWSCredentialsProviderChain

registry.bind("kinesisClient", client);

You then have to reference the AmazonKinesisClient in the amazonKinesisClient URI option.

from("aws-kinesis://mykinesisstream?amazonKinesisClient=#kinesisClient")
  .to("log:out?showAll=true");

Providing AWS Credentials

It is recommended that the credentials are obtained by using the DefaultAWSCredentialsProviderChain that is the default when creating a new ClientConfiguration instance, however, a different AWSCredentialsProvider can be specified when calling createClient(...).

Dependencies

Maven users will need to add the following dependency to their pom.xml.

pom.xml
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-aws</artifactId>
    <version>${camel-version}</version>
</dependency>

where ${camel-version} must be replaced by the actual version of Camel (2.17 or higher).

  • No labels