The Java Broker currently performs no throttling of producing clients. In combination with the way that the Java Broker holds every transient message in memory until consumed, we can encounter scenarios where the Java broker runs out of heap space. For example, if a producer P sends messages at a rate of 100msg/s to a queue Q, but the only consumer, C, of queue Q processes messages at a rate of 10msg/s, then Q will grow at a rate of 90msg/s until such time as the broker runs out of heap space.
Tactically we may attempt to solve the problem of Queues becoming overfull, and thus causing out of memory exceptions, without attempting to solve the totality of out of memory issues.
AMQP0-8/0-9/0-9-1 provides no mechanism for throttling producers of messages based on credit (either for a given destination, or even at the granualrity of a session). There are two mechanisms available to throttle a producing client - the use of TCP flow control, and the use of the AMQP Channel.Flow command.
The use of TCP flow control throttles the producer to the rate at which the Broker can process the incomming messages, but does not address the throttling of the producer to the consumption of messages by a third part consuming client.
The Channel.Flow command instructs the recipient to either cease (or resume) sending messages. The receiver of the command should send Channel.Flow-ok once the flow command has been received.
In AMQP0-9-1 and earlier we cannot determine prior to a producer sending a message, which queues a producer wishes to send to. Thus we are limitted in general to a reactive flow control - that is, when a producer attempts to send to an overful queue we can request that the sender send no more messages, my issuing a Channel.Flow. Further, since many messages may already be "on-the-wire" by the time our Channel.Flow is received, we cannot guarantee by how much the producer may "overfill" the queue before it ceases publishing.
Allow each queue on the Java Broker to be configured with a "full" size. Implement flow control such that the publisher of a message which is enqueued on a "full" queue is immediately sent a Channel.Flow command to cease publication. Monitor queue sizes such that when an "overfull" Queue has available space, then sessions which are blocked waiting for this event are free to send messages again.
Ensure that the Java Client respects the Channel.Flow command, and causes all attempts to send Messages to block, until the session is unflowed.
Add the following configurable properties to Queues:
capacity: size in bytes at which the queue is thought to be full (and thus publishes which send messages which take the total queue size above this mark will be blocked). Default 0 (no maximum)
flowResumeCapacity: the queue size at which producers are unflowed (defaulted to capacity)
Like other such values these may be set on individual queues in the config, or on a per-virtualhost basis.
Alter the following files in the org.apache.qpid.server.configuration package to set the queue properties based on this configuration:
Alter the AMQQueue.java interface to add the following method
Update the following two classes in package org.apache.qpid.server.txn to call checkCapacity on the queue after they have enqueued a message
Add the following code to AMQChannel in package org.apache.qpid.server
Modify SimpleAMQQueue to perform the capacity check, and also to unblock blocked channels when the queue reduces in size.
Hook in the existing handler for ChannelFlow commands by altering the dispatchChannelFlow method in the ClientMethodDispatcherImpl class
Additionally logging messages should be emitted
1) on the broker each time the queue issues an overfull request to a session to start flow control
2) on the client every time it receives a flow control command from the broker
3) on the client every time it attempts to send a message but finds itself blocked by broker flow control - in particular this message should repeat periodically untill the message is sent