Controlling topic subscriptions

queue limits and slow subscribers

Over AMQP 0-10, topic subscriptions are implemented as private subscription queues bound to an exchange that represents the topic. If a subscriber doesn't keep up with the rate of published messages of interest, then the subscription queue starts to grow. Limiting the growth can be useful. To do that you can specify limits on a queue as message count or as an aggregate content size. E.g. to limit the queue to 500 messages:

  my-topic; {link:{x-declare:{arguments:{'qpid.max_count':500}}}}

or to limit the queue to 100,000 bytes of aggregate message content:

  my-topic; {link:{x-declare:{arguments:{'qpid.max_size':100000}}}}

You can also combine the two limits if desired; whichever limit is reached first will be enforced.

This protects you from unlimited queue growth. However, when the limit is reached, publishers will start receiving exceptions. You may instead wish to simply drop messages for consumers that can't keep up. You can do this by setting the queue to be a 'ring queue'. Once the configured count and/or size is reached, each further message enqueued results in the oldest message(s) being removed to make space. To configure this you would use an address of the form:

  my-topic; {link:{x-declare:{arguments:{'qpid.max_count':500,'qpid.policy_type':'ring'}}}}
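
The difference between the default behaviour and a ring queue can be illustrated with a small model. The following Python sketch is not broker code: the class BoundedQueue and the exception QueueFull are hypothetical names used only for illustration, and the treatment of a single message larger than the size limit is a simplification.

```python
from collections import deque


class QueueFull(Exception):
    """Stands in for the exception a publisher sees when a limit is hit."""


class BoundedQueue:
    """Toy model of a queue limited by message count and/or content size."""

    def __init__(self, max_count=None, max_size=None, ring=False):
        self.max_count = max_count  # models 'qpid.max_count'
        self.max_size = max_size    # models 'qpid.max_size' (bytes)
        self.ring = ring            # models 'qpid.policy_type':'ring'
        self.messages = deque()
        self.size = 0               # aggregate content size in bytes

    def _over_limit(self, extra_bytes):
        # Would adding one more message of extra_bytes exceed either limit?
        too_many = (self.max_count is not None
                    and len(self.messages) + 1 > self.max_count)
        too_big = (self.max_size is not None
                   and self.size + extra_bytes > self.max_size)
        return too_many or too_big

    def enqueue(self, body):
        if self._over_limit(len(body)):
            if not self.ring:
                # Default behaviour: the publisher gets an error.
                raise QueueFull("queue limit reached")
            # Ring behaviour: drop the oldest messages until the new one fits.
            # Simplification: an oversized message is still accepted once
            # the queue is empty.
            while self.messages and self._over_limit(len(body)):
                oldest = self.messages.popleft()
                self.size -= len(oldest)
        self.messages.append(body)
        self.size += len(body)
```

With max_count=3 and ring=True, enqueuing five messages leaves only the three most recent ones on the queue. With ring=False the fourth enqueue raises QueueFull instead.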

You can request that your subscriber does not receive any messages published on that same connection using the no-local option:

  my-topic; {link:{x-declare:{arguments:{no-local:True}}}}

reliability and durability

Where you need to ensure that you receive all messages, even if temporarily disconnected (e.g. through client or network failure), you need to specify a subscription name (this needs to be unique) and mark the queue as durable:

  my-topic; {link:{durable:True, name:'my-subscription'}}

and/or reliable:

  my-topic; {link:{reliability:at-least-once, name:'my-subscription'}}

Note that in this case you need to close the receiver explicitly in order to cancel the subscription. You may additionally want to ensure that, if the subscriber is disconnected for a certain amount of time, the subscription queue is deleted by the broker. E.g. to ensure that the subscription is cleaned up if the subscriber is disconnected for more than 30 seconds, you could use:

  my-topic; {link:{reliability:at-least-once, name:'my-subscription', x-declare:{auto-delete:True, arguments:{'qpid.auto_delete_timeout':30}}}}

subscription bindings

In simple cases you can filter the set of messages of interest from a topic using a subject. However, in certain situations you may need more explicit control. For example, if you are using the headers exchange as the basis of your topic, you need to supply binding arguments. An example that would select only messages with an 'id' header equal to 'abc' and a 'type' header equal to 'xyz' might be:

  my-headers-exchange; {link:{x-bindings:[{arguments:{'x-match':all, id:abc, type:xyz}}]}}
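
To make the effect of 'x-match' concrete, here is a minimal Python sketch of how a headers binding selects messages. The function headers_match is a hypothetical name, and the model only compares header values for equality under 'x-match' of 'all' or 'any'; a real broker may apply further rules.

```python
def headers_match(binding_args, message_headers):
    """Return True if message_headers satisfy a headers-exchange binding.

    Simplified model: 'x-match' must be 'all' or 'any', and every other
    binding argument is compared to the message header of the same name.
    """
    mode = binding_args["x-match"]
    if mode not in ("all", "any"):
        raise ValueError("x-match must be 'all' or 'any'")
    wanted = {k: v for k, v in binding_args.items() if k != "x-match"}
    hits = [k in message_headers and message_headers[k] == v
            for k, v in wanted.items()]
    # 'all': every listed header must match; 'any': at least one must.
    return all(hits) if mode == "all" else any(hits)
```

For the binding above, a message with headers id=abc and type=xyz is selected, while one with id=abc and type=other is not. Changing 'x-match' to 'any' would select both.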

Alternatively, you might want to combine several different routing patterns for the same subscription. E.g. to get all messages whose routing key either starts with the word 'abc' or ends with the word 'xyz' on a topic exchange, you could use:

  my-topic-exchange; {link:{x-bindings:[{key:'abc.#'},{key:'#.xyz'}]}}
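
The two routing patterns described above (keys starting with the word 'abc', written abc.#, and keys ending with the word 'xyz', written #.xyz) can be checked with a small matcher. This Python sketch follows the usual topic exchange rules, where words are separated by '.', '*' matches exactly one word and '#' matches zero or more words. The function names are hypothetical and this is an illustration, not the broker's implementation.

```python
def topic_matches(pattern, routing_key):
    """Match a routing key against a single topic binding pattern."""

    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may consume zero or more words of the key.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must be equal.
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


def subscription_matches(patterns, routing_key):
    """A message is delivered if any one of the bindings matches."""
    return any(topic_matches(p, routing_key) for p in patterns)
```

With the patterns ['abc.#', '#.xyz'], the keys 'abc.def' and 'news.xyz' are both delivered, whereas 'def.abc' is not.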

Creating shared queues on-demand

Another use case for more complex addresses is where shared queues are to be created on demand, conforming to a specific configuration. My own advice is to think carefully about whether this is needed in any given situation or whether a static configuration might be more appropriate. However for those who feel they require it, here are some examples.

queue limits

Much like the slow-subscriber case described above, bounding the size of shared queues is often a sensible approach. Here the address uses the node properties rather than the link properties, e.g.

  queue-a; {create: always, node:{x-declare:{arguments:{'qpid.max_count':1000}}}}

or indeed:

  queue-b; {create: always, node:{x-declare:{arguments:{'qpid.max_count':1000, 'qpid.max_size':1000000}}}}
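
If such addresses are generated by a program rather than written by hand, a small helper keeps the nesting of node, x-declare and arguments in one place. The Python sketch below only builds the address string; the function name queue_address is a hypothetical helper, not part of any Qpid client library.

```python
def queue_address(name, max_count=None, max_size=None):
    """Build an on-demand queue address with optional size limits."""
    args = []
    if max_count is not None:
        args.append("'qpid.max_count':%d" % max_count)
    if max_size is not None:
        args.append("'qpid.max_size':%d" % max_size)
    # Limits belong in the node's x-declare arguments for a shared queue.
    return "%s; {create: always, node:{x-declare:{arguments:{%s}}}}" % (
        name, ", ".join(args))
```

Calling queue_address('queue-b', max_count=1000, max_size=1000000) reproduces the second address shown above.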

To create a last value queue, keyed on qpid.subject:

  my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':'qpid.subject'}}}}
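
The idea behind a last value queue is that at most one message is retained per key value. A minimal Python model of that idea follows; the class LastValueQueue is a hypothetical name, every message is assumed to carry the key property, and the order in which the broker delivers retained messages is deliberately not modelled.

```python
class LastValueQueue:
    """Toy model: keep only the most recent message for each key value."""

    def __init__(self, key="qpid.subject"):
        self.key = key     # name of the message property used as the key
        self._latest = {}  # key value -> most recent message body

    def enqueue(self, properties, body):
        # A newer message with the same key value replaces the older one.
        self._latest[properties[self.key]] = body

    def contents(self):
        # Return a copy of what is currently retained, per key value.
        return dict(self._latest)
```

For example, after enqueuing prices for subjects 'IBM', 'MSFT' and then 'IBM' again, only the second 'IBM' message and the 'MSFT' message remain.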

priority queues

To create a queue that recognises 10 different priority levels:

  my-priority-queue; {create: always, node:{x-declare:{arguments:{'x-qpid-priorities':10}}}}
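
The effect on delivery order can be sketched with a small model in which higher priority values are delivered first and messages of equal priority keep their arrival order. This Python sketch is an illustration only: the class PriorityQueueModel is a hypothetical name, and clamping out-of-range priorities to the nearest level is an assumption rather than the broker's documented mapping.

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of a queue that recognises a fixed number of priorities."""

    def __init__(self, levels=10):
        self.levels = levels
        self._heap = []
        self._seq = itertools.count()  # preserves FIFO order within a level

    def enqueue(self, body, priority):
        # Assumption: priorities outside 0..levels-1 are clamped.
        level = max(0, min(priority, self.levels - 1))
        # heapq is a min-heap, so negate the level to pop the highest first.
        heapq.heappush(self._heap, (-level, next(self._seq), body))

    def dequeue(self):
        return heapq.heappop(self._heap)[2]
```

Enqueuing 'low' (1), 'high' (9), 'low2' (1) and 'mid' (5) in that order yields 'high', 'mid', 'low', 'low2' on dequeue.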

Note that with the C++ broker (i.e. qpidd), qpid.priorities can be used as an alias for x-qpid-priorities for the sake of consistency. The x-qpid-priorities form, however, is also supported by the Java broker.

automatic deletion of queues

Though the addressing syntax supports a 'delete' option that is notionally analogous to the 'create' option, it is currently implemented in the client library. Thus, if the client crashes, the delete will not be triggered, and if two clients trigger the delete concurrently, one of them may see an error. For these reasons, automatic deletion of queues is often better done on the server side by setting the auto-delete option:

  my-autodeleted-queue; {create: always, node:{x-declare:{auto-delete:True}}}