Slow Consumer Problem Statement

The problem with slow consumers is that the broker must act as a buffer until they catch up. However, the broker does not have infinite resources, so it will eventually fail if a consumer never catches up.

Consumers in the Java Broker

The types of queue on which a slow consumer can occur are distinguished by two properties: durability and the exchange to which the queue is bound.

Queues that are bound to the amq.direct exchange, i.e. JMS Queues, are not included in this work.
Queues bound to other exchanges, such as amq.match (the headers exchange), are also not included.

This leaves only the queues bound to the topic exchange to be considered.

Topics

In AMQP, consumption is always from an AMQP Queue. To avoid confusion with JMS Queues, in the following discussion the term topic is defined to mean an AMQP Queue bound to the amq.topic exchange.

When a topic reaches a set threshold for message count, size or age, we have the following options for dealing with the attached consumer session:

  • Flow the producer.
  • Disconnect the slow consumer.

Work has already been done to flow producers on queues (see Producer flow control).

This leaves disconnecting the slow consumer as the option addressed by this work.

Disconnect the Slow Consumer

For non-durable topics, disconnecting the consumer means that the topic will be deleted, potentially freeing the memory used by its messages. Remember that messages are shared across all topics, so the memory for a message is only freed once no topic still requires it.
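The sharing described above behaves like reference counting. The following is a minimal sketch, assuming one reference per topic that holds the message; `SharedMessage` and its methods are hypothetical and are not the broker's actual classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a message body shared by every topic it was routed to.
final class SharedMessage {
    private final byte[] body;
    // Number of topics that still hold a reference to this message.
    private final AtomicInteger refCount = new AtomicInteger();
    private volatile boolean freed;

    SharedMessage(byte[] body) {
        this.body = body;
    }

    // Called when the message is enqueued on a topic.
    void acquire() {
        refCount.incrementAndGet();
    }

    // Called when a topic is deleted or no longer needs the message.
    // Returns true only when the last reference is released.
    boolean release() {
        if (refCount.decrementAndGet() == 0) {
            freed = true; // a real broker would discard the body here
            return true;
        }
        return false;
    }

    boolean isFreed() {
        return freed;
    }

    int size() {
        return body.length;
    }

    public static void main(String[] args) {
        SharedMessage msg = new SharedMessage(new byte[1024]);
        msg.acquire(); // routed to the slow consumer's topic
        msg.acquire(); // routed to a second, healthy topic
        System.out.println(msg.release()); // false: slow topic deleted, message still referenced
        System.out.println(msg.release()); // true: no topic needs it, memory can be freed
    }
}
```

Deleting the slow consumer's topic only drops one reference; the memory is reclaimed when the last topic releases the message.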

For durable topics (JMS Durable Subscriptions), disconnecting the consumer will leave the queue bound and still receiving messages. This only makes the memory situation worse, as we now have a queue with no consumer rather than just a slow consumer. If all the messages on the topic are persistent, they can be evicted from memory if required, but there is no guarantee that all the messages will have been sent persistently.

On disconnection, the consumer would receive an AMQP error: 506 Resource Limit Exceeded/Resource Error. For non-durable consumers this will always work. However, for a durable subscription the consumer may already have disconnected when the limit is reached, so although deleting the subscription would be in line with the configuration, it would not be what the user expects. The configuration for enabling slow consumer disconnection should therefore allow durable subscriptions to be retained, targeting only transient subscriptions for disconnection.

Design Specification

This work is mainly focused on the broker; however, the client may also require changes to ensure that the error is correctly reported.

Broker Changes

Extension Point
To enable the broker to monitor the queues and perform the appropriate action, we can extend an existing mechanism: the VirtualHost housekeeping thread, which checks all the queues for alerting purposes. Currently this is done via a single TimerTask. By updating this to use a ScheduledThreadPoolExecutor we can run arbitrary tasks in the pool, so that an error in one task does not stop the others from running on their defined schedule (an uncaught exception in a TimerTask terminates the Timer thread, and with it every scheduled task). Each task should still catch its own exceptions, because the executor suppresses all further runs of a periodic task that throws. This will allow slow consumers to be checked and disconnected on a periodic basis.
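A minimal sketch of the housekeeping pool, assuming a hypothetical `Housekeeping` wrapper around the standard `java.util.concurrent.ScheduledThreadPoolExecutor`. The wrapper catches exceptions inside each periodic task so that a failing check (here a simulated slow consumer check) stays on its schedule.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical housekeeping pool for a VirtualHost.
final class Housekeeping {
    private final ScheduledThreadPoolExecutor pool;

    Housekeeping(int threads) {
        pool = new ScheduledThreadPoolExecutor(threads);
    }

    // Schedules a periodic task. The wrapper catches any RuntimeException
    // (a Runnable cannot throw checked ones) because the executor would
    // otherwise suppress every later run of a task that throws.
    void schedule(String name, Runnable task, long periodMillis) {
        Runnable safe = () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("Housekeeping task '" + name + "' failed: " + e);
            }
        };
        pool.scheduleAtFixedRate(safe, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        Housekeeping hk = new Housekeeping(2);
        CountDownLatch runs = new CountDownLatch(3);
        // A hypothetical slow consumer check that fails on every run.
        hk.schedule("slow-consumer-check", () -> {
            runs.countDown();
            throw new IllegalStateException("simulated failure");
        }, 10);
        // Despite the failures, the task keeps being rescheduled.
        boolean keptRunning = runs.await(2, TimeUnit.SECONDS);
        hk.shutdown();
        System.out.println("Task kept running after errors: " + keptRunning);
    }
}
```

Without the try/catch, the first simulated failure would be the last run of that task.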

Queue Detection
The target queues can easily be identified by checking their bindings, as topics are all bound to the TopicExchange. Once we have identified a topic, we can use the configuration assigned to the queue to determine whether to check depth, messageCount or messageAge when selecting the subscription for processing.
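Queue detection, together with the option to leave durable subscriptions alone, can be sketched as a simple filter. `QueueInfo`, `candidates` and the `includeDurable` flag are hypothetical simplifications; the real broker would read bindings and durability from its own queue objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class TopicDetector {
    // Hypothetical, simplified view of a broker queue.
    static final class QueueInfo {
        final String name;
        final Set<String> boundExchanges;
        final boolean durable;

        QueueInfo(String name, Set<String> boundExchanges, boolean durable) {
            this.name = name;
            this.boundExchanges = boundExchanges;
            this.durable = durable;
        }
    }

    static final String TOPIC_EXCHANGE = "amq.topic";

    // Returns the queues to check for slow consumers: only topics, and
    // durable subscriptions only when the configuration opts in.
    static List<String> candidates(List<QueueInfo> queues, boolean includeDurable) {
        List<String> result = new ArrayList<>();
        for (QueueInfo q : queues) {
            if (!q.boundExchanges.contains(TOPIC_EXCHANGE)) {
                continue; // amq.direct, amq.match etc. are out of scope
            }
            if (q.durable && !includeDurable) {
                continue; // keep durable subscriptions unless configured otherwise
            }
            result.add(q.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<QueueInfo> queues = List.of(
            new QueueInfo("orders", Set.of("amq.direct"), true),
            new QueueInfo("tmp_1", Set.of("amq.topic"), false),
            new QueueInfo("sub_durable", Set.of("amq.topic"), true));
        System.out.println(candidates(queues, false)); // [tmp_1]
        System.out.println(candidates(queues, true));  // [tmp_1, sub_durable]
    }
}
```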

Processing
We will identify the session/channel that the subscription is on and close it with the appropriate error code. Once the session/channel has been closed, the Delete policy will ensure that the queue is deleted and all its messages released.

Error Code
The AMQP error code 506 will be used to communicate the failure to the client. This is defined as Resource Error in 0-8/9/91 and Resource Limit Exceeded in 0-10. In addition, the protocol allows a textual description to be sent back to the client; in this field we will send 'Consuming too slow.'
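The ordering of the processing steps, and the code and text sent to the client, can be sketched as follows. The `Session` and `Queue` interfaces are hypothetical stand-ins, not the broker's API.

```java
import java.util.ArrayList;
import java.util.List;

final class SlowConsumerDisconnector {
    // AMQP 506: Resource Error (0-8/0-9/0-91) / Resource Limit Exceeded (0-10).
    static final int RESOURCE_ERROR = 506;
    static final String REASON = "Consuming too slow.";

    // Hypothetical stand-ins for the broker's session and queue objects.
    interface Session { void close(int code, String reason); }
    interface Queue { void delete(); }

    // Closes the subscription's session first, then deletes the queue so that
    // its messages are released only after the consumer has gone.
    static void disconnect(Session session, Queue queue) {
        session.close(RESOURCE_ERROR, REASON);
        queue.delete();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        disconnect((code, reason) -> log.add("close " + code + " " + reason),
                   () -> log.add("delete"));
        System.out.println(log); // [close 506 Consuming too slow., delete]
    }
}
```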

Client Changes

Error Processing
Currently a 0-8/9/91 Session propagates a ChannelCloseException to the client via the JMS ExceptionListener. We need to ensure that the 0-10 code path creates the same Exception type and presents it to the JMS ExceptionListener.

After Effects
The Exception that is thrown should not be classed as a 'HardError', as that would cause failover to start. After the exception has been received, the Consumer and its associated Session should be closed, but the Connection will remain operational. This allows the client to recover without having to re-establish its Connection.
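The intended client after effects can be sketched as a small state model. `ClientState` and `onError` are hypothetical; the set of hard (connection-level) reply codes follows the AMQP 0-9-1 specification, in which 506 resource-error is a soft (channel-level) error. Whether the client's own 'HardError' classification matches this set exactly is an assumption.

```java
import java.util.Set;

final class ClientErrorHandling {
    // Connection-level ("hard") reply codes as classified by AMQP 0-9-1.
    // 506 resource-error is a channel-level ("soft") error, so it is absent.
    static final Set<Integer> HARD_ERRORS =
        Set.of(320, 402, 501, 502, 503, 504, 505, 530, 540, 541);

    // Hypothetical client state: which objects remain open after an error.
    static final class ClientState {
        boolean connectionOpen = true;
        boolean sessionOpen = true;
        boolean consumerOpen = true;
        boolean failoverStarted = false;
    }

    // A hard error tears down the Connection and starts failover;
    // any other error closes only the Consumer and its Session.
    static void onError(ClientState state, int replyCode) {
        state.consumerOpen = false;
        state.sessionOpen = false;
        if (HARD_ERRORS.contains(replyCode)) {
            state.connectionOpen = false;
            state.failoverStarted = true;
        }
    }

    public static void main(String[] args) {
        ClientState state = new ClientState();
        onError(state, 506); // broker closed the session: 'Consuming too slow.'
        System.out.println("connection open: " + state.connectionOpen);   // true
        System.out.println("session open: " + state.sessionOpen);         // false
        System.out.println("failover started: " + state.failoverStarted); // false
    }
}
```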

Configuration

Following on from the Topic Configuration Design, slow consumer configuration will be added using a 'consumer' element.

The topic currently exposes three properties that we can use to control the consumer: depth, oldest message age and message count. The configuration will allow any one, or all, of these values to be applied to the specified topic. Where more than one value is specified, each of them can trigger the policy: e.g. setting count to 10 and depth to 1024 would allow up to 10 messages to exist, as long as their total size was not more than 1024.
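The trigger semantics can be sketched as follows, assuming a limit is exceeded only when the measured value is strictly greater than the configured value (consistent with 10 messages being allowed in the example above). `SlowConsumerPolicy` is a hypothetical name.

```java
import java.util.OptionalLong;

// Hypothetical evaluation of the <consumer> thresholds for one topic.
final class SlowConsumerPolicy {
    // An empty value means the threshold was not configured.
    private final OptionalLong maxDepthBytes;
    private final OptionalLong maxMessageCount;
    private final OptionalLong maxMessageAgeMillis;

    SlowConsumerPolicy(OptionalLong maxDepthBytes, OptionalLong maxMessageCount,
                       OptionalLong maxMessageAgeMillis) {
        this.maxDepthBytes = maxDepthBytes;
        this.maxMessageCount = maxMessageCount;
        this.maxMessageAgeMillis = maxMessageAgeMillis;
    }

    // The policy fires as soon as ANY configured threshold is exceeded.
    boolean isTriggered(long depthBytes, long messageCount, long oldestAgeMillis) {
        return exceeds(maxDepthBytes, depthBytes)
            || exceeds(maxMessageCount, messageCount)
            || exceeds(maxMessageAgeMillis, oldestAgeMillis);
    }

    // Assumption: a limit is exceeded only when the value is strictly greater.
    private static boolean exceeds(OptionalLong limit, long value) {
        return limit.isPresent() && value > limit.getAsLong();
    }

    public static void main(String[] args) {
        // count = 10 and depth = 1024, age not configured
        SlowConsumerPolicy policy = new SlowConsumerPolicy(
            OptionalLong.of(1024), OptionalLong.of(10), OptionalLong.empty());
        System.out.println(policy.isTriggered(1000, 10, 0));      // false
        System.out.println(policy.isTriggered(1000, 11, 0));      // true: too many messages
        System.out.println(policy.isTriggered(2048, 5, 0));       // true: too large
        System.out.println(policy.isTriggered(10, 1, 3_600_000)); // false: age not configured
    }
}
```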

One additional property that would be useful here is the consumption rate. If the topic reported its consumption rate, this property could be used to define a minimum rate that the consumer must stay above.
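If such a rate were reported, a check might compare two samples of a cumulative consumed-message counter. The counter, `rate` and `belowMinimum` are hypothetical, since the topic does not currently report a consumption rate.

```java
// Hypothetical consumption-rate check; the topic does not report this today.
final class ConsumptionRate {
    // Messages per second between two samples of a cumulative
    // "messages consumed" counter taken intervalMillis apart.
    static double rate(long consumedBefore, long consumedAfter, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        return (consumedAfter - consumedBefore) * 1000.0 / intervalMillis;
    }

    // True when the consumer has fallen below the configured minimum rate.
    static boolean belowMinimum(double rate, double minRate) {
        return rate < minRate;
    }

    public static void main(String[] args) {
        double r = rate(1_000, 1_150, 30_000); // 150 messages in 30 s
        System.out.println(r);                     // 5.0
        System.out.println(belowMinimum(r, 10.0)); // true
    }
}
```

A low rate is only meaningful when messages are waiting, so in practice such a check would probably be combined with one of the backlog thresholds above.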

Consumer Element for configuration
        <consumer>
            <!-- The depth (total size of queued messages, in bytes) above which the policy will be applied -->
            <depth>4235264</depth>

            <!-- The age of the oldest message (in milliseconds) above which the policy will be applied -->
            <messageAge>600000</messageAge>

            <!-- The number of messages above which the policy will be applied -->
            <messageCount>50</messageCount>

            <!-- Policies configuration -->
            <policy name="Delete">
                <options>
                    <option name="delete-persistent" value="true"/>
                </options>
            </policy>
        </consumer>

This <consumer> element will be added to the existing queue configuration to allow specific durable subscriptions to be identified and processed. In addition, a new <topic> element will be added to allow topics to be configured. The resulting section of XML would look like this:

Topic configured for slow consumer disconnection
    <topic key="stocks.us.*">
        <consumer>
            <!-- The depth (total size of queued messages, in bytes) above which the policy will be applied -->
            <depth>4235264</depth>

            <!-- The age of the oldest message (in milliseconds) above which the policy will be applied -->
            <messageAge>600000</messageAge>

            <!-- The number of messages above which the policy will be applied -->
            <messageCount>50</messageCount>

            <!-- Policies configuration -->
            <policy name="Delete">
                <options>
                    <option name="delete-persistent" value="true"/>
                </options>
            </policy>
        </consumer>

    </topic>
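To check that the configuration is well-formed and readable, the following sketch parses a <topic> element with the JDK's built-in DOM parser (javax.xml.parsers). `TopicConfigReader` and its helpers are hypothetical; the broker's real configuration loader is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.OptionalLong;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical reader for the <topic>/<consumer> configuration.
final class TopicConfigReader {
    // Parses an XML string and returns its root element (here <topic>).
    static Element parse(String xml) throws Exception {
        return DocumentBuilderFactory.newInstance()
            .newDocumentBuilder()
            .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
            .getDocumentElement();
    }

    // Returns the first descendant element with the given tag, or null.
    static Element child(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : (Element) nodes.item(0);
    }

    // Reads an optional numeric threshold such as <depth> or <messageCount>.
    static OptionalLong threshold(Element consumer, String tag) {
        Element e = child(consumer, tag);
        return e == null ? OptionalLong.empty()
                         : OptionalLong.of(Long.parseLong(e.getTextContent().trim()));
    }

    // Looks up <option name="..." value="..."/> within a <policy>; null if absent.
    static String option(Element policy, String name) {
        NodeList options = policy.getElementsByTagName("option");
        for (int i = 0; i < options.getLength(); i++) {
            Element o = (Element) options.item(i);
            if (name.equals(o.getAttribute("name"))) {
                return o.getAttribute("value");
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<topic key=\"stocks.us.*\"><consumer>"
            + "<depth>4235264</depth><messageAge>600000</messageAge>"
            + "<messageCount>50</messageCount>"
            + "<policy name=\"Delete\"><options>"
            + "<option name=\"delete-persistent\" value=\"true\"/>"
            + "</options></policy></consumer></topic>";
        Element topic = parse(xml);
        Element consumer = child(topic, "consumer");
        Element policy = child(consumer, "policy");
        System.out.println(topic.getAttribute("key"));                // stocks.us.*
        System.out.println(threshold(consumer, "depth").getAsLong()); // 4235264
        System.out.println(policy.getAttribute("name"));              // Delete
        System.out.println(option(policy, "delete-persistent"));      // true
    }
}
```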

Testing Spec

Testing for this new feature will mainly rely on system testing.

Unit Testing

Configuration changes need to be validated as part of existing Configuration Testing.

System Testing

System testing requires a number of dimensions to be varied:

  1. Protocol Version
  2. Client Ack Mode
  3. Client Consume Mode
  4. Topic Durability

Protocol Version

At a minimum, testing should cover the two protocol families: 0-8/0-9/0-91 and 0-10. Ideally the tests would be run on each protocol version to verify that the protocol exception is correctly propagated.

Client Ack Mode

The tests should be run against each client ack mode to determine whether there is any difference in the exception handling. Transacted sessions, for instance, should fail to commit by throwing the expected exception, as well as having the exception delivered to the ExceptionListener.

The NoAck case has an additional issue: in the presence of a slow consumer it can overwhelm the IO layer. This should be verified; however, its resolution is beyond the scope of this work.

Client Consume Mode

The client can consume in one of two ways: synchronously using receive(), or asynchronously using a MessageListener.

Topic Durability

Topics can be created as durable or non-durable (transient); both configurations should be tested, as any exception should be reported in the same way. Additionally, as the configuration can selectively delete durable topics, this must also be tested: the exception should be thrown when durable topics are covered by the configuration, but not when they are excluded.
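The dimensions above can be combined into a test matrix. This sketch enumerates every combination together with its expected outcome; the concrete ack-mode names and the three topic cases are assumptions drawn from the sections above, and `SlowConsumerTestMatrix` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class SlowConsumerTestMatrix {
    // Assumed values for each dimension; the real lists depend on the client.
    static final List<String> PROTOCOLS = List.of("0-8", "0-9", "0-91", "0-10");
    static final List<String> ACK_MODES =
        List.of("AUTO", "CLIENT", "DUPS_OK", "TRANSACTED", "NO_ACK");
    static final List<String> CONSUME_MODES = List.of("receive()", "MessageListener");

    // Topic durability combined with whether the configuration covers durable topics.
    enum TopicCase {
        NON_DURABLE(true),
        DURABLE_COVERED(true),       // e.g. delete-persistent="true"
        DURABLE_NOT_COVERED(false);  // durable subscriptions are left alone

        final boolean expectException;

        TopicCase(boolean expectException) {
            this.expectException = expectException;
        }
    }

    // Builds the full cartesian product of the test dimensions.
    static List<String> cases() {
        List<String> result = new ArrayList<>();
        for (String protocol : PROTOCOLS) {
            for (String ack : ACK_MODES) {
                for (String consume : CONSUME_MODES) {
                    for (TopicCase topic : TopicCase.values()) {
                        result.add(String.join(" | ", protocol, ack, consume, topic.name())
                            + " -> expect exception: " + topic.expectException);
                    }
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> all = cases();
        System.out.println(all.size()); // 120
        System.out.println(all.get(0)); // 0-8 | AUTO | receive() | NON_DURABLE -> expect exception: true
    }
}
```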
