Route Throttling Example
Available as of Camel 2.1
About
This example shows how to use the new feature RoutePolicy to dynamically at runtime to throttle routes based on metrics gathered by the current number of inflight exchanges.
What it means is that Camel will dynamic throttle the routes based on the flow of messages being processed at runtime.
The example have 3 routes where as two of the routes are input routes, and the last is the processing route
1. route1: from("jms") - input from a JMS queue
2. route2: from("file") - input from a file folder
3. route3: from("seda") - processing of the input messages, this one simulates CPU work by delaying the messages 100 mills.
How it works
When the example runs we have a dependency from the routes as follows:
- route1 -> route3
- route2 -> route3
What the example demonstrates is that Camel is capable of dynamic throttling route1 and route2 based on the total flow of messages. At runtime Camel gathers the current inflight exchanges in a registry which is used for metrics.
So when the flow is going too fast then the RoutePolicy kicks in and suspends
route1 and/or route2. The current inflight exchanges will continue to be processed and when we are below a threshold then the RoutePolicy kicks in again and resumes
route1 and/or route2.
Camel provides the throttling policy in the org.apache.camel.impl.ThrottlingInflightRoutePolicy
.
How to run
The example has 3 maven goals to run the example
mvn compile exec:java -PCamelServer
- starts the Camel Server which contains the 3 routes and where you should check its log output for how it goes.
mvn compile exec:java -PCamelClient
- is a client that sends 10000 JMS messages to the JMS broker which is consumed by route1. The Server must be started beforehand.
mvn compile exec:java -PCamelFileClient
- is a client that creates 5000 files that are consumed by route2. The server may be started beforehand, but its not required.
So at first you start the server. Then at any time you can run a client at will. For example you can run the JMS client and let it run to completion at the server. You can see at the server console logging that it reports the progress. And at sometime it will reach 10000 messages processed. You can then start the client again if you like.
You can also start the other client to create the files which then let the example be a bit more complicated as we have concurrent processing of JMS messages and files at the same time. And where as both of these should be dynamic throttled so we wont go too fast.
How to change
You can check the file src/main/resources/META-INF/spring/camel-server.xml
file where you can see the configuration of the dynamic throttler. By default its configured as:
You can then change this and restart the server to see the changes in effect.
JMX management
At runtime you can manage the ThrottlingInflightRoutePolicy
at runtime as its listed under services in the JConsole.
For example you can change the option maxInflightExchanges
while its running to find a more suitable value.
The screenshot below illustrates it from a JConsole.
See more at using JMX with Camel.
Sample output from running example
When running the example you should see a lot of activity logged to the console.
Below is a snippet when the example runs to an end when the 10000 messages have been processed. What you should notice is that the throttling ensures that the JMS consumer is not taking in more messages than we can process. That means that ++JMS++ and ++SEDA++ are completing at nearly the same time.
[ultMessageListenerContainer-42] +++JMS +++ INFO Received: 9800 messages so far. Last group took: 673 millis which is: 148.588 messages per second. average: 159.734 [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 25 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9800 messages so far. Last group took: 733 millis which is: 136.426 messages per second. average: 159.789 [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 25 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 25 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [ultMessageListenerContainer-55] +++JMS +++ INFO Received: 9900 messages so far. Last group took: 758 millis which is: 131.926 messages per second. average: 159.395 [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 25 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9900 messages so far. Last group took: 598 millis which is: 167.224 messages per second. average: 159.86 [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 24 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 25 > 20 inflight exchange by suspending consumer. [currentConsumers=25&size=25000] ThrottlingInflightRoutePolicy WARN Throttling consumer: 2 <= 2 inflight exchange by resuming consumer. [ultMessageListenerContainer-76] +++JMS +++ INFO Received: 10000 messages so far. Last group took: 732 millis which is: 136.612 messages per second. average: 159.129 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 10000 messages so far. Last group took: 732 millis which is: 136.612 messages per second. average: 159.589
However if you run the example without the throttling you will notice that the JMS consumer runs faster than we can process the messages. Towards the end we have more than 2000 messages pending on the internal SEDA queue, when the JMS consumer finishes.
[ultMessageListenerContainer-41] +++JMS +++ INFO Received: 9800 messages so far. Last group took: 304 millis which is: 328.947 messages per second. average: 225.272 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 7800 messages so far. Last group took: 645 millis which is: 155.039 messages per second. average: 178.461 [ultMessageListenerContainer-65] +++JMS +++ INFO Received: 9900 messages so far. Last group took: 543 millis which is: 184.162 messages per second. average: 224.765 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 7900 messages so far. Last group took: 438 millis which is: 228.311 messages per second. average: 178.956 [ultMessageListenerContainer-65] +++JMS +++ INFO Received: 10000 messages so far. Last group took: 395 millis which is: 253.165 messages per second. average: 225.017 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8000 messages so far. Last group took: 408 millis which is: 245.098 messages per second. average: 179.561 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8100 messages so far. Last group took: 410 millis which is: 243.902 messages per second. average: 180.148 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8200 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 180.744 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8300 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 181.334 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8400 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 181.913 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8500 messages so far. Last group took: 666 millis which is: 150.15 messages per second. average: 181.461 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8600 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 182.022 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8700 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 182.577 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8800 messages so far. Last group took: 407 millis which is: 245.7 messages per second. average: 183.112 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 8900 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 183.649 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9000 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 184.177 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9100 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 184.693 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9200 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 185.2 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9300 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 185.703 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9400 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 186.194 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9500 messages so far. Last group took: 405 millis which is: 246.914 messages per second. average: 186.677 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9600 messages so far. Last group took: 408 millis which is: 245.098 messages per second. average: 187.142 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9700 messages so far. Last group took: 413 millis which is: 242.131 messages per second. average: 187.581 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9800 messages so far. Last group took: 410 millis which is: 243.902 messages per second. average: 188.024 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 9900 messages so far. Last group took: 579 millis which is: 172.712 messages per second. average: 187.856 [currentConsumers=25&size=25000] +++SEDA+++ INFO Received: 10000 messages so far. Last group took: 404 millis which is: 247.525 messages per second. average: 188.31
So by using the ThrottlingInflightRoutePolicy
we can throttle the intake of messages to be on a pair with the speed we can process messages. And since its pluggable you can implement your own custom logic to throttle according to your needs.