ActiveMQ supports the Stomp protocol and the Stomp - JMS mapping. This makes it easy to write a client in pure Ruby, Perl, Python or PHP for working with ActiveMQ.

Please see the Stomp site for more details.

Enabling the ActiveMQ Broker for Stomp

It's very easy to enable ActiveMQ for Stomp. Just add a connector to the broker using the stomp URL.

<transportConnectors>
   <transportConnector name="stomp" uri="stomp://localhost:61613"/>
</transportConnectors>

To see a full example, try this XML. If you save that XML as foo.xml, you can run a Stomp-enabled broker from the command line with:

activemq xbean:foo.xml

For more help see Run Broker.
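Once the connector is up, any language with sockets can talk to it. The following is a minimal sketch of a bare-socket handshake, assuming a broker listening on localhost:61613 as configured above and the system/manager credentials used elsewhere on this page. The class StompHandshakeSketch and its methods are hypothetical helpers written for illustration; they are not part of ActiveMQ.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper, not part of ActiveMQ: a bare-socket Stomp handshake. */
final class StompHandshakeSketch {

    /** Builds a CONNECT frame: command line, headers, blank line, NUL terminator. */
    static String connectFrame(String login, String passcode) {
        return "CONNECT\n"
                + "login:" + login + "\n"
                + "passcode:" + passcode + "\n"
                + "\n"
                + "\u0000";
    }

    /** Reads bytes up to, but not including, the NUL byte that ends a frame. */
    static String readFrame(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) > 0) { // stops on NUL (0) or end of stream (-1)
            buffer.write(b);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a broker with the stomp connector is running locally.
        try (Socket socket = new Socket("localhost", 61613)) {
            OutputStream out = socket.getOutputStream();
            out.write(connectFrame("system", "manager").getBytes(StandardCharsets.UTF_8));
            out.flush();
            // A successful login is answered with a CONNECTED frame.
            System.out.println(readFrame(socket.getInputStream()));
        }
    }
}
```

Only main touches the network, so the frame-building and frame-reading helpers can be exercised without a running broker.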

Security

The Stomp implementation fully supports the ActiveMQ security mechanism. The broker answers a CONNECT command with an ERROR frame when authentication fails, and authorization policies are applied whenever you try to read from or write to a destination. If you use synchronous operations (by requesting receipts), you can expect an ERROR frame after an unauthorized access attempt. Otherwise the operation is discarded and the client is not informed of the error. The same applies to all other errors that can occur on the broker side.
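To illustrate the receipt-based flow, here is a small sketch that adds a receipt header to a SEND frame and classifies the broker's reply by its command line. The class ReceiptSketch, its method names, and the receipt id value are assumptions made for this example; the frame layout follows the Stomp protocol, where a successful request is answered with a RECEIPT frame and a failed one with an ERROR frame.

```java
/** Hypothetical helper, not part of ActiveMQ: receipts make failures visible. */
final class ReceiptSketch {

    /** Builds a SEND frame that asks the broker to confirm it with a receipt. */
    static String sendWithReceipt(String destination, String body, String receiptId) {
        return "SEND\n"
                + "destination:" + destination + "\n"
                + "receipt:" + receiptId + "\n"
                + "\n"
                + body
                + "\u0000";
    }

    /** Returns the command (first non-empty line) of a frame from the broker. */
    static String command(String frame) {
        String trimmed = frame.replaceFirst("^\\n+", ""); // skip stray newlines between frames
        int endOfLine = trimmed.indexOf('\n');
        return endOfLine < 0 ? trimmed : trimmed.substring(0, endOfLine);
    }

    /** True when the reply signals a failure, such as unauthorized access. */
    static boolean rejected(String replyFrame) {
        return "ERROR".equals(command(replyFrame));
    }

    public static void main(String[] args) {
        String reply = "ERROR\nmessage:not authorized\n\n";
        System.out.println(rejected(reply) ? "send was rejected" : "send was accepted");
    }
}
```

Without the receipt header the client would have nothing to wait for, which is why unauthorized operations are otherwise dropped silently.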

Availability

The Stomp security implementation is available from version 5.1 onwards.

SSL

For additional security, you can use Stomp over SSL, as described in the Enabling Stomp over SSL section below.

Enabling Stomp over NIO

For better scalability (and performance) you might want to run the Stomp protocol over the NIO transport. To do that, use the stomp+nio transport prefix instead of stomp. For example, add the following transport configuration to your XML file:

     <transportConnector name="stomp+nio" uri="stomp+nio://localhost:61612"/>

This transport uses the NIO transport underneath and will generally use far fewer threads than the standard connector. It can help if you want to use a large number of queues.

Availability

The Stomp NIO connector implementation is available in version 5.3 and up.

Enabling Stomp over SSL

It's easy to configure ActiveMQ to use Stomp over an SSL connection. All you have to do is use the stomp+ssl transport prefix instead of stomp. For example, add the following transport configuration to your XML file:

     <transportConnector name="stomp+ssl" uri="stomp+ssl://localhost:61612"/>

Working with Destinations with Stomp

Note that the Stomp prefix /queue/ or /topic/ is removed from the string before it is passed to ActiveMQ as a JMS destination. Also note that the default separator in MOM systems is . (DOT). So FOO.BAR is the normal syntax of a MOM queue, and the Stomp equivalent would be /queue/FOO.BAR.

Be careful about starting destinations with /

If you use /queue/foo/bar in the Stomp world, then in the JMS world the queue is called foo/bar, not /foo/bar.
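The mapping described above can be sketched as a small function. This is an illustration of the rule only, not the broker's actual code: the class DestinationMappingSketch and the method toJmsName are hypothetical, and the sketch handles just the two prefixes named in this section.

```java
/** Hypothetical illustration of the Stomp-to-JMS destination name mapping. */
final class DestinationMappingSketch {

    private static final String[] PREFIXES = {"/queue/", "/topic/"};

    /** Strips the leading /queue/ or /topic/ and returns the JMS destination name. */
    static String toJmsName(String stompDestination) {
        for (String prefix : PREFIXES) {
            if (stompDestination.startsWith(prefix)) {
                // Everything after the prefix is kept verbatim, including any further slashes.
                return stompDestination.substring(prefix.length());
            }
        }
        throw new IllegalArgumentException("Unsupported destination in this sketch: " + stompDestination);
    }

    public static void main(String[] args) {
        System.out.println(toJmsName("/queue/FOO.BAR"));
        System.out.println(toJmsName("/queue/foo/bar"));
    }
}
```

Because only the prefix is removed, /queue/foo/bar maps to foo/bar with no leading slash, which is the pitfall this section warns about.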

Persistence of Stomp messages

By default, messages produced via Stomp are non-persistent. You have to explicitly tell your Stomp library to add the persistent:true header to every SEND request for a message that you want to survive ActiveMQ restarts. This is the opposite of the default for messages submitted via JMS.
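As a rough sketch of what that looks like on the wire, the helper below builds a SEND frame and adds the persistent:true header only when asked. The class PersistentSendSketch and the sample destination and body are made up for the example; a real Stomp library will have its own way of setting headers.

```java
/** Hypothetical helper, not part of ActiveMQ or any Stomp library. */
final class PersistentSendSketch {

    private static final char NUL = '\u0000';

    /** Builds a SEND frame; the persistent header is added only on request. */
    static String sendFrame(String destination, String body, boolean persistent) {
        StringBuilder frame = new StringBuilder("SEND\n");
        frame.append("destination:").append(destination).append('\n');
        if (persistent) {
            // Without this header the message is treated as non-persistent.
            frame.append("persistent:true\n");
        }
        frame.append('\n').append(body).append(NUL);
        return frame.toString();
    }

    public static void main(String[] args) {
        // Print the frame with the NUL terminator made visible as '@'.
        System.out.println(sendFrame("/queue/FOO.BAR", "order 42", true).replace(NUL, '@'));
    }
}
```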

Working with JMS Text/Bytes Messages and Stomp

Stomp is a very simple protocol - that's part of the beauty of it! As such, it does not have knowledge of JMS messages such as TextMessages or BytesMessages. The protocol does however support a content-length header. To provide more robust interaction between Stomp and JMS clients, ActiveMQ keys off of the inclusion of this header to determine what message type to create when sending from Stomp to JMS. The logic is simple:

Inclusion of content-length header    Resulting Message
yes                                   BytesMessage
no                                    TextMessage

This same logic can be followed when going from JMS to Stomp, as well. A Stomp client could be written to key off of the inclusion of the content-length header to determine what type of message structure to provide to the user.
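The rule is easy to express in code. The sketch below builds a SEND frame with or without a content-length header and then applies the same header check to decide which JMS message type would result. The class MessageTypeSketch and its methods are hypothetical and only mirror the table above; they are not the broker's implementation.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of the content-length rule described above. */
final class MessageTypeSketch {

    /** Builds a SEND frame, optionally declaring the body length in bytes. */
    static String sendFrame(String destination, String body, boolean withContentLength) {
        StringBuilder frame = new StringBuilder("SEND\n");
        frame.append("destination:").append(destination).append('\n');
        if (withContentLength) {
            // content-length counts bytes, not characters.
            int length = body.getBytes(StandardCharsets.UTF_8).length;
            frame.append("content-length:").append(length).append('\n');
        }
        return frame.append('\n').append(body).append('\u0000').toString();
    }

    /** Returns "BytesMessage" if the frame has a content-length header, else "TextMessage". */
    static String jmsTypeFor(String frame) {
        int headerEnd = frame.indexOf("\n\n"); // a blank line separates headers from the body
        String head = headerEnd < 0 ? frame : frame.substring(0, headerEnd);
        for (String line : head.split("\n")) {
            if (line.startsWith("content-length:")) {
                return "BytesMessage";
            }
        }
        return "TextMessage";
    }

    public static void main(String[] args) {
        System.out.println(jmsTypeFor(sendFrame("/queue/test", "hello", true)));
        System.out.println(jmsTypeFor(sendFrame("/queue/test", "hello", false)));
    }
}
```

A Stomp client can apply jmsTypeFor-style logic to incoming MESSAGE frames as well, handing the user raw bytes or a string accordingly.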

Message transformations

The transformation message header on SEND and SUBSCRIBE messages can be used to instruct ActiveMQ to transform messages from text to the format you want. Currently, ActiveMQ comes with a transformer that can transform XML/JSON text to Java objects, but you can add your own transformers as well.

Here's a quick example of how to use the built-in transformer (taken from the test cases):


    private String xmlObject = "<pojo>\n" 
            + "  <name>Dejan</name>\n"
            + "  <city>Belgrade</city>\n" 
            + "</pojo>";

    public void testTransformationReceiveXMLObject() throws Exception {
    	
        MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
        ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
        producer.send(message);
    	
        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
        stompConnection.sendFrame(frame);

        frame = stompConnection.receiveFrame();
        assertTrue(frame.startsWith("CONNECTED"));

        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-object-xml\n\n" + Stomp.NULL;
        stompConnection.sendFrame(frame);
        
        frame = stompConnection.receiveFrame();

        assertTrue(frame.trim().endsWith(xmlObject));
        
        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
        stompConnection.sendFrame(frame);
    }

Dependencies

ActiveMQ uses XStream for its transformation needs. Since it is an optional dependency, you have to add it to the broker's classpath by putting the appropriate JAR into the lib/ folder. Additionally, if you plan to use JSON transformations, you have to add the Jettison JSON parser to the classpath.

In order to create your own transformer, you have to do the following:

  1. Build your transformer by implementing the FrameTranslator interface
  2. Associate it with the appropriate header value: in the META-INF/services/org/apache/activemq/transport/frametranslator/ folder of your JAR, create a file named after the header value you want to use, containing the line class= followed by the fully qualified class name of your transformer

For example the built-in transformer contains the following value

class=org.apache.activemq.transport.stomp.XStreamFrameTranslator

in the META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml file.

Debugging

To debug Stomp communication between the broker and clients, configure the Stomp connector with the trace parameter, like this:

<transportConnectors>
   <transportConnector name="stomp" uri="stomp://localhost:61613?trace=true"/>
</transportConnectors>

This will instruct the broker to trace all packets it sends and receives.

Furthermore, you have to enable tracing for the appropriate logger. You can achieve that by adding the following to your conf/log4j.properties:

log4j.logger.org.apache.activemq.transport.stomp=TRACE

Finally, you will probably want to keep these messages in a separate file instead of polluting the broker's standard log. You can achieve that with the following log4j configuration:

log4j.appender.stomp=org.apache.log4j.RollingFileAppender
log4j.appender.stomp.file=${activemq.base}/data/stomp.log
log4j.appender.stomp.maxFileSize=1024KB
log4j.appender.stomp.maxBackupIndex=5
log4j.appender.stomp.append=true
log4j.appender.stomp.layout=org.apache.log4j.PatternLayout
log4j.appender.stomp.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n

log4j.logger.org.apache.activemq.transport.stomp=TRACE, stomp
log4j.additivity.org.apache.activemq.transport.stomp=false

After this, all your Stomp packets will be logged to data/stomp.log.

Java API

Since version 5.2, there is a simple Java Stomp API distributed with ActiveMQ. Note that this API is provided purely for testing purposes and you should always consider using the standard JMS API from Java instead. The following code snippet provides a simple example of using this API:

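import org.apache.activemq.transport.stomp.Stomp;
import org.apache.activemq.transport.stomp.Stomp.Headers.Subscribe;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.transport.stomp.StompFrame;

public class StompExample {

    public static void main(String[] args) throws Exception {
        // Open a socket to the broker's stomp connector and log in.
        StompConnection connection = new StompConnection();
        connection.open("localhost", 61613);

        connection.connect("system", "manager");
        StompFrame connect = connection.receive();
        if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
            throw new Exception("Not connected");
        }

        // Send two messages inside transaction tx1.
        connection.begin("tx1");
        connection.send("/queue/test", "message1", "tx1", null);
        connection.send("/queue/test", "message2", "tx1", null);
        connection.commit("tx1");

        // Subscribe with client acknowledgement so each message must be acked.
        connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);

        // Receive and acknowledge both messages inside transaction tx2.
        connection.begin("tx2");

        StompFrame message = connection.receive();
        System.out.println(message.getBody());
        connection.ack(message, "tx2");

        message = connection.receive();
        System.out.println(message.getBody());
        connection.ack(message, "tx2");

        connection.commit("tx2");

        connection.disconnect();
    }
}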

This example ships with the ActiveMQ distribution. You can run it from the example folder with:

ant stomp

Stomp extensions for JMS message semantics

Note that Stomp is designed to be as simple as possible - so any scripting language / platform can message any other with minimal effort.

Stomp allows pluggable headers on each request, such as sending and receiving messages. ActiveMQ adds several extensions to the Stomp protocol so that Stomp clients can use JMS semantics. An OpenWire JMS producer can send messages to a Stomp consumer, and a Stomp producer can send messages to an OpenWire JMS consumer. Stomp-to-Stomp configurations can also use the richer JMS message control.

Stomp supports the following standard JMS properties on SENT messages:

Stomp header    JMS header        Description
correlation-id  JMSCorrelationID  Good consumers will add this header to any responses they send
expires         JMSExpiration     Expiration time of the message
persistent      JMSDeliveryMode   Whether or not the message is persistent
priority        JMSPriority       Priority on the message
reply-to        JMSReplyTo        Destination you should send replies to
type            JMSType           Type of the message
JMSXGroupID     JMSXGroupID       Specifies the Message Groups
JMSXGroupSeq    JMSXGroupSeq      Optional header that specifies the sequence number in the Message Groups
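As an illustration, the sketch below assembles a SEND frame that carries several of the headers from the table. The class JmsHeaderSendSketch, the destination names, and all header values are invented for the example; only the header names come from the table above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of ActiveMQ: a SEND frame with JMS-mapped headers. */
final class JmsHeaderSendSketch {

    /** Serializes a frame: command, one header per line, blank line, body, NUL. */
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder out = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        return out.append('\n').append(body).append('\u0000').toString();
    }

    /** Builds a request message that a JMS consumer can reply to. */
    static String requestFrame(String body) {
        Map<String, String> headers = new LinkedHashMap<>(); // keeps insertion order
        headers.put("destination", "/queue/ORDERS.IN");      // sample names, not from the docs
        headers.put("reply-to", "/queue/ORDERS.OUT");        // maps to JMSReplyTo
        headers.put("correlation-id", "req-1");              // maps to JMSCorrelationID
        headers.put("priority", "7");                        // maps to JMSPriority
        headers.put("persistent", "true");                   // maps to JMSDeliveryMode
        headers.put("JMSXGroupID", "customer-17");           // message group
        return frame("SEND", headers, body);
    }

    public static void main(String[] args) {
        System.out.println(requestFrame("hello").replace('\u0000', '@'));
    }
}
```

A consumer answering this request would send its reply to the reply-to destination and copy the correlation-id header into the response.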

ActiveMQ extensions to Stomp

You can add custom headers to Stomp commands to configure the ActiveMQ protocol. Here are some examples:

Verb       Header                               Type     Description
CONNECT    client-id                            string   Specifies the JMS Client ID which is used in combination with the activemq.subcriptionName to denote a durable subscriber.
SUBSCRIBE  selector                             string   Specifies a JMS Selector using SQL 92 syntax as specified in the JMS 1.1 specification. This allows a filter to be applied to each message as part of the subscription.
SUBSCRIBE  activemq.dispatchAsync               boolean  Should messages be dispatched synchronously or asynchronously from the producer thread for non-durable topics in the broker? For fast consumers set this to false. For slow consumers set it to true so that dispatching will not block fast consumers.
SUBSCRIBE  activemq.exclusive                   boolean  I would like to be an Exclusive Consumer on the queue.
SUBSCRIBE  activemq.maximumPendingMessageLimit  int      For Slow Consumer Handling on non-durable topics by dropping old messages - we can set a maximum-pending limit, such that once a slow consumer backs up to this high water mark we begin to discard old messages.
SUBSCRIBE  activemq.noLocal                     boolean  Specifies whether or not locally sent messages should be ignored for subscriptions. Set to true to filter out locally sent messages.
SUBSCRIBE  activemq.prefetchSize                int      Specifies the maximum number of pending messages that will be dispatched to the client. Once this maximum is reached no more messages are dispatched until the client acknowledges a message. Set to 1 for very fair distribution of messages across consumers where processing messages can be slow.
SUBSCRIBE  activemq.priority                    byte     Sets the priority of the consumer so that dispatching can be weighted in priority order.
SUBSCRIBE  activemq.retroactive                 boolean  For non-durable topics make this subscription retroactive.
SUBSCRIBE  activemq.subscriptionName            string   For durable topic subscriptions you must specify the same clientId on the connection and subcriptionName on the subscribe.
Note the spelling: subcriptionName NOT subscriptionName. This is not intuitive, but it is how it is implemented in ActiveMQ 4.x. For the 5.0 release of ActiveMQ, both subcriptionName and subscriptionName will be supported.
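To show how these headers combine, here is a sketch of the two frames a durable topic subscriber would send: a CONNECT frame with client-id and a SUBSCRIBE frame with a selector, a prefetch size, and the subscription name. The class DurableSubscribeSketch, the topic name, the client id, and the selector expression are all assumptions for the example. The subscription header uses the ActiveMQ 4.x spelling described in the note above.

```java
/** Hypothetical helper, not part of ActiveMQ: frames for a durable topic subscriber. */
final class DurableSubscribeSketch {

    /** Serializes a body-less frame from alternating header names and values. */
    static String frame(String command, String... keyValues) {
        if (keyValues.length % 2 != 0) {
            throw new IllegalArgumentException("headers must come in name/value pairs");
        }
        StringBuilder out = new StringBuilder(command).append('\n');
        for (int i = 0; i < keyValues.length; i += 2) {
            out.append(keyValues[i]).append(':').append(keyValues[i + 1]).append('\n');
        }
        return out.append('\n').append('\u0000').toString();
    }

    /** CONNECT frame carrying the JMS client id needed for durable subscriptions. */
    static String connect(String clientId) {
        return frame("CONNECT",
                "login", "system",
                "passcode", "manager",
                "client-id", clientId);
    }

    /** SUBSCRIBE frame using several ActiveMQ extension headers. */
    static String subscribe(String topic, String subscriptionName, String selector) {
        return frame("SUBSCRIBE",
                "destination", topic,
                "ack", "client",
                "selector", selector,
                "activemq.prefetchSize", "1",
                // 4.x spelling; per the note above, 5.0 also accepts subscriptionName
                "activemq.subcriptionName", subscriptionName);
    }

    public static void main(String[] args) {
        System.out.println(connect("worker-1").replace('\u0000', '@'));
        System.out.println(subscribe("/topic/PRICES", "prices-sub", "JMSPriority > 5").replace('\u0000', '@'));
    }
}
```

Setting activemq.prefetchSize to 1 follows the table's advice for slow consumers; a fast consumer would likely want a larger value.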