Java Client Dispatcher Changes

Investigation of QPID-1871 has highlighted a race condition between the Dispatcher and the client's request to roll back.

Problem Summary

The problem here is that the Dispatcher has the ability to hold on to a message, so when the rollback process is believed to have completed the Dispatcher then rejects this final message AFTER the TxRollback, with the result that one message is sent ahead of the others. The reject itself is dropped, as the message has already been resent. This is specific to the Java Client and causes the Java Broker to return messages out of order, which may be the reason that the RollbackOrderTest has been disabled. It is not yet clear whether this will also affect the CPP broker; further investigation is required.

Operation Details

Due to the way that the AMQSession.Dispatcher is paused when a rollback operation is in progress, it is possible that the Dispatcher thread is 'holding' a message for dispatch. The main loop of AMQSession.Dispatcher is shown here:

  // _queue.take() blocks until a Dispatchable is available; it never returns null.
  while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))
  {
      disp.dispatch(AMQSession.this);
  }

The problem is highlighted in the dispatchMessage call below (which is the result of disp.dispatch() on an UnprocessedMessage). If the Dispatcher is in the process of dispatching messages when a second thread calls rollback, the connection will be stopped, but the Dispatcher may already have removed a message from _queue and will then stop inside dispatchMessage:

  private void dispatchMessage(UnprocessedMessage message)
  {
      long deliveryTag = message.getDeliveryTag();

      synchronized (_lock)
      {
          try
          {
              while (connectionStopped())
              {
                  _lock.wait();
              }
          }
          catch (InterruptedException e)
          {
              // pass
          }

          if (!(message instanceof CloseConsumerMessage)
              && tagLE(deliveryTag, _rollbackMark.get()))
          {
              rejectMessage(message, true);
          }
          else
          {
              synchronized (_messageDeliveryLock)
              {
                  notifyConsumer(message);
              }
          }
      }

      long current = _rollbackMark.get();
      if (updateRollbackMark(current, deliveryTag))
      {
          _rollbackMark.compareAndSet(current, deliveryTag);
      }
  }

When the connection is resumed, the deliveryTag of the held message will be 'less than or equal' to the _rollbackMark, as the mark has been set to the highest deliveryTag received prior to the rollback:

                _rollbackMark.set(_highestDeliveryTag.get());
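
To make this concrete, here is a minimal, self-contained illustration of the mark check. The names mirror the excerpts above, but tagLE is simplified to a plain comparison and this is not the client's actual code.

  import java.util.concurrent.atomic.AtomicLong;

  // Toy demonstration of the rollback-mark behaviour described above.
  class RollbackMarkDemo
  {
      private static final AtomicLong _rollbackMark = new AtomicLong(-1);
      private static final AtomicLong _highestDeliveryTag = new AtomicLong(-1);

      // Simplified stand-in for the client's tagLE comparison.
      static boolean tagLE(long tag, long mark)
      {
          return tag <= mark;
      }

      public static void main(String[] args)
      {
          // Five messages are delivered before the rollback is requested.
          for (long tag = 1; tag <= 5; tag++)
          {
              _highestDeliveryTag.set(tag);
          }

          // Rollback: the mark becomes the highest deliveryTag seen so far.
          _rollbackMark.set(_highestDeliveryTag.get());

          // A message the Dispatcher was 'holding' (tag 5) now compares
          // 'less than or equal' to the mark, so on resume it is rejected
          // rather than delivered to the consumer.
          long heldTag = 5;
          System.out.println("reject held message: " + tagLE(heldTag, _rollbackMark.get()));
      }
  }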

There are no guards in the code to stop the IO layer adding new messages to _queue whilst a rollback is in progress. However, both 0-8 and 0-10 ensure that message flow has stopped whilst recovery is processed: 0-8 sets ChannelFlow=false and waits for the Ok, while 0-10 stops the consumers and performs a sync.
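
As a rough sketch of the 0-8 half of that handshake (the method names here are illustrative placeholders, not the real client API), the rollback path blocks until the broker's ChannelFlowOk has arrived, after which no new messages can reach _queue:

  // Hypothetical sketch of the 0-8 flow-stop handshake described above.
  class FlowControlSketch
  {
      private final Object _flowLock = new Object();
      private boolean _flowOkReceived;

      // Rollback path: ask the broker to stop delivery, then wait for the Ok.
      void suspendChannel() throws InterruptedException
      {
          sendChannelFlow(false);
          synchronized (_flowLock)
          {
              while (!_flowOkReceived)
              {
                  _flowLock.wait();
              }
          }
          // From here on the IO layer will add no new messages to _queue.
      }

      // IO thread: the broker has acknowledged with ChannelFlowOk.
      void onChannelFlowOk()
      {
          synchronized (_flowLock)
          {
              _flowOkReceived = true;
              _flowLock.notifyAll();
          }
      }

      private void sendChannelFlow(boolean active)
      {
          // Writing the ChannelFlow frame to the wire is omitted in this sketch.
      }
  }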

Code Problem

The investigation of this problem has highlighted two areas which need to be addressed:

  1. The ability to ensure the dispatcher is not holding a message.
  2. The ability to confirm when the dispatcher will not process any more messages.

How the Dispatcher holds a message

The _queue.take() call is guaranteed never to return null, and once we have entered the take() call there is no way to stop the Dispatcher:

  while (!_closed.get() && ((disp = (Dispatchable) _queue.take()) != null))

Hence we perform the stop as soon as possible after the take(), but this results in us holding on to a message.

Ideally we need to be able to stop the Dispatcher whilst it is in the take() method.
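
One minimal sketch of such a stoppable loop (hypothetical names; this is not the proposed patch) uses thread interruption, so that a blocked take() can be abandoned without the Dispatcher ever holding a message:

  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  // Illustrative only: a dispatcher loop that can be stopped inside take().
  interface Dispatchable
  {
      void dispatch();
  }

  class InterruptibleDispatcher implements Runnable
  {
      private final BlockingQueue<Dispatchable> _queue = new LinkedBlockingQueue<Dispatchable>();
      private volatile boolean _closed = false;
      private volatile Thread _thread;

      public void run()
      {
          _thread = Thread.currentThread();
          while (!_closed)
          {
              final Dispatchable disp;
              try
              {
                  // Block here holding no message; an interrupt unblocks us.
                  disp = _queue.take();
              }
              catch (InterruptedException e)
              {
                  continue; // woken by close(); re-check _closed
              }
              disp.dispatch();
          }
      }

      public void close()
      {
          _closed = true;
          Thread t = _thread;
          if (t != null)
          {
              t.interrupt(); // stops the Dispatcher even while inside take()
          }
      }
  }

A rollback-driven stop could reuse the same interrupt, provided the InterruptedException handler then parks the thread in the stopped state rather than looping straight back into take().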

How the Dispatcher can keep processing

The Dispatcher currently uses the connectionStopped() call to suspend its activities when the connection has been marked as stopped. However, we need to know that the Dispatcher has actually reached this section; otherwise we need to guarantee that _queue is empty:

            synchronized (_lock)
            {
                try
                {
                    while (connectionStopped())
                    {
                        _lock.wait();
                    }
                }
                catch (InterruptedException e)
                {
                    // pass
                }

Having the Dispatcher signal that it has stopped processing will allow us to know that we have reached the stopped state. However, this means we have the opportunity to process one extra message AFTER the rollback command has been requested.
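
A minimal sketch of that signalling (hypothetical class, not the actual patch) has the rollback thread wait until the Dispatcher acknowledges that it is parked:

  // Hypothetical sketch: the rollback thread waits until the Dispatcher
  // has acknowledged the stopped state, so it cannot be holding a message.
  class StopSignal
  {
      private final Object _lock = new Object();
      private boolean _connectionStopped = false;
      private boolean _dispatcherStopped = false;

      // Dispatcher thread: called at the top of dispatchMessage().
      void awaitResume() throws InterruptedException
      {
          synchronized (_lock)
          {
              while (_connectionStopped)
              {
                  _dispatcherStopped = true; // signal: we have hit the stopped section
                  _lock.notifyAll();         // wake a thread blocked in stopAndWait()
                  _lock.wait();
              }
              _dispatcherStopped = false;
          }
      }

      // Rollback thread: returns only once the Dispatcher is parked.
      void stopAndWait() throws InterruptedException
      {
          synchronized (_lock)
          {
              _connectionStopped = true;
              while (!_dispatcherStopped)
              {
                  _lock.wait();
              }
          }
      }

      // Rollback thread: releases the Dispatcher after recovery completes.
      void resume()
      {
          synchronized (_lock)
          {
              _connectionStopped = false;
              _lock.notifyAll();
          }
      }
  }

Note that this only helps once the Dispatcher actually reaches awaitResume(); if it is still blocked in take() with an empty _queue, the rollback thread instead needs the guarantee that _queue is empty, which is exactly the limitation described above.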

Further Details

After a discussion with Rafi/Rob on the recent Python changes, expending effort in refactoring the client is probably not worthwhile. If the client message delivery were re-written to mirror the approach taken in the Python codebase it would be simpler and easier to reason about. As a result I have devised a much smaller, though slightly ugly, approach that will address our immediate rollback issues. The approach can be found here.

Comment Responses

User: rhs (via email)
Comment: AMQSession.syncDispatchQueue is used in 0-10 for this.
Response: This will not work if the dispatcher is performing the rollback (deadlock). Also we need to stop processing the messages immediately and not allow any further processing.

User: rhs (via email)
Comment: Agree the client is badly in need of some improvements in maintainability and readability; however, in this particular case I don't think moving the rollback processing from one thread to another actually improves the situation significantly.
Response: It is not so much moving from one thread to another as moving from the AMQSession / Dispatcher objects to just the Dispatcher.

User: rhs (via email)
Comment: I suspect in order to do this properly we really need to stop thinking in terms of code being associated with a given thread, and think instead about what locks we have, what data structures those locks protect, and which locks need to be held in order to execute a given piece of code.
Response: The focus of this change was to consolidate the operations on the received messages. I would like to see a clean interface where messages are passed in for dispatching. The cleaning operations should then be fully contained in that interface, not in a couple of locations as it is currently.

User: rhs (via email)
Comment: Really we need to be able to articulate exactly what locks the client has, what data structure(s) each lock protects, and what order should be used to acquire multiple locks when necessary.
Response: Agreed, documenting what we have and how it works would be very useful for this discussion.
