Producer flow control is necessary to stop clients from overwhelming a broker when messages are not being consumed fast enough. This is tracked in JIRA as QPID-942.

Use cases

1. Consuming Client lags Publishing Client (P2P)

Desc
This scenario is where the consuming client cannot process the data published to its queue by another client at the same rate, i.e. consumption lags publication. This seems to be an almost de facto use case for P2P messaging, in that by its nature (and possibly via the client code) consuming messages involves more processing than publishing, i.e. send() is a less complex action than receive() or onMessage(). It can also happen when the consumer goes away.

Result
Messages back up in the queues on the broker and are not being drained by the consumer. This may eventually lead to OOM as the VM cannot garbage collect the message references. This could happen slowly over a period of time, so the MINA buffers may be empty (or at least not represent any significant amount of memory use).

2. Unconsumed messages remain in Queues (PubSub)

Desc
This is where data is being published to topics in the broker for which subscriptions exist, but no consuming client acks the messages.

Result
Messages back up in the broker and, with durable subscriptions, never go away. The broker OOMs as the queues fill up. Again this happens over time, and the MINA buffers may not be impacted. Without a TTL set (and set low enough), the data backs up in the clients' subscription queues.

3. Consuming Client cannot process large messages

Desc
This is where the consumer cannot process a large message sitting in its queue. This may be because it does not have enough memory or disk available for the processing, for example. It may also arise if the message is corrupted in some way, e.g. malformed XML. The message does not get ack'd and remains on a queue in the broker, currently surviving restart.

Result
The message(s) remain in the broker and can cause OOM, particularly when there is a burst of large messages together. An example of this is the Qlib issue, where a spate of large messages and a client-side OOM prevented them from being processed. It can happen slowly, and with persistent messages it costs at least twice as much heap. Broker OOM follows eventually.

Plan

To implement this, the following changes are necessary:

Client
send() needs to become potentially blocking. If the producer has been flow controlled, send() will either throw an exception (which will be the default behaviour) or block until the producer has been unflowed (the latter only if a system property has been set).
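
As a rough illustration of the intended client behaviour, the sketch below shows a send() that either throws or blocks depending on a system property. This is a hypothetical sketch only; the class name, property name and exception are assumptions, not the actual Qpid client API.

    // Hypothetical sketch of the client-side send() behaviour described above.
    // The property and class names are assumptions, not the real Qpid client API.
    public class FlowControlledProducer {

        // When set, send() blocks until the broker unflows the producer;
        // otherwise (the default) send() fails immediately.
        private final boolean blockWhenFlowControlled =
                Boolean.getBoolean("qpid.producer.block_when_flow_controlled");

        private final Object flowLock = new Object();
        private boolean flowControlled;

        public void send(Object message) throws InterruptedException {
            synchronized (flowLock) {
                if (flowControlled && !blockWhenFlowControlled) {
                    // Default behaviour: surface the condition to the application.
                    throw new IllegalStateException("Producer is currently flow controlled");
                }
                while (flowControlled) {
                    // Optional behaviour: wait until the broker unflows this producer.
                    flowLock.wait();
                }
            }
            writeToWire(message);
        }

        // Called by the connection layer when the broker flows/unflows this producer.
        void setFlowControlled(boolean flowed) {
            synchronized (flowLock) {
                flowControlled = flowed;
                if (!flowed) {
                    flowLock.notifyAll();
                }
            }
        }

        private void writeToWire(Object message) {
            // ... actual protocol write elided ...
        }
    }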

Broker
When a message has been enqueued, the broker should check whether the producer is publishing to a queue which has violated its policy; if so, the producer will be flow controlled. When a consumer has had a message delivered, if the queue is no longer violating its policy then the flowed producers will be unflowed. This check will occur after enqueuing so as not to slow down the broker.
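
A minimal sketch of the enqueue/delivery checks described above, using assumed names rather than the real broker classes:

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative only: the post-enqueue check and the matching unflow on delivery.
    class FlowControlledQueue {

        interface Producer {
            void setFlowControlled(boolean flowed);
        }

        private final long maximumCount;    // from the queue's (or its exchange's) policy
        private long currentCount;
        private final Set<Producer> flowedProducers = new HashSet<Producer>();

        FlowControlledQueue(long maximumCount) {
            this.maximumCount = maximumCount;
        }

        // Called after the message has been enqueued, so the enqueue itself is not slowed.
        synchronized void afterEnqueue(Producer producer) {
            currentCount++;
            if (currentCount > maximumCount) {
                producer.setFlowControlled(true);   // flow control the offending producer
                flowedProducers.add(producer);
            }
        }

        // Called after a message has been delivered to a consumer.
        synchronized void afterDelivery() {
            currentCount--;
            if (currentCount <= maximumCount && !flowedProducers.isEmpty()) {
                for (Producer producer : flowedProducers) {
                    producer.setFlowControlled(false);  // queue is back within policy: unflow
                }
                flowedProducers.clear();
            }
        }
    }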

Queues and exchanges will have policies attached to them (queues will inherit from their exchange if they do not have one), which will specify the point at which producers should be flow controlled in terms of queue count or queue depth. These policies will be manageable over JMX, so they can be applied or removed without having to restart the broker.
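
For illustration, the JMX side could look something like the interface below; the MBean and method names are assumptions for this sketch, not Qpid's actual management API.

    // Hypothetical MBean interface for applying/removing a flow control policy at runtime.
    public interface ProducerFlowControlPolicyMBean {

        // Flow control producers once the queue exceeds either threshold (0 disables a threshold).
        void setPolicy(long maximumQueueCount, long maximumQueueDepthBytes);

        // Remove the policy so producers publishing here are never flowed.
        void removePolicy();

        long getMaximumQueueCount();

        long getMaximumQueueDepthBytes();

        // True while producers publishing to this queue/exchange are being flow controlled.
        boolean isProducerFlowControlActive();
    }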

The management console will also gain a "stop all producers" button to enable throttling of arbitrary queues, and a "start all producers" button which will start all flowed producers.

Disadvantages of this approach

The producer will not be flowed until it publishes to a queue which is violating its policy, so if you have N producers each publishing to a queue, you will get N messages on top of the one that pushes the queue into a delinquent state.


12 Comments

  1. A few Questions:
    Will the policies be a straight limit or high/low marks?
    Will the policies be per queue or per routing-key? Just thinking of the wild-card topic publication case.

    When to send flow control message
    You are going to need to record all the sessions that have published to a given queue (publishers) so that you can notify them when their flow is resumed. Would it not be useful to use this same list to flow producers when the limit is hit? This would mean that you don't have to wait for a publisher to publish a message to the blocked queue to get the notification.

    The disadvantage of this would be that all previous producers would receive the flow message. So while this would provide earlier feedback for medium to high volume publishers, the slower volume publishers may never need to be flow controlled if the situation is resolved on the broker before they need to publish again.

    How do you intend to get round the problem that Publish in 0-8/0-9 is asynchronous? To maintain JMS compliance we have to accept the received Publish request even if we have hit the limit as the client is non-blocking so will be unable to throw a JMSException on the failure to publish.

    Martin

    1. Policies will essentially be high water marks, applied onto queues, routing keys or exchanges.

      We don't need to record all the sessions that have published, just the ones that have been flowed, which should be cheaper. Recording the list of all sessions that have published could unnecessarily flow some producers that weren't going to publish again, but I guess that's also a risk with doing it once they've published-while-delinquent and is probably more in line with user expectations.

      The reason that they get flowed after the delinquent publish is to deal with the async publish.

  2. Unfortunately, this mechanism creates a denial-of-service vulnerability as bad as or worse than the one you are fixing.

    By creating a queue, binding it to an exchange, and never consuming the messages on that queue, I can effectively shut down that exchange by causing all producers that send to that exchange to be blocked.

    A less dramatic illustration of this effect is that all consumers bound to the same exchange will be throttled to the speed of the slowest consumer.

    -Ted

  3. Ted: The DoS is worse if you have malicious consumers, the immediate motivation is a fix for unreliable consumers. Also, the DoS in your example is limited to an exchange or routing key in one vhost, rather than taking out the whole broker so is still a step up. It's also recoverable with tools and less mysterious than "broker goes bang".

    1. Aidan,

      I see your point and agree. There will continue to be interesting technical challenges related to producer flow control.

      -Ted

  4. I've been kicking a bunch of ideas around on this front but none really grabs me yet. A key point here is that this debate really needs to happen at the AMQP working group, so that we can have an interoperable solution. Some things to think about:

    1. AMQP already defines flow control to Exchanges, treating them identically to subscribers (both are referred to as "destinations" in the spec). I feel like it should be possible to use this mechanism, but there are some troubles:

    A client subscription has exactly one "producer", i.e. the queue it is subscribed to. So it is easy to implement per-subscription flow policies; indeed we already do this - I think the Java side calls it "prefetch".

    An exchange on the other hand has many producers - any attached session can send messages to any exchange. We can control the flow to an exchange per session, but this doesn't let us fix a limit for the exchange as a whole. It does let us throttle individual clients somewhat, so it may provide some help, but it's not straightforward to use it to implement an overall exchange flow rate policy. It's even harder to see how we could implement a queue policy, as a queue may be bound in many exchanges and there's no way to predict in advance how much exchange traffic will go to any particular queue.

    2. TCP flow control. In the absence of protocol methods for flow control, a simple technique is the following: if some session action would violate a policy, stop reading that session's connection for up to some timeout period. If something happens (e.g. a message is dequeued) in that period that allows the problem session to proceed, execute the action and start reading the connection again. If the action can't be handled within the timeout, kill the offending session and start reading the connection again. (A rough sketch of this technique appears at the end of this comment.)

    Note the timeout above needs to be less than TCP's timeout or the client may disconnect the entire connection. A variation is to dispense with the timeout and let TCP decide the timeout, but in that case the entire connection will always be killed if the action can't be executed within TCP's timeout.

    This could be considered a fall-back position and combined with 1. above - we set exchange flow rates to values that we think are likely to avoid policy violations. Hopefully this works most of the time, but if a violation is about to occur we fall back to 2 so our policies are respected.

    3. New flow control commands: I'm not sure what ideal flow-control commands would look like. First thing to note: I think it is impossible in principle to guarantee per-queue policies will be respected using flow control commands alone. This is because messages are not sent to queues but to exchanges. The client does not know in general what queue(s) a message is going to, so it cannot respect per-queue limits. Remember that flow control depends on the producer voluntarily limiting the number of messages it sends based on rules established in advance. Not knowing what queue a message will go to means it's impossible to establish per-queue flow limits on the client in advance. So there will always be a need for some fall-back position like 2 if there are per-queue policies.

    I do think that flow control between a session and the broker as a whole would be more usable than flow control for each session/exchange pair. It would at least allow the broker to use fixed-size input buffers per session and to enforce a total-queue-content limit.
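
    As a rough illustration of the TCP fall-back in point 2, the sketch below suspends reads on the offending connection, waits up to a timeout for the blocked action to become possible, and otherwise kills the session. All names here are assumptions; a real broker would drive this from its I/O layer (e.g. suspending MINA reads) rather than with a blocking wait.

      // Hypothetical sketch of option 2: pause reads, wait for room, or kill the session.
      class TcpFallbackFlowControl {

          interface Connection {
              void suspendReads();
              void resumeReads();
              void closeConnection();
          }

          private final long timeoutMillis;   // must be shorter than TCP's own timeout

          TcpFallbackFlowControl(long timeoutMillis) {
              this.timeoutMillis = timeoutMillis;
          }

          // Invoked when executing the pending action would violate the queue's policy.
          void handleViolation(Connection connection, Object queueMonitor)
                  throws InterruptedException {
              connection.suspendReads();
              long deadline = System.currentTimeMillis() + timeoutMillis;
              synchronized (queueMonitor) {
                  // Wait for something (e.g. a dequeue) that lets the action proceed.
                  while (!actionCanProceed() && System.currentTimeMillis() < deadline) {
                      queueMonitor.wait(Math.max(1, deadline - System.currentTimeMillis()));
                  }
              }
              if (actionCanProceed()) {
                  connection.resumeReads();      // execute the action and carry on reading
              } else {
                  connection.closeConnection();  // kill the offending session/connection
              }
          }

          // Placeholder: a real broker would re-check the policy against the queue here.
          private boolean actionCanProceed() {
              return false;
          }
      }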

    1. So, splitting this into two different problems, and rescoping to a 0-10 solution, we have two granularities of policy (exchange-level flow control and queue-level flow control) and two levels of security (conforming clients and non-conforming clients).

      Dealing with granularity first:

      0-10 already defines flow control for exchanges, we can issue message.stop for all producers who are going to publish, allowing further credit when the policy is no longer being violated.

      For queue level flow control we should just abandon it and implement exchange+routing key policies instead in AMQP. These would follow the same matching algorithms as the exchanges they are bound to. Message.stop would gain a 'routing-key' argument which would ban the producer from publishing to that combination.

      Security can be dealt with by ensuring that violations of message.stop are fatal errors. Extra defensiveness could optionally be introduced by tarpitting the connection like some SMTP servers do.

      Policy calculation and administration would probably be best dealt with as part of some sort of management API, but I would imagine it could involve "total size of queues routed to by key" or "maximum size of any queue routed to by key" or variations thereof.

  5. A couple of comments...

    Firstly, for 0-10, the flow control is on a per exchange basis for each individual session.  There is no reason to flow control every producer which has published to an exchange, you can (if you so wish) limit yourself to flow controlling only those sessions which are (historically) publishing to overfull queues.

    In the AMQP context we are looking at producer side flow control to see what (if anything) needs to be done on this [for instance, it may be sufficient just to hold back completion notification, or use underlying network protocol flow control... alternatively we may use some finer grained notion of destination (exchange and routing-key pattern)]...   

    In the Qpid 0-8/0-9 implementation we have no such fine-grained tools to help us - only the ability to turn on and off a channel at a time.  For this the only thing we can go on is past publishing history.  In terms of the architecture of the Java Broker I would be expecting two separate triggers that may cause flow control to be invoked

    i) A large number of jobs have backed up for a given connection and have not yet been processed.  In this case I would expect flow control to be invoked on all channels on that connection.

    ii) A given queue has gone beyond some threshold point.  Here we have some difficulty...  We may elect to issue (progressively) flow control to sessions which have (in the recent past) published to that queue...  Alternatively we may elect to simply cease processing jobs for a given connection if we find a job which would push a queue beyond its threshold limit... This would then (eventually) invoke case (i) above.

    1. 0-10:
      Flowing producers individually is probably a better idea, although it would require tracking who's used what routing keys as well as what routing keys end up on which queues.

      > 0-10:
      Holding back completion notification seems insufficient to me; you'd run into the same problem described above where you could end up with an extra message per producer on the queue and the producer wouldn't know why the ack hasn't come back. TCP flow control is a bit better, but has the problem that you're flowing the whole connection, which could cause problems if a data queue is full and would get emptied by a consumer which is awaiting a 'go' message on a control queue. I think a finer grained notion of destination is the right answer, and one that makes a lot of things a lot clearer. Is there an AMQP Jira that talks about this?

      < 0-10:
      TCP flowing connections that have exceeded a high water mark for jobs, and unflowing them once they hit a low water mark, seems sensible, but we'd need a fair amount of additional accounting information to implement this (a rough sketch of that accounting appears at the end of this comment).

      Flowing on the channel is more problematic; it would require much of the same machinery as the 0-10 case but wouldn't be as granular. Oh well.
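
      For illustration, the accounting might look something like the sketch below; the class and method names are assumptions, not actual broker code.

        // Hypothetical per-connection job accounting for the high/low water mark scheme.
        class ConnectionJobAccounting {

            private final int highWaterMark;   // suspend reads at or above this many outstanding jobs
            private final int lowWaterMark;    // resume reads once we drop back to this level
            private int outstandingJobs;
            private boolean suspended;

            ConnectionJobAccounting(int highWaterMark, int lowWaterMark) {
                this.highWaterMark = highWaterMark;
                this.lowWaterMark = lowWaterMark;
            }

            // Called when a job is queued for this connection; returns true if the
            // caller should stop reading from the connection (TCP flow control).
            synchronized boolean jobQueued() {
                outstandingJobs++;
                if (!suspended && outstandingJobs >= highWaterMark) {
                    suspended = true;
                }
                return suspended;
            }

            // Called when a job has been processed; returns true if the caller
            // should resume reading from the connection.
            synchronized boolean jobProcessed() {
                outstandingJobs--;
                if (suspended && outstandingJobs <= lowWaterMark) {
                    suspended = false;
                    return true;
                }
                return false;
            }
        }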

  6. It is useful to think of what happens at the limits. Ultimately, consider a very fast publisher (Quad Xeon) publishing to a slow broker (e.g. running on a Commodore 64); middleware does not have infinite performance.

    Note we may not have hit queue limits here at all, we might just be too slow. There are TWO problems: i) when we are full vs ii) when we can't cope with a data rate (writing to floppy is slow on my Commodore 1541).

    So what do we do?
    We physically do not have the capacity in the broker to deal with the publish request; we are overrun (or near it). So we preserve ourselves for the benefit of consumers and other publishers.

    Thinking backwards in terms of severity:
    1) The ultimate sanction is to drop the publisher's connection with a suitable error message. It's up to the architect to buy better hardware or build a better app. Not our problem. Aidan's idea of tarpitting on reconnect might be neat here.
    2) We drop a channel
    3) We block the TCP connection (nasty, because it's quite lumpy and doesn't give the publisher the opportunity to behave better)
    4) We return STOP messages, or add some way to say STOP FOR X ms (a protocol change would be needed)
    5) We conflate messages where that is possible (this is an option in both 'too fast' and 'full up' scenarios)

    The "stop" messages give the client a change to adjust its rate without fancy MT code.

    If a queue is full, and a client goes to publish again, then what?
    1) For transacted sessions, rollback the transaction with a Queue full indication
    2) For Ack'ed sessions it's trickier. We can never honestly send the Ack. Dropping the channel/connection may well be the best course of action. Remember in AMQP clients aren't supposed to get too upset by us dropping the connection.

    The first scenario (my trusty C64) is rate based; and probably best handled for the whole session (not just an exchange/queue). This publisher is likely just too fast on all counts.

    The second is running out of space on a queue; that's a hard condition. We can either:
    1) make it go away by applying some policy to the queue (purge out TTL-exceeded messages, perhaps apply implementation-specific conflation of messages if the user has configured some extension, e.g. replace a price with a more recent one), or it's a hard condition.
    2) With TX we just kill the TX and all is well, and with Acks, as discussed. This is something that is easy enough to evaluate in the Exchange/Queue posting logic (but what do I know?). Note that if a message is routed to N end points, a failure to insert to any of the N should cause a fail (logically speaking).
    3) Just pause the connection indefinitely (quite a nice option if there is some prospect of someone emptying the queue).

    The ability to choose between 3 and the other modes would be a nice implementation specific feature for Qpid.

    Bottom line is that rate control into an exchange/rk combo is unlikely to find much use in the real world.

    Thoughts?
    John

  7. The "slow broker" scenario isn't actually an issue. A slow broker will read messages off of the TCP connection slower than a fast producer will write them. This will cause the TCP window to fill up, and cause TCP writes to block at the producer end of the connection. As long as the producer's client doesn't buffer writes indefinitely, this will eventually cause the producer's send message routine to block, thereby flow controlling the producer to exactly the rate that the slow broker can read data off the connection.

    The problem we're trying to solve is really about producers and consumers. The simplest example is a single producer sending messages through a queue to a single consumer. If the producer sends 100 messages/second, and the consumer processes 50 messages/second, the queue will inevitably grow, and regardless of how fast/slow the broker is, it will eventually swap itself to death if there is no limit on the queue size.

    In general there may be many producers sending messages to many consumers on any given queue or topic, however the basic issue is the same. The producers need to somehow get feedback about the rate at which messages sent to a given queue or topic are consumed.

    It may be tempting to try to apply option (3) from above (pause the connection indefinitely), however that is not a safe thing to do. Given that you need to read acks from the connection in order to dequeue messages, it's easy to construct a scenario where two clients passing messages back and forth would deadlock if the two queues they're using both fill up at the same time.

    1. I think an explicit "hey, hold it" from the broker to the producer is the right thing, based on exchange+routing key (this is essentially destination), which would need a protocol change.

      The application would probably want to apply other policies (conflation, expiry) to ameliorate the issue, but I think that's beyond the scope of what the broker can solve. The primary purpose of this limit is really to ensure adequate performance and reliability, and to stop poorly configured producer/consumer pairs from taking down a shared broker.

      I think it's about time to take this to amqp.org and hash out the details there, and move the discussion for qpid to a more focussed "what do you want to do for M4?" mode.