Worked LVQ example
The following example shows what happens when a message is published with the same key as an earlier message that has not yet been consumed: the old message gets replaced. This example sends four messages whose data and key values are set to key1, key2, key3 and key1, followed by a final message, last, which tells the listener to stop.
If all of the messages are enqueued before the listener starts consuming, you get the following output:
Sending Data:key1
Sending Data:key2
Sending Data:key3
Sending Data:key1
Sending Data:last
Receiving Data:key1
Receiving Data:key2
Receiving Data:key3
Receiving Data:last
Source for example
// Worked example for a Last Value Queue (LVQ).
//
// The program declares an exclusive, auto-delete queue with LVQ ordering,
// publishes five messages to it ("key1", "key2", "key3", "key1", "last"),
// and only then subscribes a listener that prints every message it receives
// and cancels its subscription when it sees "last".

#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/QueueOptions.h>

#include <exception>
#include <iostream>
#include <string>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
using namespace std;

/**
 * Command-line options for the example.
 *
 * Combines qpid::Options (option parsing) with ConnectionSettings so that a
 * parsed Args object can be handed directly to Connection::open().
 * host, port, username, password, mechanism and tcpNoDelay are not declared
 * here; presumably they are members inherited from ConnectionSettings.
 */
struct Args : public qpid::Options,
              public qpid::client::ConnectionSettings
{
    bool help;  ///< Set when --help is given; main() then prints usage and exits.

    // NOTE(review): the description string looks copied from a latency test
    // and contains a typo ("optins"). It is runtime text, so it is left as is.
    Args() : qpid::Options("Simple latency test optins"), help(false)
    {
        using namespace qpid;
        addOptions()
            ("help", optValue(help), "Print this usage statement")
            ("broker,b", optValue(host, "HOST"), "Broker host to connect to")
            ("port,p", optValue(port, "PORT"), "Broker port to connect to")
            ("username", optValue(username, "USER"), "user name for broker log in.")
            ("password", optValue(password, "PASSWORD"), "password for broker log in.")
            ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.")
            ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay");
    }
};

/**
 * Publishes to, and consumes from, a single LVQ-ordered queue.
 *
 * The queue is named after the session (session.getId().getName()).
 * Intended call order, as used by main(): setup(), send() one or more times,
 * then start().
 */
class Listener : public MessageListener
{
  private:
    Session session;
    SubscriptionManager subscriptions;
    std::string queue;   ///< Name of the queue; also used as the routing key.
    Message request;     ///< Message object reused for every send().
    QueueOptions args;   ///< Queue arguments; carries the LVQ ordering setting.

  public:
    Listener(Session& session);
    void setup();
    void send(const std::string& kv);
    void received(Message& message);
    void start();
};

/// Binds the listener to session @p s and derives the queue name from it.
Listener::Listener(Session& s) :
    session(s),
    subscriptions(s),
    queue(session.getId().getName())
{}

/// Declares the exclusive, auto-delete LVQ queue and points the outgoing
/// message's routing key at it.
void Listener::setup()
{
    // set queue mode
    args.setOrdering(LVQ);

    session.queueDeclare(arg::queue=queue,
                         arg::exclusive=true,
                         arg::autoDelete=true,
                         arg::arguments=args);

    request.getDeliveryProperties().setRoutingKey(queue);
}

/// Subscribes this object to the queue (unlimited flow control, no accepts
/// required) and runs the subscription manager. Presumably run() returns once
/// received() has cancelled the subscription.
void Listener::start()
{
    subscriptions.subscribe(*this, queue,
                            SubscriptionSettings(FlowControl::unlimited(),
                                                 ACCEPT_MODE_NONE));
    subscriptions.run();
}

/**
 * Publishes one message whose body and LVQ key header are both @p kv.
 *
 * @param kv Value used as the message data and as the value of the header
 *           whose name is obtained from QueueOptions::getLVQKey().
 */
void Listener::send(const std::string& kv)
{
    // Ask the queue options which header name carries the LVQ key.
    std::string key;
    args.getLVQKey(key);

    request.getHeaders().setString(key, kv);
    request.setData(kv);

    std::cout << "Sending Data:" << kv << std::endl;
    async(session).messageTransfer(arg::content=request);
}

/// Prints the received message body and cancels the subscription when the
/// body is "last".
void Listener::received(Message& response)
{
    std::cout << "Receiving Data:" << response.getData() << std::endl;

    if (response.getData() == "last") {
        subscriptions.cancel(queue);
    }
}

/**
 * Entry point: enqueue all messages first, then consume them.
 *
 * @return 0 on success or after printing usage; 1 if an exception was caught.
 */
int main(int argc, char** argv)
{
    Args opts;
    Connection connection;

    try {
        // Parsing happens inside the try block so that, if it throws, the
        // error is reported below instead of escaping main().
        opts.parse(argc, argv);

        if (opts.help) {
            std::cout << opts << std::endl;
            return 0;
        }

        connection.open(opts);
        Session session = connection.newSession();

        Listener listener(session);
        listener.setup();

        // All messages are sent before the listener starts consuming.
        // "key1" is sent twice; "last" tells the listener to stop.
        listener.send("key1");
        listener.send("key2");
        listener.send("key3");
        listener.send("key1");
        listener.send("last");

        listener.start();

        connection.close();
        return 0;
    } catch (const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}