Edgent comes with a number of connectors for interfacing with Messaging systems, Databases, Files, Processes, etc. See the "Edgent Connectors" section. Of course there will always be a need for connectors to new sources and sinks of data.
From Edgent's perspective a "connector" is just a fancy name for some code that can do one or both of:
- act as a sink for a stream - when provided with a stream tuple it can deliver that data to an external entity. e.g., the connector implements Consumer<T>.accept(T tuple).
- act as a source for a stream - when queried, or on its own, it can synthesize a tuple from data from an external entity. e.g., the connector implements Supplier<T>.get().
The connector doesn't need to know how to get a tuple from a stream or how to add a tuple to a stream. The one nuance: a source connector for use with Topology.events() is given a Consumer<T> object, and whenever the connector has a new tuple to add to the stream it simply calls that object's accept(T tuple) method.
The Edgent runtime has no particular knowledge of a "connector". Edgent doesn't mandate a connector's API, though some idioms have developed, and it doesn't require a particular implementation approach. Hence there is no single recipe for creating a connector.
Edgent is structured in a "micro services" manner, enabling more connectors to be added to it as needed - simply by adding the connector code to the source repository. Of course, a connector need not be "added to Edgent" to be used in topologies.
After reading the following, the best approach is likely to look at / start with one of the existing connectors that seems to have characteristics similar to your connector's needs.
A connector for the "console": write string tuples to it, read string tuples from it.
This demonstrates that the interfaces and code required to be usable as a stream sink and stream source are trivial. Ultimately, all of the complexity of a connector lies in its internal code for interfacing with the external entity... which is also trivial in this case.
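A minimal sketch of such a console connector (for brevity this uses the java.util.function stand-ins; a real Edgent connector would implement org.apache.edgent.function.Supplier / Consumer, which have the same method shapes but are Serializable so they can be passed to the Topology source methods and TStream.sink()):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.function.Consumer;
import java.util.function.Supplier;

// A trivial "console" connector: a source that reads lines from stdin
// and a sink that writes tuples to stdout.
public class ConsoleConnector {

    /** Source: supplies one console line per call, e.g., for use with generate()/poll(). */
    public static class ConsoleSource implements Supplier<String> {
        private final BufferedReader reader =
                new BufferedReader(new InputStreamReader(System.in));

        @Override
        public String get() {
            try {
                return reader.readLine();  // null at end-of-input
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Sink: "delivers" each stream tuple to the external entity (the console). */
    public static class ConsoleSink implements Consumer<String> {
        @Override
        public void accept(String tuple) {
            System.out.println(tuple);
        }
    }
}
```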
A connector to retrieve/receive data from an external entity and convert it into a tuple for a stream need only support being used as an argument to one of Topology.source(), generate(), poll(), or events(), by implementing the appropriate simple functional interface. E.g., if used with poll(), the connector is called periodically via its Supplier<T>.get() implementation to return the next tuple to add to the stream. How the connector comes up with that tuple is its business. The connector is responsible for converting the data from the external entity to the stream's tuple type. A connector typically supports only a small number of tuple types, such as String, JsonObject, and/or byte[].
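The poll() contract can be illustrated with a JDK-only sketch (this is not Edgent's actual implementation, and it uses java.util.function stand-ins for Edgent's functional interfaces): the runtime periodically calls the connector's get() and submits each non-null result to the stream, represented here by a downstream Consumer.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Simplified illustration of what Topology.poll() does with a source
// connector: call its Supplier.get() on a schedule and pass each
// non-null tuple downstream.
public class PollSketch {
    public static ScheduledFuture<?> poll(Supplier<String> connector,
                                          long period, TimeUnit unit,
                                          Consumer<String> downstream,
                                          ScheduledExecutorService scheduler) {
        return scheduler.scheduleAtFixedRate(() -> {
            String tuple = connector.get();   // connector synthesizes the tuple
            if (tuple != null)
                downstream.accept(tuple);     // runtime adds it to the stream
        }, 0, period, unit);
    }
}
```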
A connector to supply stream tuples to an external entity need only support being used as an argument to TStream.sink(), implementing Consumer<T>. The connector is called with a tuple via its Consumer<T>.accept(T tuple) implementation, and how it handles the tuple to "deliver" it to the external entity is its business. Again, a connector typically supports only a small number of tuple types, such as String, JsonObject, and/or byte[], converting the tuple as appropriate for the external entity.
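A minimal sink sketch (java.util.function stand-in again; the List-backed "transport" is a hypothetical placeholder for a real client/connection object): the connector converts each String tuple to the form the external entity needs, here UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a sink connector: it receives each stream tuple via accept()
// and is responsible for converting it to whatever form the external
// entity needs -- here, UTF-8 bytes handed to a stand-in "transport".
public class DeviceSink implements Consumer<String> {
    private final List<byte[]> transport;  // stand-in for a real client/connection

    public DeviceSink(List<byte[]> transport) {
        this.transport = transport;
    }

    @Override
    public void accept(String tuple) {
        // convert the tuple for the external entity and "deliver" it
        transport.add(tuple.getBytes(StandardCharsets.UTF_8));
    }
}
```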
Quality of service guarantees for tuple delivery (by a sink operation) are a connector design choice. They are likely influenced by an underlying "communication" library used by the connector. e.g., the MQTT connector exposes the underlying MQTT protocol's Quality of Service feature, leveraging the Eclipse Paho MqttClient library's support for it.
Connector API design
There are two main connector API styles. Whichever style is chosen, strive to make the common/simple cases simple for clients of the connector, and strive to keep the connector itself as simple as possible. An example of this thinking is that connectors typically support only a small number of tuple types, such as String, JsonObject, and/or byte[]. While JsonObject is logically optional (the user could simply convert a JSON string to a JsonObject), its use is so common that directly supporting it made sense.
Source/Sink function API
The connector defines one or more classes to be used to generate, or used as, functions for TStream.sink() or one of the Topology source stream methods: source(), generate(), events(), poll().
Which form of Topology source operation to support is chosen based on other characteristics of the connector's implementation. If the underlying connector implementation has its own thread/callback mechanism for receiving data, the connector should be designed to work with Topology.events(). Otherwise the connector would choose to be usable by one of Topology.source(), generate(), and/or poll(), based on what styles of use it wants to support.
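An events()-style source can be sketched as follows (JDK-only stand-ins; MessageListener is a hypothetical callback interface, not a real library type): Topology.events() hands the connector a Consumer<T> tuple submitter, and the connector wires the underlying library's callback to it, so the library's own thread drives tuple submission.

```java
import java.util.List;
import java.util.function.Consumer;

// Sketch of an events()-style source connector. Topology.events() passes
// the connector a Consumer<T> "submitter"; whenever the underlying
// library's callback fires with new data, the connector converts it to a
// tuple and submits it to the stream.
public class EventSource implements Consumer<Consumer<String>> {
    /** Hypothetical callback interface of the underlying library. */
    public interface MessageListener { void onMessage(byte[] payload); }

    private final List<MessageListener> library;  // stand-in for callback registration

    public EventSource(List<MessageListener> library) {
        this.library = library;
    }

    @Override
    public void accept(Consumer<String> submitter) {
        // register with the underlying library; its thread drives tuple submission
        library.add(payload -> submitter.accept(new String(payload)));
    }
}
```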
Encapsulated Source/Sink use
The connector encapsulates the above and provides the user with higher level source/sink related operations.
The current Edgent connectors tend to follow this model for their API and then use the "Source/Sink function" idiom for their “runtime” implementation.
The API for most Edgent connectors is a <kind>Streams class that exposes both source and sink related methods (e.g., MqttStreams, FileStreams, HttpStreams, JdbcStreams). Some connectors have separate classes for source vs sink facilities (e.g., KafkaProducer, KafkaConsumer). In the case of the Kafka connector, separate classes were chosen because the connector's configuration parameters were very different for the producer vs the consumer, and combining them into a single class just complicated things.
Whether the connector’s API exposes static source/sink related methods (e.g., FileStreams) or requires the client to instantiate an instance of the connector (e.g., MqttStreams) is a connector design choice.
The “connector instance” idiom is typically used when the underlying connector implementation supports multiple source and/or sink operations, and resource sharing, using a single “connection”. The client of the connector can instantiate connector instances as needed to control the scope of use of an underlying “connection” (e.g., creating per-source-stream connector/connection instances), should that be a valuable capability.
As noted above, there isn’t much defined in the way of lifecycle and/or common management capabilities required of the connector.
Other than when using/supporting Topology.events() and receiving an initializing Consumer<Consumer<T>>.accept(Consumer<T> receiver) call, a “functional level” connector implementation receives no explicit initialization. It seems likely that at some point Edgent will define an optional functional-level initialization interface, enabling function implementations to receive initialization/context info similar to that received by an Edgent Oplet implementation. Of course, a connector implementation is free to implement itself as an Oplet. See the FileStreams connector's TextFileReader runtime class for a sample that is based on a (Pipe) Oplet.
It’s possible that at some point Edgent may provide more general connector control interfaces/classes to enable uses like “connect for 10 minutes every 4 hours” and/or define/provide services for queuing/persisting sink tuples while “offline”.
A connector that wants to be notified upon Job shutdown, e.g., to close a connection, needs to implement java.lang.AutoCloseable. This is documented in the Topology source related methods and TStream.sink().
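A sketch of the idea (java.util.function stand-in; the runtime's invocation of close() at job shutdown is what the Topology/TStream javadoc describes, and the "connection" here is a hypothetical placeholder):

```java
import java.util.function.Consumer;

// Sketch: a sink connector that releases its resources at job shutdown.
// The runtime close()s sink/source functions implementing AutoCloseable
// when the job is closed.
public class ClosableSink implements Consumer<String>, AutoCloseable {
    private final StringBuilder connection = new StringBuilder();  // stand-in resource
    private boolean closed;

    @Override
    public void accept(String tuple) {
        if (closed)
            throw new IllegalStateException("connector is closed");
        connection.append(tuple);  // "deliver" the tuple
    }

    @Override
    public void close() {
        closed = true;  // e.g., disconnect from the external server
    }
}
```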
The connector implementation can utilize any java facilities or 3rd party libraries it deems useful, e.g., it can allocate/use threads; the MQTT connector uses the Eclipse Paho MqttClient library.
The connector implementation is responsible for managing any underlying resources such as “connections” (e.g., a connection to an MQTT server), including initial connection establishment, reestablishing connections that may intermittently fail, and optionally implementing an “idle connection shutdown” strategy. How much of this responsibility ends up in the connector developer’s lap depends on the capabilities and usage model of any underlying “communication” library used by the connector.
Edgent provides a base Connector class to assist connector implementations with managing connections. Its use is entirely optional. See its javadoc for more information. See the Mqtt connector implementation for an example of its use.
Edgent uses slf4j for trace and logging. Your connector should too. E.g. from the Mqtt connector:
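An illustrative sketch of typical slf4j usage in a connector class (the class name, field name, and messages here are illustrative, not the actual Mqtt connector source):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Typical slf4j usage in a connector runtime class: one class-scoped
// Logger, parameterized messages, and the exception passed as the last
// argument so the stack trace is logged.
public class MyConnector {
    static final Logger logger = LoggerFactory.getLogger(MyConnector.class);

    void connect(String serverUrl) {
        logger.info("Connecting to {}", serverUrl);
        try {
            // ... establish the connection ...
        } catch (Exception e) {
            logger.error("Unable to connect to {}", serverUrl, e);
        }
    }
}
```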
A common connector runtime implementation pattern is to have a <kind>Connector class (e.g., MqttConnector) and separate source (MqttSubscriber) and sink (MqttPublisher) related classes. The "source" class implements Supplier<T> (or one of the other Topology source stream method functions) and the "sink" implements Consumer<T>. These are used by the <kind>Streams connector API class (e.g., MqttStreams).
The sink related method MqttStreams.publish(...) is simply:
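Paraphrased rather than quoted (the exact signature in the actual source may differ), the method essentially just wraps the connector's sink function class around TStream.sink():

```java
// Paraphrased sketch of MqttStreams.publish(); assumes the MqttConnector
// instance and MqttPublisher runtime class described above.
public TSink<String> publish(TStream<String> stream, String topic, int qos, boolean retain) {
    return stream.sink(new MqttPublisher(connector, topic, qos, retain));
}
```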
The stream source related method MqttStreams.subscribe() is simply:
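Again paraphrased rather than quoted (the exact signature may differ): the subscriber is an events()-style source function, fed by the MQTT client's message-arrived callback.

```java
// Paraphrased sketch of MqttStreams.subscribe(); assumes the MqttConnector
// instance and MqttSubscriber runtime class described above.
public TStream<String> subscribe(String topic, int qos) {
    MqttSubscriber subscriber = new MqttSubscriber(connector, topic, qos);
    return topology.events(subscriber);
}
```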
Connector Code partitioning
By convention this package structure is used by Edgent supplied connectors:
- org.apache.edgent.connectors.<kind> connector API
- org.apache.edgent.connectors.<kind>.runtime connector implementation
- org.apache.edgent.test.connectors.<kind> connector tests
This convention is optional unless the intent is to create a connector to be added to the Edgent code base.