Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Wiki Markup
h2. SQL Component

The *sql:* component allows you to work with databases using JDBC queries. The difference between this component and [JDBC] component is that in case of SQL the query is a property of the endpoint and it uses message payload as parameters passed to the query.

This component uses *{{spring-jdbc}}* behind the scenes for the actual SQL handling.

Maven users will need to add the following dependency to their {{pom.xml}} for this component:
{code:xml}
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <version>x.x.x</version>
    <!-- use the same version as your Camel core version -->
</dependency>
{code}

The SQL component also supports:
- a JDBC based repository for the [Idempotent Consumer] EIP pattern. See further below.
- a JDBC based repository for the [Aggregator|Aggregator2] EIP pattern. See further below.

h3. URI format

{warning}
In Camel 2.10 or older the SQL component can only be used as producer.
From Camel 2.11 onwards this component can also be a consumer, eg {{from()}}.
{warning}

{info}
This component can be used as a [Transactional Client|http://camel.apache.org/transactional-client.html].
{info}

The SQL component uses the following endpoint URI notation:

{code}
sql:select * from table where id=# order by name[?options]
{code}

NoticeFrom thatCamel the2.11 standard {{?}} symbol that denotes the parameters to an SQL query is substituted with the {{#}} symbol, because the {{?}} symbol is used to specify options for the endpoint. The {{?}} symbol replacement can be configured on endpoint basis.

You can append query options to the URI in the following format, {{?option=value&option=value&...}}

h3. Options

{div:class=confluenceTableSmall}
|| Option || Type || Default || Description ||
| {{batch}} | {{boolean}} | {{false}} | *Camel 2.7.5, 2.8.4 and 2.9:* Execute SQL batch update statements. See notes below on how the treatment of the inbound message body changes if this is set to {{true}}. |
| {{dataSourceRef}} | {{String}} | {{null}} | Reference to a {{DataSource}} to look up in the registry. |
| {{placeholder}} | {{String}} | {{#onwards you can use named parameters by using {{#:name}} style as shown:  
{code}
sql:select * from table where id=#:myId order by name[?options]
{code}

When using named parameters, Camel will lookup the names from, in the given precedence:
1. from message body if its a {{java.util.Map}}
2. from message headers

If a named parameter cannot be resolved, then an exception is thrown.

Notice that the standard {{?}} symbol that denotes the parameters to an SQL query is substituted with the {{#}} symbol, because the {{?}} symbol is used to specify options for the endpoint. The {{?}} symbol replacement can be configured on endpoint basis.

You can append query options to the URI in the following format, {{?option=value&option=value&...}}

h3. Options

{div:class=confluenceTableSmall}
|| Option || Type || Default || Description ||
| {{batch}} | {{boolean}} | {{false}} | *Camel 2.7.5, 2.8.4 and 2.9:* SpecifiesExecute aSQL characterbatch thatupdate willstatements. beSee replacednotes to {{?}} in SQL query. Notice, that it is simple {{String.replaceAll()}} operation and no SQL parsing is involved (quoted strings will also change) below on how the treatment of the inbound message body changes if this is set to {{true}}. |
| {{template.<xxx>dataSourceRef}} | {{String}} | {{null}} | Sets additional options on the Spring {{JdbcTemplate}} that is used behind the scenes to execute the queries. For instance, {{template.maxRows=10}}. For detailed documentation, see the [JdbcTemplate javadoc|http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/jdbc/core/JdbcTemplate.html] documentation.Reference to a {{DataSource}} to look up in the registry. |
| {{placeholder}} | {{String}} | {{#}} | *Camel 2.4:* Specifies a character that will be replaced to {{?}} in SQL query. Notice, that it is simple {{String.replaceAll()}} operation and no SQL parsing is involved (quoted strings will also change) |
| {{consumertemplate.delay<xxx>}} | {{long}} | {{500null}} | *Camel 2.11:* *SQL consumer only:* Delay in milliseconds between each poll. |
| {{consumer.initialDelay}} | {{long}} | {{1000}} | *Camel 2.11:* *SQL consumer only:* Milliseconds before polling startsSets additional options on the Spring {{JdbcTemplate}} that is used behind the scenes to execute the queries. For instance, {{template.maxRows=10}}. For detailed documentation, see the [JdbcTemplate javadoc|http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/jdbc/core/JdbcTemplate.html] documentation. |
| {{consumer.useFixedDelaydelay}} | {{booleanlong}} | {{false500}} | *Camel 2.11:* *SQL consumer only:* Set to Delay in milliseconds between each poll. |
| {{trueconsumer.initialDelay}} to use fixed delay between polls, otherwise fixed rate is used. See [ScheduledExecutorService|http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html] in JDK for details| {{long}} | {{1000}} | *Camel 2.11:* *SQL consumer only:* Milliseconds before polling starts. |
| {{maxMessagesPerPollconsumer.useFixedDelay}} | {{intboolean}} | {{0false}} | *Camel 2.11:* *SQL consumer only:* AnSet integer value to define the maximum number of messages to gather per poll. By default, no maximum is set{{true}} to use fixed delay between polls, otherwise fixed rate is used. See [ScheduledExecutorService|http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html] in JDK for details. |
| {{consumer.useIteratormaxMessagesPerPoll}} | {{booleanint}} | {{true0}} | *Camel 2.11:* *SQL consumer only:* If {{true}} each row returned when polling will be processed individually. If {{false}} the entire {{java.util.List}} of data is set as the IN bodyAn integer value to define the maximum number of messages to gather per poll. By default, no maximum is set. |
| {{consumer.routeEmptyResultSetuseIterator}} | {{boolean}} | {{falsetrue}} | *Camel 2.11:* *SQL consumer only:* Whether to route a single empty [Exchange] if there was no data to pollIf {{true}} each row returned when polling will be processed individually. If {{false}} the entire {{java.util.List}} of data is set as the IN body. |
| {{consumer.onConsumerouteEmptyResultSet}} | {{Stringboolean}} | {{nullfalse}} |  *Camel 2.11:* *SQL consumer only:* AfterWhether processingto eachroute rowa thensingle thisempty query[Exchange] canif bethere executedwas tono fordata exampleto delete or mark the row poll. |
| {{consumer.onConsume}} | {{String}} | {{null}} |  *Camel 2.11:* *SQL consumer only:* After processing each row then this query can be executed to for example delete or mark the row as processed. The query can have parameter. |
| {{consumer.onConsumeBatchComplete}} | {{String}} | {{null}} |  *Camel 2.11:* *SQL consumer only:* After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters. |
| {{consumer.expectedUpdateCount}} | {{int}} | {{-1}} | *Camel 2.11:* *SQL consumer only:* If using {{consumer.onConsume}} then this option can be used to set an expected number of rows being updated. Typically you may set this to {{1}} to expect one row to be updated. |
| {{consumer.breakBatchOnConsumeFail}} | {{boolean}} | {{false}} |  *Camel 2.11:* *SQL consumer only:* If using {{consumer.onConsume}} and it fails, then this option controls whether to break out of the batch or continue processing the next row from the batch. |
{div}

h3. Treatment of the message body

The SQL component tries to convert the message body to an object of {{java.util.Iterator}} type and then uses this iterator to fill the query parameters (where each query parameter is represented by a {{#}} symbol (or configured placeholder) in the endpoint URI). If the message body is not an array or collection, the conversion results in an iterator that iterates over only one object, which is the body itself.

For example, if the message body is an instance of {{java.util.List}}, the first item in the list is substituted into the first occurrence of {{#}} in the SQL query, the second item in the list is substituted into the second occurrence of {{#}}, and so on.

If {{batch}} is set to {{true}}, then the interpretation of the inbound message body changes slightly – instead of an iterator of parameters, the component expects an iterator that contains the parameter iterators; the size of the outer iterator determines the batch size.

h3. Result of the query

For {{select}} operations, the result is an instance of {{List<Map<String, Object>>}} type, as returned by the [JdbcTemplate.queryForList()|http://static.springframework.org/spring/docs/2.5.x/api/org/springframework/jdbc/core/JdbcTemplate.html#queryForList(java.lang.String,%20java.lang.Object%91%93)] method. For {{update}} operations, the result is the number of updated rows, returned as an {{Integer}}.

h3. Header values

When performing {{update}} operations, the SQL Component stores the update count in the following message headers:

|| Header || Description ||
| {{CamelSqlUpdateCount}} | The number of rows updated for {{update}} operations, returned as an {{Integer}} object. |
| {{CamelSqlRowCount}} | The number of rows returned for {{select}} operations, returned as an {{Integer}} object. |
| {{CamelSqlQuery}} | *Camel 2.8:* Query to execute. This query takes precedence over the query specified in the endpoint URI. Note that query parameters in the header _are_ represented by a {{?}} instead of a {{#}} symbol |

h3. Configuration
You can now set a reference to a {{DataSource}} in the URI directly:

{code}
sql:select * from table where id=# order by name?dataSourceRef=myDS
{code}

h3. Sample
In the sample below we execute a query and retrieve the result as a {{List}} of rows, where each row is a {{Map<String, Object}} and the key is the column name. 

First, we set up a table to use for our sample. As this is based on an unit test, we do it in java:
{snippet:id=e2|lang=java|url=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java}

The SQL script {{createAndPopulateDatabase.sql}} we execute looks like as described below:
{snippet:id=e1|lang=sql|url=camel/trunk/components/camel-sql/src/test/resources/sql/createAndPopulateDatabase.sql}

Then we configure our route and our {{sql}} component. Notice that we use a {{direct}} endpoint in front of the {{sql}} endpoint. This allows us to send an exchange to the {{direct}} endpoint with the URI, {{direct:simple}}, which is much easier for the client to use than the long {{sql:}} URI. Note that the {{DataSource}} is looked up up in the registry, so we can use standard Spring XML to configure our {{DataSource}}.
{snippet:id=e1|lang=java|url=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java}

And then we fire the message into the {{direct}} endpoint that will route it to our {{sql}} component that queries the database.
{snippet:id=e3|lang=java|url=camel/trunk/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDataSourceRefTest.java}

We could configure the {{DataSource}} in Spring XML as follows:
{code:xml}
 <jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/> 
{code}

h4. Using named parameters
*Available as of Camel 2.11*

In the given route below, we want to get all the projects from the projects table. Notice the SQL query has 2 named parameters, :#lic and :#min.
Camel will then lookup for these parameters from the message body or message headers. Notice in the example above we set two headers with constant value
for the named parameters:
{code}
   from("direct:projects")
     .setHeader("lic", constant("ASF"))
     .setHeader("min", constant(123))
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")
{code}

Though if the message body is a {{java.util.Map}} then the named parameters will be taken from the body.
{code}
   from("direct:projects")
     .to("sql:select * from projects where license = :#lic and id > :#min order by id")
{code}

h3. Using the JDBC based idempotent repository
*Available as of Camel 2.7*: In this section we will use the JDBC based idempotent repository.

{tip:title=Abstract class}
From Camel 2.9 onwards there is an abstract class {{org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository}} you can extend to build custom JDBC idempotent repository.
{tip}

First we have to create the database table which will be used by the idempotent repository. For *Camel 2.7*, we use the following schema:
{code:sql}
CREATE TABLE CAMEL_MESSAGEPROCESSED (
  processorName VARCHAR(255),
  messageId VARCHAR(100)
)
{code}

In *Camel 2.8*, we added the createdAt column:
{code:sql}
CREATE TABLE CAMEL_MESSAGEPROCESSED (
  processorName VARCHAR(255),
  messageId VARCHAR(100),
  createdAt TIMESTAMP
)
{code}

We recommend to have a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs for database to database, we do not show it here.

Second we need to setup a {{javax.sql.DataSource}} in the spring XML file:
{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}

And finally we can create our JDBC idempotent repository in the spring XML file as well:
{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}

h4. Customize the JdbcMessageIdRepository
Starting with *Camel 2.9.1* you have a few options to tune the {{org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository}} for your needs:
|| Parameter || Default Value || Description ||
| createTableIfNotExists | true | Defines whether or not Camel should try to create the table if it doesn't exist. |
| tableExistsString | SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 | This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist. |
| createString | CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP) | The statement which is used to create the table. |
| queryString | SELECT COUNT\(*\) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The query which is used to figure out whether the message already exists in the repository (the result is not equals to '0'). It takes two parameters. This first one is the processor name ({{String}}) and the second one is the message id ({{String}}). |
| insertString | INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) | The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name ({{String}}), the second one is the message id ({{String}}) and the third one is the timestamp ({{java.sql.Timestamp}}) when this entry was added to the repository. |
| deleteString | DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The statement which is used to delete the entry from the database. It takes two parameter. This first one is the processor name ({{String}}) and the second one is the message id ({{String}}). |

A customized {{org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository}} could look like:
{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml}

h3. Using the JDBC based aggregation repository
*Available as of Camel 2.6*
{info:title=Using JdbcAggregationRepository in Camel 2.6}
In Camel 2.6, the JdbcAggregationRepository is provided in the {{camel-jdbc-aggregator}} component. From Camel 2.7 onwards, the {{JdbcAggregationRepository}} is provided in the {{camel-sql}} component.
{info}
{{JdbcAggregationRepository}} is an {{AggregationRepository}} which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only {{AggregationRepository}}.
The {{JdbcAggregationRepository}} allows together with Camel to provide persistent support for the [Aggregator|Aggregator2].

It has the following options:
|| Option || Type || Description ||
| {{dataSource}} | {{DataSource}} | *Mandatory:* The {{javax.sql.DataSource}} to use for accessing the database. | 
| {{repositoryName}} | {{String}} | *Mandatory:* The name of the repository. |
| {{transactionManager}} | {{TransactionManager}} | *Mandatory:* The {{org.springframework.transaction.PlatformTransactionManager}} to mange transactions for the database. The TransactionManager must be able to support databases. |
| {{lobHandler}} | {{LobHandler}} | A {{org.springframework.jdbc.support.lob.LobHandler}} to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle. |
| {{returnOldExchange}} | boolean | Whether the get operation should return the old existing Exchange if any existed. By default this option is {{false}} to optimize as we do not need the old exchange when aggregating. |
| {{useRecovery}} | boolean | Whether or not recovery is enabled. This option is by default {{true}}. When enabled the Camel [Aggregator|Aggregator2] automatic recover failed aggregated exchange and have them resubmitted. |
| {{recoveryInterval}} | long | If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis. |
| {{maximumRedeliveries}} | int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the {{deadLetterUri}} option must also be provided. |
| {{deadLetterUri}} | String | An endpoint uri for a [Dead Letter Channel] where exhausted recovered Exchanges will be moved. If this option is used then the {{maximumRedeliveries}} option must also be provided. |


h4. What is preserved when persisting
{{JdbcAggregationRepository}} will only preserve any {{Serializable}} compatible data types. If a data type is not such a type its dropped and a {{WARN}} is logged. And it only persists the {{Message}} body and the {{Message}} headers. The {{Exchange}} properties are *not* persisted. 

h4. Recovery
The {{JdbcAggregationRepository}} will by default recover any failed [Exchange]. It does this by having a background tasks that scans for failed [Exchange]s in the persistent store. You can use the {{checkInterval}} option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed [Exchange]. Any [Exchange] which was found to be recovered will be restored from the persistent store and resubmitted and send out again. 

The following headers is set when an [Exchange] is being recovered/redelivered:
|| Header || Type || Description ||
| {{Exchange.REDELIVERED}} | Boolean | Is set to true to indicate the [Exchange] is being redelivered. |
| {{Exchange.REDELIVERY_COUNTER}} | Integer | The redelivery attempt, starting from 1. |

Only when an [Exchange] has been successfully processed it will be marked as complete which happens when the {{confirm}} method is invoked on the {{AggregationRepository}}. This means if the same [Exchange] fails again it will be kept retried until it success.

You can use option {{maximumRedeliveries}} to limit the maximum number of redelivery attempts for a given recovered [Exchange]. You must also set the {{deadLetterUri}} option so Camel knows where to send the [Exchange] when the {{maximumRedeliveries}} was hit. 

You can see some examples in the unit tests of camel-sql, for example [this test|https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java].

h4. Database

To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with {{"_COMPLETED"}}. The name must be configured in the Spring bean with the {{RepositoryName}} property. In the following example aggregation will be used.

The table structure definition of both table are identical: in both case a String value is used as key (*id*) whereas a Blob contains the exchange serialized in byte array.
However one difference should be remembered: the *id* field does not have the same content depending on the table.
In the aggregation table *id* holds the correlation Id used by the component to aggregate the messages. In the completed table, *id* holds the id of the exchange stored in corresponding the blob field.

Here is the SQL query used to create the tables, just replace {{"aggregation"}} with your aggregator repository name.
{code}
CREATE TABLE aggregation (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_completed_pk PRIMARY KEY (id)
);
{code}

h4. Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the {{JdbcCodec}} class. One detail of the code requires your attention: the {{ClassLoadingAwareObjectInputStream}}.

The {{ClassLoadingAwareObjectInputStream}} has been reused from the [Apache ActiveMQ|http://activemq.apache.org/] project. It wraps an {{ObjectInputStream}} and use it with the {{ContextClassLoader}} rather than the {{currentThread}} one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.

h4. Transaction
A Spring {{PlatformTransactionManager}} is required to orchestrate transaction.

h4. Service (Start/Stop)
The {{start}} method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.

h4. Aggregator configuration
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the {{lobHandler}} property.

Here is the declaration for Oracle:
{code:xml}
    <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler">
        <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
    </bean>

    <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>

    <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
        <property name="transactionManager" ref="transactionManager"/>
        <property name="repositoryName" value="aggregation"/>
        <property name="dataSource" ref="dataSource"/>
        <!-- Only with Oracle, else use default -->
        <property name="lobHandler" ref="lobHandler"/>
    </bean>
{code}

{include:Endpoint See Also}
- [JDBC]