This guide describes how to implement a custom data streamer and covers the basic concepts and implementation details.
It is useful to take a look at:
SocketStreamer - reference implementation of a custom data streamer as a TCP socket server.
WordsSocketStreamerClient - example of a client for SocketStreamer.
The purpose of a streamer is to ingest data from various sources (e.g. a TCP socket or Kafka) and stream it into an Ignite cache for further processing. Ignite provides the IgniteDataStreamer API, which streams data into a cache, and the convenient StreamAdapter class, which wraps a data streamer instance and provides a basis for custom streamer implementations.
A data stream consists of messages, represented as Java objects of some type, that must be converted to cache entries (key-value pairs, or tuples). StreamTupleExtractor is responsible for this conversion.
The following parts of a custom data streamer are not provided out of the box and must be implemented by the developer:
From the connection-initiation standpoint, a data streamer can be implemented either as a server or as a client, depending on the data source and the requirements. For example, an HTTP data streamer implemented as a client can request data from external web services. On the other hand, an HTTP data streamer implemented as a server can process a large number of requests from external HTTP clients.
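To make the client model more concrete, here is a minimal sketch assuming a pull-based source. `PollingClientStreamer` is a hypothetical name, the `Supplier` stands in for a call such as an HTTP request to an external service, and the `Consumer` stands in for `StreamAdapter.addMessage(...)`; nothing in this block is Ignite API.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical client-model streamer: it initiates the connection and pulls data itself. */
class PollingClientStreamer {
    private final Supplier<List<String>> source; // stand-in for e.g. an HTTP request
    private final Consumer<String> sink;         // stand-in for StreamAdapter.addMessage(...)
    private ScheduledExecutorService scheduler;

    PollingClientStreamer(Supplier<List<String>> source, Consumer<String> sink) {
        this.source = source;
        this.sink = sink;
    }

    /** Fetches one batch from the source, forwards every message and returns the batch size. */
    int pollOnce() {
        List<String> batch = source.get();
        batch.forEach(sink);
        return batch.size();
    }

    /** Starts polling every periodMs milliseconds on a single background thread. */
    synchronized void start(long periodMs) {
        if (scheduler != null)
            throw new IllegalStateException("Already started");
        scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMs, TimeUnit.MILLISECONDS);
    }

    /** Stops scheduling new polls; messages already forwarded are unaffected. */
    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdown();
            scheduler = null;
        }
    }
}
```

A server-model streamer inverts this: instead of scheduling fetches, it listens for incoming connections (as SocketStreamer does) and calls addMessage(...) whenever data arrives.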
To implement a custom data streamer, a developer can optionally extend the StreamAdapter class and add functionality specific to the particular data source, plus streamer life cycle logic if needed. As mentioned above, StreamAdapter wraps an IgniteDataStreamer instance; it also needs a StreamTupleExtractor instance, which can be provided via the constructor or the corresponding setters.
The key StreamAdapter method is addMessage(T msg), which converts a message into a cache entry using the StreamTupleExtractor and streams the entry into the cache. This method does not block the current thread, thanks to the asynchronous nature of IgniteDataStreamer.
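The adapter idea can be sketched with JDK types only. This is a simplified illustration, not Ignite's source: `SketchTupleExtractor`, `BufferingStreamerStub` and `SketchStreamAdapter` are hypothetical stand-ins for StreamTupleExtractor, IgniteDataStreamer and StreamAdapter. It shows both ways of supplying the collaborators (constructor or setters) and why addMessage(...) returns immediately: the stub only enqueues the entry, much as a real data streamer buffers entries and sends them in batches.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for StreamTupleExtractor<T, K, V>. */
interface SketchTupleExtractor<T, K, V> {
    Map.Entry<K, V> extract(T msg);
}

/** Hypothetical stand-in for IgniteDataStreamer: addData only enqueues and returns. */
class BufferingStreamerStub<K, V> {
    final ConcurrentLinkedQueue<Map.Entry<K, V>> buffer = new ConcurrentLinkedQueue<>();

    void addData(Map.Entry<K, V> entry) {
        buffer.add(entry); // a real data streamer batches entries and sends them asynchronously
    }
}

/** Simplified sketch of the StreamAdapter idea (not Ignite's actual implementation). */
class SketchStreamAdapter<T, K, V> {
    private BufferingStreamerStub<K, V> stmr;
    private SketchTupleExtractor<T, K, V> extractor;

    SketchStreamAdapter() { }

    SketchStreamAdapter(BufferingStreamerStub<K, V> stmr, SketchTupleExtractor<T, K, V> extractor) {
        this.stmr = stmr;
        this.extractor = extractor;
    }

    void setStreamer(BufferingStreamerStub<K, V> stmr) { this.stmr = stmr; }

    void setTupleExtractor(SketchTupleExtractor<T, K, V> extractor) { this.extractor = extractor; }

    /** Converts a message into a cache entry and hands it to the streamer without waiting. */
    void addMessage(T msg) {
        Objects.requireNonNull(stmr, "streamer not set");
        Objects.requireNonNull(extractor, "extractor not set");
        Map.Entry<K, V> e = extractor.extract(msg);
        if (e != null)
            stmr.addData(e);
    }
}
```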
The StreamTupleExtractor interface exposes the extract(T msg) method, which returns a cache entry as a Map.Entry<K, V> instance. For example, if a message is a string with two values separated by the '=' symbol, where the left part is a word and the right part is an integer, the extractor could be implemented as:
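A minimal sketch of such an extractor follows. To keep it compilable without Ignite on the classpath, the class does not declare `implements StreamTupleExtractor<String, String, Integer>`; in a real project you would add that clause. The class name `WordCountExtractor` and the choice to reject malformed messages with an exception are assumptions.

```java
import java.util.AbstractMap;
import java.util.Map;

/**
 * Sketch of an extractor for messages of the form "word=number".
 * In an Ignite project this class would declare
 * "implements StreamTupleExtractor<String, String, Integer>" (omitted here).
 */
class WordCountExtractor {
    public Map.Entry<String, Integer> extract(String msg) {
        int idx = msg.indexOf('=');

        // Require a non-empty word before '=' and something after it.
        if (idx <= 0 || idx == msg.length() - 1)
            throw new IllegalArgumentException("Expected 'word=number', got: " + msg);

        String word = msg.substring(0, idx).trim();
        int count = Integer.parseInt(msg.substring(idx + 1).trim());

        return new AbstractMap.SimpleImmutableEntry<>(word, count);
    }
}
```

For example, the message "ignite=42" becomes the entry ("ignite", 42).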
The streamer life cycle depends on the chosen implementation model (server or client) and on the requirements. Usually a streamer is in one of the following states: created, initialized or shut down, and it may also pass through transition states.
StreamAdapter does not provide any life cycle management methods; correct life cycle management is entirely the responsibility of the data streamer developer.
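Because the life cycle is left to the developer, one possible approach is an explicit state machine. The sketch below is hypothetical (the class, its states and its transitions are not part of Ignite): it uses an `AtomicReference` so that two concurrent start() calls cannot both succeed, models STARTING and STOPPING as the transition states, and rejects messages unless the streamer is initialized.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical life cycle skeleton for a custom streamer; Ignite does not provide this class. */
class LifecycleStreamerSketch {
    enum State { CREATED, STARTING, INITIALIZED, STOPPING, SHUTDOWN }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    State state() { return state.get(); }

    /** CREATED -> STARTING -> INITIALIZED; starting from any other state is an error. */
    void start() {
        if (!state.compareAndSet(State.CREATED, State.STARTING))
            throw new IllegalStateException("Cannot start from " + state.get());

        // Open sockets, connections, worker threads, etc. here.
        state.set(State.INITIALIZED);
    }

    /** INITIALIZED -> STOPPING -> SHUTDOWN; calls in any other state are ignored. */
    void stop() {
        if (!state.compareAndSet(State.INITIALIZED, State.STOPPING))
            return;

        // Stop accepting new data and release resources here.
        state.set(State.SHUTDOWN);
    }

    /** Messages are only accepted while the streamer is initialized. */
    void addMessage(String msg) {
        if (state.get() != State.INITIALIZED)
            throw new IllegalStateException("Streamer is not running: " + state.get());

        // A real implementation would forward msg to StreamAdapter.addMessage(...).
    }
}
```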
Since a data streamer requires an IgniteDataStreamer instance, an Ignite node with a cache must be started and an IgniteDataStreamer obtained before the custom streamer is used. Note that StreamAdapter does not manage the IgniteDataStreamer life cycle.
The following code snippet demonstrates correct data streamer initialization, assuming that there is a CustomStreamer implementation with
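The initialization order can be illustrated with a JDK-only sketch. Every nested class is a hypothetical stand-in: `Node` mimics the node returned by `Ignition.start(...)`, its methods mimic `ignite.getOrCreateCache(...)` and `ignite.dataStreamer(...)`, and `CustomStreamer` mimics a StreamAdapter subclass with a start() method. The cache name "words" and the "word=number" extractor are also assumptions. Each step appends to a log so the order is visible.

```java
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Initialization order sketch; all nested classes are hypothetical stand-ins. */
class InitOrderSketch {
    /** Stand-in for the Ignite node returned by Ignition.start(...). */
    static class Node {
        private final List<String> log;

        Node(List<String> log) {
            this.log = log;
            log.add("node started");
        }

        /** Mimics ignite.getOrCreateCache(cacheName). */
        void getOrCreateCache(String cacheName) { log.add("cache ready: " + cacheName); }

        /** Mimics ignite.dataStreamer(cacheName). */
        DataStreamer dataStreamer(String cacheName) {
            log.add("data streamer obtained: " + cacheName);
            return new DataStreamer();
        }
    }

    /** Stand-in for IgniteDataStreamer<String, Integer>. */
    static class DataStreamer { }

    /** Stand-in for a CustomStreamer that extends StreamAdapter and adds start(). */
    static class CustomStreamer {
        private final List<String> log;
        private DataStreamer stmr;
        private Function<String, Map.Entry<String, Integer>> extractor;

        CustomStreamer(List<String> log) { this.log = log; }

        void setStreamer(DataStreamer stmr) { this.stmr = stmr; }

        void setTupleExtractor(Function<String, Map.Entry<String, Integer>> extractor) {
            this.extractor = extractor;
        }

        void start() {
            // Refuse to start until both collaborators required by StreamAdapter are set.
            Objects.requireNonNull(stmr, "data streamer not set");
            Objects.requireNonNull(extractor, "tuple extractor not set");
            log.add("custom streamer started");
        }
    }

    /** Performs the initialization steps in order and returns the running custom streamer. */
    static CustomStreamer initialize(List<String> log) {
        Node ignite = new Node(log);                       // 1. start a node
        ignite.getOrCreateCache("words");                  // 2. make sure the cache exists
        DataStreamer stmr = ignite.dataStreamer("words");  // 3. obtain the data streamer

        CustomStreamer customStmr = new CustomStreamer(log);
        customStmr.setStreamer(stmr);                      // 4. wire streamer and extractor
        customStmr.setTupleExtractor(msg -> {
            String[] parts = msg.split("=");
            return new AbstractMap.SimpleEntry<>(parts[0], Integer.parseInt(parts[1]));
        });

        customStmr.start();                                // 5. start accepting data
        return customStmr;
    }
}
```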
The following code snippet demonstrates correct data streamer shutdown, assuming that there is a CustomStreamer implementation with
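A matching JDK-only sketch of shutdown, again with hypothetical stand-ins, stops components in reverse order of initialization. The custom streamer stops receiving data first, then the data streamer is closed (IgniteDataStreamer is AutoCloseable, and closing it streams any remaining buffered data), and the node is stopped last.

```java
import java.util.List;

/** Shutdown order sketch; all nested classes are hypothetical stand-ins. */
class ShutdownOrderSketch {
    /** Stand-in for a CustomStreamer with a stop() method. */
    static class CustomStreamer {
        private final List<String> log;

        CustomStreamer(List<String> log) { this.log = log; }

        void stop() { log.add("custom streamer stopped"); }
    }

    /** Stand-in for IgniteDataStreamer, which is AutoCloseable. */
    static class DataStreamer implements AutoCloseable {
        private final List<String> log;

        DataStreamer(List<String> log) { this.log = log; }

        @Override public void close() { log.add("data streamer flushed and closed"); }
    }

    /** Stand-in for the Ignite node, which is also AutoCloseable. */
    static class Node implements AutoCloseable {
        private final List<String> log;

        Node(List<String> log) { this.log = log; }

        @Override public void close() { log.add("node stopped"); }
    }

    /** Stops components in reverse order of initialization. */
    static void shutdown(CustomStreamer customStmr, DataStreamer stmr, Node ignite) {
        customStmr.stop(); // stop receiving data before the data streamer is closed
        stmr.close();      // mimics IgniteDataStreamer.close(): streams remaining data, then closes
        ignite.close();    // mimics Ignite.close()
    }
}
```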
SocketStreamer - reference custom data streamer implementation.
SocketStreamer is an NIO-based TCP socket server that accepts client connections on a configured port, receives data represented as byte arrays, converts it to a user-defined type using SocketMessageConverter and streams it into the cache.
The default SocketMessageConverter implementation uses the standard Java serialization/deserialization mechanism, assuming that the data stream contains serialized Java objects.
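The deserializing side can be sketched as follows. `JavaSerializationConverter` is a hypothetical name; in Ignite it would implement SocketMessageConverter<T> (assumed here to declare a single `T convert(byte[] msg)` method), which is left out so the sketch compiles with the JDK alone. Each call expects one complete message, as delimited by the socket protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UncheckedIOException;

/**
 * Sketch of a converter that turns one received byte array back into a Java object using
 * standard Java deserialization. The SocketMessageConverter<T> clause is omitted here.
 */
class JavaSerializationConverter<T> {
    private final Class<T> type;

    JavaSerializationConverter(Class<T> type) { this.type = type; }

    /** Deserializes one complete message and checks that it has the expected type. */
    public T convert(byte[] msg) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(msg))) {
            return type.cast(in.readObject()); // ClassCastException if the type does not match
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to deserialize message", e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException("Message class is not on the classpath", e);
        }
    }
}
```

Java deserialization of data from untrusted clients is a well-known security risk, so a converter like this should only face trusted senders; a custom converter (for example, one that decodes UTF-8 text) avoids the issue.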
SocketStreamer supports two communication protocols:
The SocketStreamer implementation has two life cycle management methods, start() and stop(), which initialize and shut down the TCP socket server respectively.
The following example demonstrates how SocketStreamer can be used:
Now the socket streamer is ready to receive data. A client for this socket streamer can be implemented as follows:
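A minimal client sketch follows, assuming the server uses size-based framing, where each message is sent as a 4-byte big-endian length header followed by its Java-serialized bytes, so that a Java-serialization converter can read it. The framing, the class name `WordsClientSketch` and the host and port arguments are assumptions; adjust them to match the server configuration.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.util.List;

/** Hypothetical socket streamer client using an assumed length-prefixed framing. */
class WordsClientSketch {
    /** Serializes one object with standard Java serialization. */
    static byte[] serialize(Serializable msg) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(msg);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Writes one framed message: 4-byte length header first, then the payload bytes. */
    static void writeFramed(DataOutputStream out, Serializable msg) throws IOException {
        byte[] payload = serialize(msg);
        out.writeInt(payload.length); // DataOutputStream writes ints in big-endian order
        out.write(payload);
    }

    /** Connects to the streamer and sends every word as a separate message. */
    static void sendWords(String host, int port, List<String> words) throws IOException {
        try (Socket sock = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(
                 new BufferedOutputStream(sock.getOutputStream()))) {
            for (String word : words)
                writeFramed(out, word);
            out.flush();
        }
    }
}
```

For instance, `WordsClientSketch.sendWords("localhost", 5555, List.of("alpha", "beta"))` would send two messages; the host and port are placeholders.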
The following steps should be performed to implement a custom data streamer: