Controlling topic subscriptions

queue limits and slow subscribers

Over AMQP 0-10, topic subscriptions are implemented as private subscription queues bound to an exchange that represents the topic. If a subscriber doesn't keep up with the rate of published messages of interest, the subscription queue starts to grow. Limiting that growth can be useful. To do so, you can specify a limit on the queue either as a message count or as an aggregate content size. E.g. to limit the queue to 500 messages:

Code Block
  my-topic; {link:{x-declare:{arguments:{'qpid.max_count':500}}}}

or to limit the queue to containing as many messages as make up 100,000 bytes of message content:

Code Block
  my-topic; {link:{x-declare:{arguments:{'qpid.max_size':100000}}}}

You can combine the two values as well if desired, and the first limit to be reached will be enforced.

This protects you from unlimited queue growth. However, when the limit is reached, publishers will start receiving exceptions. You may instead wish to simply drop messages for consumers that can't keep up. You can do this by setting the queue to be a 'ring queue'. Once the configured count and/or size is reached, further messages enqueued will result in the oldest message(s) being removed to make space. To configure this you would use an address of the form:

Code Block
  my-topic; {link:{x-declare:{arguments:{'qpid.max_count':500,'qpid.policy_type':'ring'}}}}
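
The difference between the two behaviours can be illustrated with a small toy model. This is only a sketch of the semantics described above, not the broker's implementation; the class names ToyQueue and QueueFull are made up for the example, and only a message-count limit is modelled.

```python
from collections import deque


class QueueFull(Exception):
    """Raised by the toy queue when a non-ring queue hits its limit."""


class ToyQueue:
    """Illustrative bounded queue (hypothetical; not broker code)."""

    def __init__(self, max_count, policy="reject"):
        self.max_count = max_count
        self.policy = policy
        self.messages = deque()

    def enqueue(self, message):
        if len(self.messages) >= self.max_count:
            if self.policy == "ring":
                # Ring behaviour: drop the oldest message to make space.
                self.messages.popleft()
            else:
                # No ring policy: the publisher gets an exception instead.
                raise QueueFull("limit of %d messages reached" % self.max_count)
        self.messages.append(message)


ring = ToyQueue(max_count=3, policy="ring")
for n in range(5):
    ring.enqueue(n)
print(list(ring.messages))  # [2, 3, 4]
```

With policy="reject" the fourth enqueue would raise QueueFull, which corresponds to the publisher-side exceptions mentioned above.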

no-local

You can request that your subscriber does not receive any messages published on that same connection using the no-local option:

Code Block
  my-topic; {link:{x-declare:{arguments:{no-local:True}}}}

reliability and durability

Where you need to ensure that you receive all messages, even if temporarily disconnected (e.g. through client or network failure), you need to specify a subscription name (this needs to be unique) and mark the queue as durable:

Code Block
  my-topic; {link:{durable:True, name:'my-subscription'}}

and/or reliable:

Code Block
  my-topic; {link:{reliability:at-least-once, name:'my-subscription'}}

Note that in this case you need to close the receiver explicitly in order to cancel the subscription. You may additionally want to ensure that, if the client stays disconnected for a certain amount of time, the broker deletes the subscription queue. E.g. to ensure that the subscription is cleaned up after more than 30 seconds of disconnection you could use:

Code Block
  my-topic; {link:{reliability:at-least-once, name:'my-subscription', x-declare:{auto-delete:True, arguments:{'qpid.auto_delete_timeout':30}}}}

subscription bindings

In simple cases you can filter the set of messages of interest from a topic using a subject. However, in certain situations you may need more explicit control. For example, if you are using the headers exchange as the basis of your topic, you need to supply binding arguments. An example, which would select only messages with an 'id' header equal to 'abc' and a 'type' header equal to 'xyz', might be:

Code Block
  my-headers-exchange; {link:{x-bindings:[{arguments:{'x-match':all, id:abc, type:xyz}}]}}
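
As a rough illustration of what such a binding selects, the matching rule can be modelled in a few lines. This is a simplified sketch, not broker code: headers_match is a hypothetical function, and it only compares values for equality under the 'all' and 'any' modes of x-match.

```python
def headers_match(binding_args, headers):
    """Toy model of headers-exchange matching (hypothetical helper)."""
    mode = binding_args["x-match"]
    # Every argument other than 'x-match' names a header to compare.
    required = {k: v for k, v in binding_args.items() if k != "x-match"}
    hits = [k in headers and headers[k] == v for k, v in required.items()]
    return all(hits) if mode == "all" else any(hits)


binding = {"x-match": "all", "id": "abc", "type": "xyz"}
print(headers_match(binding, {"id": "abc", "type": "xyz"}))  # True
print(headers_match(binding, {"id": "abc", "type": "other"}))  # False
```

Changing 'x-match' to 'any' in the sketch would accept the second message as well, since one matching header is then enough.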

Alternatively, you might want to combine several different routing patterns for the same subscription, e.g. to get all messages either starting with the word 'abc' or ending with the word 'xyz' on a topic exchange you could use:

Code Block
  my-topic-exchange; {link:{x-bindings:[{key:abc.#},{key:#.xyz}]}}
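
To see which routing keys these two bindings would pick up, here is a small sketch of topic-exchange pattern matching, where '*' stands for exactly one word and '#' for zero or more words. The function topic_matches is written for this example and is not taken from any Qpid library.

```python
def topic_matches(pattern, routing_key):
    """Toy matcher: '*' is exactly one word, '#' is zero or more words."""

    def match(p, k):
        if not p:
            return not k  # pattern used up: key must be used up too
        head, rest = p[0], p[1:]
        if head == "#":
            # Let '#' absorb 0, 1, 2, ... of the remaining words.
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == "*" or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


bindings = ["abc.#", "#.xyz"]
for key in ["abc.def", "one.two.xyz", "one.abc.two"]:
    print(key, any(topic_matches(b, key) for b in bindings))
```

A message is delivered to the subscription if any one of the bound keys matches, which is why the sketch combines the bindings with any().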

Creating shared queues on-demand

Another use case for more complex addresses is where shared queues are to be created on demand, conforming to a specific configuration. My own advice is to think carefully about whether this is needed in any given situation or whether a static configuration might be more appropriate. However for those who feel they require it, here are some examples.

queue limits

Much like the case described under 'queue limits and slow subscribers', bounding the size of shared queues is often a sensible approach. Here the address uses the node properties rather than the link properties, e.g.:

Code Block
  queue-a; {create: always, node:{x-declare:{arguments:{'qpid.max_count':1000}}}}

or indeed:

Code Block
  queue-b; {create: always, node:{x-declare:{arguments:{'qpid.max_count':1000, 'qpid.max_size':1000000}}}}

LVQs

To create a last value queue, keyed on qpid.subject:

Code Block
  my-lvq; {create: always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':'qpid.subject'}}}}
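
The idea behind a last value queue can be sketched as follows: a newer message replaces any older message that carries the same key value. This toy model is an illustration only. The class ToyLastValueQueue is hypothetical, messages are plain dictionaries, every message is assumed to carry the key, and no claim is made about delivery order.

```python
class ToyLastValueQueue:
    """Illustrative model of a last value queue (not broker code)."""

    def __init__(self, key):
        self.key = key
        self.latest = {}  # key value -> most recent message with that value

    def enqueue(self, message):
        # A newer message replaces the older one with the same key value.
        self.latest[message[self.key]] = message

    def contents(self):
        return list(self.latest.values())


lvq = ToyLastValueQueue(key="qpid.subject")
lvq.enqueue({"qpid.subject": "IBM", "price": 100})
lvq.enqueue({"qpid.subject": "MSFT", "price": 50})
lvq.enqueue({"qpid.subject": "IBM", "price": 101})
print(len(lvq.contents()))  # 2
```

After the three enqueues only two messages remain, and the one for 'IBM' is the later price.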

priority queues

To create a queue that recognises 10 different priority levels:

Code Block
  my-priority-queue; {create: always, node:{x-declare:{arguments:{'x-qpid-priorities':10}}}}

Note that with the C++ broker (i.e. qpidd) qpid.priorities can be used as an alias for x-qpid-priorities, for the sake of consistency. However, x-qpid-priorities is the form that is supported by the Java broker as well.
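
As a rough picture of what a priority queue does, the sketch below delivers higher-priority messages first and keeps arrival order within a priority level. It is a toy model, not the broker's algorithm: ToyPriorityQueue is a made-up class, and clamping out-of-range priorities into the configured number of levels is an assumption of the example.

```python
import heapq
import itertools


class ToyPriorityQueue:
    """Illustrative priority queue with a fixed number of levels."""

    def __init__(self, levels=10):
        self.levels = levels
        self._heap = []
        self._arrival = itertools.count()  # tie-breaker: arrival order

    def enqueue(self, message, priority):
        # Assumption: priorities outside 0..levels-1 are clamped.
        level = max(0, min(priority, self.levels - 1))
        # heapq is a min-heap, so negate the level to pop the highest first.
        heapq.heappush(self._heap, (-level, next(self._arrival), message))

    def dequeue(self):
        return heapq.heappop(self._heap)[2]


q = ToyPriorityQueue(levels=10)
q.enqueue("low-1", 1)
q.enqueue("high", 9)
q.enqueue("low-2", 1)
q.enqueue("mid", 5)
print([q.dequeue() for _ in range(4)])  # ['high', 'mid', 'low-1', 'low-2']
```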

automatic deletion of queues

Though the addressing syntax supports a 'delete' option that is notionally analogous to the 'create' option, it is implemented at present in the client library. Thus if the client crashes, the delete will not be triggered. If two clients trigger the delete concurrently, one of them may see an error. For these reasons, automatic deletion of queues is often better done on the server side by setting the auto-delete option:

Code Block
  my-autodeleted-queue; {create: always, node:{x-declare:{auto-delete:True}}}