|
ActiveMQ supports advisory messages which allows you to watch the system using regular JMS messages. Currently we have advisory messages that support
Advisory messages can be thought as some kind of administrative channel where you receive information regarding what is happening on your JMS provider along with what's happening with producers, consumers and destinations. When you look at a broker via JMX you will see the advisory topics prefixed with ActiveMQ.Advisory.. Every Advisory has the message type 'Advisory' and some predefined message properties:
In addition, some messages carry a Command object - which carries more information for the advisory - e.g. A subscription to each of the destination returns an ActiveMQMessage. Specific DataStructure objects (ie. ConsumerInfo, ProducerInfo,ConnectionInfo) can be retrieve via getDataStructure method of ActiveMQMessage. For example:
...
Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);
....
public void onMessage(Message msg){
if (msg instanceof ActiveMQMessage){
try {
ActiveMQMessage aMsg = (ActiveMQMessage)msg;
ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
} catch (JMSException e) {
log.error("Failed to process message: " + msg);
}
}
}
The following advisory topics are supportedClient based advisories
Note that the consumer start/stop advisory messages also have a consumerCount header to indicate the number of active consumers on the destination when the advisory message was sent. This means you can use the following selector to be notified when there are no active consumers on a given destination... consumerCount = 0 Destination and Message based advisories
New advisories in version 5.2
Enabling advisories not turned on by defaultThe advisories that are not turned on by default (see the last column) can be enabled on a PolicyEntry in the ActiveMQ Broker Configuration - e.g. - to enable a message consumed advisory you can configure the following:
<destinationPolicy>
<policyMap><policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" />
</policyEntries></policyMap>
</destinationPolicy>
Disabling advisory messagesThe use of advisory messages incurs a small overhead in terms of memory and connection resources that is related to the number of destinations in your system. In some cases it can make sense to disable all advisories. Advisories need to be disabled both on the Broker, via XML Configuration
<broker advisorySupport="false">...
or from java code BrokerService broker = new BrokerService(); broker.setAdvisorySupport(false); ... broker.start(); and on your ActiveMQConnectionFactory (because a subscription to an advisory topic will auto create it) via the brokerUrl "tcp://localhost:61616?jms.watchTopicAdvisories=false"
or via java code using the 'watchTopicAdvisories' attribute on the ActiveMQConnectionFactory. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setWatchTopicAdvisories(false);
Using the destinationsAll of the above destinations are really prefixes which are appended with important information (like the actual topic or queue, the client ID, producer ID, consumer ID etc). This allows you to reuse the power of publish/subscribe, Wildcards and Selectors to filter the advisory messages as you see fit. For example if you want to subscribe to expired messages on a topic FOO.BAR you could subscribe to ActiveMQ.Advisory.Expired.Topic.FOO.BAR. To subscribe to all messages of a certain kind of advisory just append .> to the topic. e.g. to subscribe to all the consumers starting and stopping to topics and queues subscribe to ActiveMQ.Advisory.Consumer..>. Helper methodsMethods to get the advisory destination objects are available in AdvisorySupport through the following methods.
AdvisorySupport.getConsumerAdvisoryTopic()
AdvisorySupport.getProducerAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getDestinationAdvisoryTopic()
AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
//Version 5.2 onwards
AdvisorySupport.getSlowConsumerAdvisoryTopic()
AdvisorySupport.getFastProducerAdvisoryTopic()
AdvisorySupport.getMessageDiscardedAdvisoryTopic()
AdvisorySupport.getMessageDeliveredAdvisoryTopic()
AdvisorySupport.getMessageConsumedAdvisoryTopic()
AdvisorySupport.getMasterBrokerAdvisoryTopic()
AdvisorySupport.getFullAdvisoryTopic()
Some helper classes to deal with advisory messages are available in the advisories package. For users of previous releases see the Advisory Support in ActiveMQ 3 |