This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • IO Transport Refactoring
Skip to end of metadata
Go to start of metadata


Shortcomings of the current transport system.

The Java Broker's IO transport (IoNetworkTransport/IoSender/IoReceiver) has a number of shortcomings.  

  1. It uses blocking IO. Messages are pushed onto the wire either synchronously (by another IO thread performing an enqueue) or asynchronously (the QueueRunner).  If the socket is unable to accept messages, then the socket write will block.  This in turn, once the IoSender's buffer becomes full will cause IoSender's send operation to block.   If the socket does not begin to accept data within a fixed time frame, send() throws a SenderTimeoutException.  The handling of this exception is awkward and has been related to a number of defects.  We currently have no better mechanism to deal with this timeout apart from closing the connection.     It would be better if non blocking IO were used and we only attempted to send messages if we knew the socket was able to take some data.
  2. Since IO uses blocking IO, we must use a dedicated thread per connection as we have no way to predict when a socket will block.   The current IO model actually uses two threads per connection.    Users with larger Broker instances will have hundreds/thousands of IO threads.  Each thread requires its own stack-space so such a Broker will have a very heavy memory footprint.     If we were to use non-blocking IO, a thread pool could be utilised, reducing the thread count, thus reducing the memory footprint.
  3. It has two SSL implementations.  It uses SslServerSocket based implementation for Ports configured to use SSL only and uses an SSLEngine based approach when the Port supports both plain and SSL connections (where a sniff is required).
  4. The current IO transport model is seen as impeding progress in a wider queue/message threading refactoring

Desirable characteristics of a new IO transport system.

Phase 1 - Change Broker to use Non-blocking IO 

  1. Non-blocking IO based
  2. Exposes a mechanism so that a caller may determine if the underlying socket can accept data.  This will used by messaging layer to determine if it can send a message. This will ultimately allow TCP/IP back pressure to influence the distribution of messages amongst consumers.
  3. The IO transport's send operation accepts one or more ByteBuffers.
    1. Once a byte buffer is passed to the send method, the transport takes ownership of the ByteBuffer and has responsibility for transferring its entire contents.
    2. The caller may not further alter a byte buffer that has been passed to the transport.
  4. There will be a single IO thread writing/reading for a connection at any one time
    1. Each iteration should:
      1. write as many byte buffers as possible until all pending byte buffers are sent, or the socket signals it is unable to take any more data.  In the latter case, the algorithm must add the IO_WRITE flag to the selector in order that it is notified when the socket can accept more data
      2. read as much as possible until the socket yields no more bytes and pass resulting byte buffers to the receivers
      3. write any new byte buffers resulting from the read data
  5. The IO thread needs to wake up when:
    1. new byte buffers are available to send
    2. the socket indicates that it is ready to accept more data
  6. Remain two thread per connection (one sender, one receiver). 

Update Dec-2014 - above is now committed onto the branch.

Phase 2 - Change Broker to use broker-wide IO thread pool 

  1. The AcceptingThreads remains largely the same
    1. Accepts incoming connections from socket.
    2. Creates the connection
    3. Makes the SocketChannel available to a SelectingThread  (add to map? and bump the selector??)
  2. A SelectingThread owns the selector
    1. on each iteration it registers the selector for all socket channels except those who are already scheduled.
      1. OP_READ flag raised for all.  OP_WRITE needs to be raised if the connection was unable to complete its last write
    2. calls the blocking Selector#select() passing the shortest idle timeout of any connection as the timeout argument
    3. when it returns
      1. create/schedule connection tasks on the io worker pool for each ready channel
      2. if the ready set didn't include our connection we identified with the shortest idle time then we need to schedule it anyway
  3. NonBlockingSenderReceiver
    1. No longer a thread.
    2. Submittable job that write/read/write for a given number of iterations(?).
  4. IoWorkerTheadPool
    1. How big am I?
    2. Who owns me?
    3. How do I get shutdown?

Phase 3 - Change Broker to use have virtual host scope IO thread pools