This Confluence has been LDAP enabled, if you are an ASF Committer, please use your LDAP Credentials to login. Any problems file an INFRA jira ticket please.

Child pages
  • Message API Design
Skip to end of metadata
Go to start of metadata

Message API Design

This document describes the new message API for the restructured client.

  • Sending Messages
  • Receiving Messages
  • Message abstraction
  • Java Doc

Sending Messages

The Session class provides the following methods to send messages.

public interface Session{
.........

//Option1 - for small messages
public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;

//Option2 - for large messages
public void messageStream(String destination, Message msg, short confirmMode, short acquireMode)throws IOException;
 
//Option3 - can use it with any message size, recomended for large messages
public void messageTransfer(String destination, short confirmMode, short acquireMode);
public void headers(Struct... headers);
public void data(byte[] data);
public void data(ByteBuffer buf);
public void data(String str);
public void endData();

.........

}

Sending small Messages

Option1 provides a convinience method to send small messages.
You could use the ByteBufferMessage to create small in memory messages (or your own implementation).
Underneath it maps onto methods defined under option3

Sending large Messages

You have two options for sending large messages, using either pull style or push style semantics

Using the Session class methods (Option3)

Option3 provides a more natural AMQP style set of methods
You can stream data using Option3 by pushing your data using one of the data methods defined in the session class.

Using Option2 (pull style)

The messageStream method will pull data from the message and stream using the methods defined in option3.
You could use FileMessage or StreamingMessage or your own Message implementation that backs a large data stream.

  • FileMessage takes in a FileInputStream and create a nio FileChannel. It then uses a MappedByteBuffer to map a region of the file when the readData method is invoked. You could specify a chunksize in the constructor to control how much data is mapped each time.
  • StreamingMessage takes in a SocketChannel and reads a chunk of data at a time until the SocketChannel is closed. This could be useful when u need to transfer a data stream received from a legacy application or a hardware device. In such cases the StreamingMessage provides a convinient abstraction to stream the data without any intermediate copying.

Receiving Messages

To receive messages you can subscribe using the following method

public interface Session{
.........

public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
                                 MessagePartListener listener, Map<String, ?> filter, Option... options);
-----
}

The API provides support for receiving messages in parts as and when they arrive using the MessagePartListener.
This enables the user to start consuming the message while it is being streamed.

public interface MessagePartListener{

public void messageTransfer(long transferId);

public void messageHeaders(Struct... headers);

public void data(ByteBuffer src);

public void messageReceived();

}

The messageTransfer method signals the start of a transfer and passes the transferId.
The Transfer Id is used for the following operations defined in the Session API.

  • to Acquire the message (if the message was transfered in no-acquire mode)
  • to release the message ( if already acquired)
  • to Reject or Acknowledge the message

The data method will be called each time Frame arrives. The messageReceived method will signal the end of the message.

Consuming small messages

The API also provides a convinient way for consuming small messages through the MessageListener interface and the MessagePartListenerAdapter.
The MessagePartListenerAdapter will build the message and will notify the user through MessageListener when the message is complete.

public interface MessageListener{
 
 public void onMessage(Message message);  

}

you can use it the following way.

.........

 MessageListener myMessageListener .... 

 session.messageSubscribe(....,new MessagePartListenerDapter(myMessageListener),...);
-----

Message abstraction

Message Interface provides an abstraction for creating messages from different data streams.
Please read the java doc for a complete description of each method.

public interface Message{

public MessageProperties getMessageProperties();
public DeliveryProperties getDeliveryProperties();

public void appendData(byte[] src) throws IOException;
public void appendData(ByteBuffer src) throws IOException;

public void readData(byte[] target) throws IOException;   
public ByteBuffer readData() throws IOException;

public void clearData();
public long getMessageTransferId();

}
  • No labels