Overview

This guide describes how to implement a custom data streamer, covering basic concepts and implementation details.


Basic Concepts

The purpose of a streamer is to ingest data from various sources (e.g. a TCP socket or Kafka) and stream it into an Ignite cache for further processing. Ignite provides the IgniteDataStreamer API, which streams data into a cache, and the convenience class StreamAdapter, which wraps a data streamer instance and provides a basis for custom streamer implementations.

A data stream consists of messages that can be represented as Java objects of some type and must be converted into cache entries (key-value pairs, or tuples). StreamTupleExtractor is responsible for this conversion.

The following parts of a custom data streamer are not provided by Ignite and must be implemented by the developer:

  • conversion of stream-specific messages to Java objects (e.g. a byte array to a String);
  • a stream wrapper that encapsulates functionality related to a particular data source.

In terms of which side initiates the connection, a data streamer can be implemented as either a server or a client, depending on the data source and the requirements. For example, an HTTP data streamer implemented as a client can request data from external web services, while an HTTP data streamer implemented as a server can process a large number of requests from external HTTP clients.

To implement a custom data streamer, a developer can optionally extend the StreamAdapter class and add functionality related to the particular data source, along with streamer life cycle logic if needed. As mentioned above, StreamAdapter wraps an IgniteDataStreamer instance; it also needs a StreamTupleExtractor instance. Both can be provided via the constructor or the corresponding setters.

The central method of StreamAdapter is addMessage(T msg), which converts a message into a cache entry using the StreamTupleExtractor and streams the entry into the cache. Due to the asynchronous nature of IgniteDataStreamer, this method does not block the current thread.
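
To make this data flow concrete, the following plain-Java sketch mimics what addMessage does: it applies an extractor to a message and hands the resulting entry to a sink. SketchAdapter and SketchAdapterDemo are hypothetical stand-ins, not Ignite classes: the Function plays the role of StreamTupleExtractor, and the BiConsumer stands in for the wrapped IgniteDataStreamer (a ConcurrentHashMap plays the cache). Unlike the real data streamer, this sink writes synchronously.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical stand-in for StreamAdapter: an extractor plus a sink, nothing else. */
class SketchAdapter<T, K, V> {
    private final Function<T, Map.Entry<K, V>> extractor;
    private final BiConsumer<K, V> sink;

    SketchAdapter(Function<T, Map.Entry<K, V>> extractor, BiConsumer<K, V> sink) {
        this.extractor = extractor;
        this.sink = sink;
    }

    /** Converts one message to an entry and passes it on, like StreamAdapter.addMessage(T). */
    void addMessage(T msg) {
        Map.Entry<K, V> entry = extractor.apply(msg);

        sink.accept(entry.getKey(), entry.getValue());
    }
}

class SketchAdapterDemo {
    /** Streams "word=count" messages into a map that plays the role of the cache. */
    static Map<String, Integer> run(String... msgs) {
        Map<String, Integer> cache = new ConcurrentHashMap<>();

        SketchAdapter<String, String, Integer> adapter = new SketchAdapter<String, String, Integer>(
            msg -> {
                String[] tokens = msg.split("=");

                return new AbstractMap.SimpleImmutableEntry<>(tokens[0], Integer.valueOf(tokens[1]));
            },
            cache::put);

        for (String msg : msgs)
            adapter.addMessage(msg);

        return cache;
    }
}
```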

The StreamTupleExtractor interface exposes the extract(T msg) method, which returns a cache entry as a Map.Entry<K, V> instance. For example, if a message is a string containing two values separated by the '=' symbol, where the left part is a word and the right part is an integer, the extractor could be implemented as follows:

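import java.util.Map;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.stream.StreamTupleExtractor;

public class WordCountTupleExtractor implements StreamTupleExtractor<String, String, Integer> {
    /** Splits a "word=count" message into a (word, count) cache entry. */
    @Override public Map.Entry<String, Integer> extract(String msg) {
        String[] tokens = msg.split("=");

        return new IgniteBiTuple<>(tokens[0], Integer.valueOf(tokens[1]));
    }
}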
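
The extractor above throws ArrayIndexOutOfBoundsException or NumberFormatException on malformed input such as "apple" or "apple=x". The plain-Java sketch below shows one defensive variant of the same parsing logic. It uses java.util.AbstractMap.SimpleImmutableEntry in place of IgniteBiTuple so that it runs without Ignite; WordCountParser is a hypothetical name, and in a real extractor the same key and value would be returned wrapped in an IgniteBiTuple.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper: parses "word=count" and rejects malformed messages explicitly. */
class WordCountParser {
    static Map.Entry<String, Integer> parse(String msg) {
        if (msg == null)
            throw new IllegalArgumentException("Message is null");

        // Split on the first '=' only.
        int idx = msg.indexOf('=');

        if (idx < 0)
            throw new IllegalArgumentException("Expected 'word=count', got: " + msg);

        String word = msg.substring(0, idx).trim();
        String cnt = msg.substring(idx + 1).trim();

        if (word.isEmpty())
            throw new IllegalArgumentException("Word is empty: " + msg);

        try {
            return new AbstractMap.SimpleImmutableEntry<>(word, Integer.parseInt(cnt));
        }
        catch (NumberFormatException e) {
            throw new IllegalArgumentException("Count is not an integer: " + msg, e);
        }
    }
}
```

Throwing a single exception type lets the surrounding streamer decide in one place whether to log and skip a bad message or to stop.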

Streamer Life Cycle

The streamer life cycle depends on the chosen implementation model (server or client) and on the requirements. Usually a streamer is in one of the following states: created, initialized, or shut down, possibly with some transition states in between. StreamAdapter doesn't provide any life cycle management methods; implementing correct life cycle management is entirely the responsibility of the data streamer developer.
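
One way to enforce these states is a small state machine guarded by an atomic reference, so that start and stop run their actions at most once even when called from different threads. The sketch below is plain Java; LifecycleGuard and the State values (including the STARTING and STOPPING transition states) are hypothetical names, and a custom streamer would pass its own resource setup and teardown as the start and stop actions.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical life cycle guard for a custom streamer. */
class LifecycleGuard {
    enum State { CREATED, STARTING, STARTED, STOPPING, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Runs startAction once, moving CREATED -> STARTING -> STARTED. */
    void start(Runnable startAction) {
        if (!state.compareAndSet(State.CREATED, State.STARTING))
            throw new IllegalStateException("Cannot start from state " + state.get());

        try {
            startAction.run();

            state.set(State.STARTED);
        }
        catch (RuntimeException e) {
            // Setup failed: mark the streamer as stopped rather than leaving it half-initialized.
            state.set(State.STOPPED);

            throw e;
        }
    }

    /** Runs stopAction only if started; returns false (and does nothing) otherwise. */
    boolean stop(Runnable stopAction) {
        if (!state.compareAndSet(State.STARTED, State.STOPPING))
            return false;

        try {
            stopAction.run();
        }
        finally {
            state.set(State.STOPPED);
        }

        return true;
    }

    State state() {
        return state.get();
    }
}
```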

Since a data streamer requires an IgniteDataStreamer instance, an Ignite node with the target cache must be started and an IgniteDataStreamer obtained from it before the custom streamer is created. Note that StreamAdapter doesn't manage the IgniteDataStreamer life cycle.

The following code snippet demonstrates correct data streamer initialization, assuming there is a CustomStreamer implementation with a start() method:

// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create and start cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache with name "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Create custom data streamer implementation.
CustomStreamer customStmr = new CustomStreamer(stmr, new WordCountTupleExtractor());

// Start custom data streamer if it provides corresponding methods.
customStmr.start();

The following code snippet demonstrates correct data streamer shutdown, assuming there is a CustomStreamer implementation with a stop() method:

// Stop the custom data streamer. NOTE: the IgniteDataStreamer instance and the Ignite node are still running.
customStmr.stop();

// Flush any buffered data, close the data streamer and release all associated resources.
stmr.close();

// Stop Ignite node.
ignite.close();

Reference Implementation

Ignite provides SocketStreamer, a reference custom data streamer implementation. SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as byte arrays, converts it to a user-defined type using a SocketMessageConverter, and streams it into the cache.

The default SocketMessageConverter implementation uses the standard Java serialization mechanism, assuming that the data stream contains serialized Java objects.
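
For reference, a converter that follows the same convention (bytes in, deserialized object out) can be written with the standard ObjectInputStream. This is a plain-Java sketch, not Ignite's own implementation; JavaSerializationCodec is a hypothetical name, and its serialize method shows what a client would have to send for such a converter to accept a message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical codec mirroring a Java-serialization-based message converter. */
class JavaSerializationCodec {
    /** Client side: turns an object into the bytes of one message. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return bytes.toByteArray();
    }

    /** Server side: restores the object from one received message. */
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] msg) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(msg))) {
            return (T)in.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the streamed object is not on the classpath", e);
        }
    }
}
```

Because deserializing bytes from untrusted sources is a well-known security risk in Java, a custom converter that parses a simple text or binary format may be preferable when the socket is reachable by untrusted clients.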

SocketStreamer supports two communication protocols:

  • a message-size-based protocol (the default), where each message in the stream is prefixed with a 4-byte integer header containing the message size;
  • a message-delimiter-based protocol, where each message in the stream is followed by a user-defined delimiter (see the SocketStreamer.setDelimiter(byte[] delim) method).
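
The difference between the two protocols is easiest to see on the client side. The plain-Java sketch below frames a payload both ways. It assumes the 4-byte size header is written in big-endian order, which is what DataOutputStream.writeInt produces; verify the byte order your SocketStreamer version expects before relying on it. Framing is a hypothetical helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical client-side helpers for the two framing styles. */
class Framing {
    /** Size-based: 4-byte big-endian length header (an assumption), then the payload. */
    static byte[] sizePrefixed(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        return bytes.toByteArray();
    }

    /** Delimiter-based: the payload followed by the delimiter bytes. */
    static byte[] delimited(byte[] payload, byte[] delim) {
        byte[] framed = new byte[payload.length + delim.length];

        System.arraycopy(payload, 0, framed, 0, payload.length);
        System.arraycopy(delim, 0, framed, payload.length, delim.length);

        return framed;
    }
}
```

The delimiter-based protocol only works if the delimiter can never occur inside a payload. That holds for a zero byte with plain text, but not for arbitrary binary data such as serialized Java objects, which suit the size-based protocol better.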

SocketStreamer has two life cycle management methods, start() and stop(), which initialize and shut down the TCP socket server, respectively.

The following example demonstrates how SocketStreamer can be used:

// Start Ignite node with default configuration.
Ignite ignite = Ignition.start();

// Create cache with default configuration and name "wordsCache".
IgniteCache<String, Integer> cache = ignite.getOrCreateCache("wordsCache");

// Get data streamer reference for cache "wordsCache".
IgniteDataStreamer<String, Integer> stmr = ignite.dataStreamer("wordsCache");

// Configure socket streamer.
SocketStreamer<String, String, Integer> sockStmr = new SocketStreamer<>();

// Set socket server address.
sockStmr.setAddr(InetAddress.getLocalHost());

// Set socket server port.
sockStmr.setPort(5555);

// Use message delimiter based protocol where delimiter is zero byte.
sockStmr.setDelimiter(new byte[] {0});

// Set ignite instance
sockStmr.setIgnite(ignite);

// Set data streamer instance
sockStmr.setStreamer(stmr);

// Converter from zero-terminated string to Java string.
sockStmr.setConverter(new SocketMessageConverter<String>() {
    @Override public String convert(byte[] msg) {
        return new String(msg);
    }
});

// Set tuple extractor (see above).
sockStmr.setTupleExtractor(new WordCountTupleExtractor());

// Start socket streamer.
sockStmr.start();

Now the socket streamer is ready to receive data. A client for this socket streamer can be implemented as follows:

byte[] delim = new byte[] {0};

try (
    // Connect to the socket streamer.
    Socket sock = new Socket(InetAddress.getLocalHost(), 5555);
    OutputStream out = new BufferedOutputStream(sock.getOutputStream())
) {
    // 'words' is any collection of strings to be counted.
    for (String word : words) {
        String item = word + "=1";

        byte[] arr = item.getBytes();

        // Write message.
        out.write(arr);

        // Write delimiter.
        out.write(delim);
    }
}
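
On the receiving side, TCP delivers a byte stream rather than discrete messages, so a single read may contain part of a message or several messages at once. SocketStreamer handles this splitting internally; the plain-Java sketch below only illustrates the idea for a one-byte delimiter such as the zero byte used by the client above. DelimiterDecoder is a hypothetical class, not part of Ignite.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder that splits a byte stream on a one-byte delimiter. */
class DelimiterDecoder {
    private final byte delim;

    /** Bytes of a message whose delimiter has not arrived yet. */
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    DelimiterDecoder(byte delim) {
        this.delim = delim;
    }

    /** Feeds the first len bytes of a chunk read from the socket; returns the messages it completed. */
    List<byte[]> feed(byte[] chunk, int len) {
        List<byte[]> msgs = new ArrayList<>();

        for (int i = 0; i < len; i++) {
            if (chunk[i] == delim) {
                msgs.add(pending.toByteArray());
                pending.reset();
            }
            else
                pending.write(chunk[i]);
        }

        return msgs;
    }
}
```

In a custom streamer, each completed message would then be handed to the message converter and to StreamAdapter.addMessage().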

Implementation Steps

To implement a custom data streamer, perform the following steps:

  1. Choose the model the data streamer will implement (server or client).
  2. Define a new data streamer class that extends StreamAdapter.
  3. Provide an API for data streamer configuration (hosts, ports, and other data-source-specific parameters) and implement parameter validation if needed.
  4. Implement life cycle management methods if needed. Usually the developer will want control over data source resources such as JMS sessions, HTTP/socket connections, Kafka consumers, and so on.
  5. Implement the logic for consuming data from the data source, for example: processing JMS messages in the corresponding listener, accepting client connections and reading data from them, consuming messages from Kafka topics, and so on.
  6. Implement data conversion from the data-source-specific format to Java objects, for example: converting JMS Message instances, bytes received from a socket, bytes consumed from Kafka topics, and so on.
  7. Provide an abstraction and API for user-defined data converters. For example, JMS defines several message types (TextMessage, MapMessage), socket servers can use different protocols, and so on.
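
Steps 3 and 7 are largely independent of Ignite, so they can be sketched in plain Java. In the sketch below, MessageConverter, StreamerSettings and Utf8Converter are hypothetical names chosen for illustration: the settings object validates its parameters before any resources are allocated, and the converter interface lets users plug in their own byte-to-message conversion, in the same spirit as SocketMessageConverter.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical user-pluggable converter (step 7). */
interface MessageConverter<T> {
    T convert(byte[] raw);
}

/** Hypothetical streamer configuration with validation (step 3). */
class StreamerSettings<T> {
    private String host;
    private int port;
    private MessageConverter<T> converter;

    StreamerSettings<T> host(String host) { this.host = host; return this; }

    StreamerSettings<T> port(int port) { this.port = port; return this; }

    StreamerSettings<T> converter(MessageConverter<T> converter) { this.converter = converter; return this; }

    MessageConverter<T> converter() { return converter; }

    /** Fails fast on bad parameters, before the streamer starts. */
    void validate() {
        if (host == null || host.isEmpty())
            throw new IllegalArgumentException("Host must be set");

        if (port < 1 || port > 65535)
            throw new IllegalArgumentException("Port must be in [1, 65535]: " + port);

        if (converter == null)
            throw new IllegalArgumentException("Converter must be set");
    }
}

/** Example converter: decodes each message as UTF-8 text. */
class Utf8Converter implements MessageConverter<String> {
    @Override public String convert(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```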
