Geode uses UDP messaging through JGroups for its peer-to-peer (P2P) messaging related to membership. Unfortunately, JGroups does not support rolling upgrades, which makes it difficult to move to a newer JGroups version. Configuring encryption for UDP adds another layer of complexity to securing P2P communications.
We would like to replace the UDP messaging for membership with a TCP-based messaging system.
- Set us up for removing JGroups as dependency in future versions by moving to a protocol that does not require JGroups
- Support rolling upgrades from the old JGroups protocol to the new TCP-based protocol
- Use the existing SSL settings to control how membership messages are encrypted
- Be as reliable in the face of networking failures as the previous protocol
All messaging related to membership is handled by the JGroupsMessenger class, which implements the Messenger interface. We will create a new implementation of Messenger that uses TCP sockets rather than JGroups and UDP sockets.
The key methods on Messenger are described below.
The start() method of the messenger is responsible for starting a server that is listening on a socket. The getMemberId() method returns an InternalDistributedMember. Currently the InternalDistributedMember contains the host and port that the Messenger is listening on in its getHost() and getPort() methods.
The send() method takes a DistributionMessage. DistributionMessage has a getRecipients() method, which returns an array of InternalDistributedMember objects. The messenger can send messages to those recipients because they have the host and port information.
The installView() method is used by the messenger to determine which members are no longer in the view, and so we should stop trying to transmit messages to them.
Discovery of the view happens outside the Messenger, in GMSJoinLeave. We won't have to modify that functionality. The new messenger just needs to know how to start a TCP server, generate an InternalDistributedMember that contains the contact information for that server, and use the InternalDistributedMember objects of other members to send messages to them.
Implement a TCP Server Using Netty
Our proposal for the new Messenger is to start a TCP server using Netty. We will implement a Netty ChannelInboundHandlerAdapter that deserializes Geode messages and passes them to handlers registered with addHandler(). The host and port of the Netty server can be included in InternalDistributedMember or shared in other ways. (See the Rolling Upgrade section, below.)
The Netty server will use the existing cluster SSL configuration. So if cluster SSL is enabled no additional properties will be required. See https://geode.apache.org/docs/guide/latest/managing/security/implementing_ssl.html for information on the relevant properties.
When sending a message, the Messenger will create a connection to each destination that does not yet have one. Once a connection is established, it will remain open as long as that member is still in the view.
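The connection lifecycle described above could be sketched roughly as follows. This is only an illustration under simplifying assumptions: `ConnectionTable` is a hypothetical helper that does not exist in Geode, members are identified by plain strings rather than InternalDistributedMember objects, and the connection type is left generic instead of being a Netty channel.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical helper (not a Geode class): caches one connection per member.
final class ConnectionTable<C extends AutoCloseable> {
  private final Map<String, C> connections = new ConcurrentHashMap<>();
  private final Function<String, C> connector;

  ConnectionTable(Function<String, C> connector) {
    this.connector = connector;
  }

  // Returns the cached connection, opening one the first time a member is used.
  C getOrConnect(String memberId) {
    return connections.computeIfAbsent(memberId, connector);
  }

  // Called when a new view is installed: closes and forgets connections
  // to members that are no longer in the view.
  void installView(Set<String> currentMembers) {
    connections.entrySet().removeIf(entry -> {
      if (currentMembers.contains(entry.getKey())) {
        return false;
      }
      try {
        entry.getValue().close();
      } catch (Exception ignored) {
        // The member has left the view, so a failed close is not interesting.
      }
      return true;
    });
  }

  int size() {
    return connections.size();
  }
}
```

Keeping the connector as a function makes it possible to exercise the table in a unit test without opening real sockets.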
Handling TCP connection failures
The contract of the Messenger is that it keeps trying to deliver messages to the destination as long as those destinations are still in the view. Because individual TCP connections can fail, this basically forces us to implement a reliability layer above TCP that will continue to retry messages until a member is removed from the view. This layer needs to be able to:
- Reestablish a connection to the destination if the existing TCP connection fails.
- Retransmit messages that may not have been received. Because more than one message can be in the TCP send buffer of the sender when a TCP connection fails, we need to retransmit some window of messages.
- Because we are retransmitting messages, we need to include some sort of sequence number to prevent duplicate messages, and possibly also to deliver messages in order.
- Implement something akin to a TCP sliding window, with an increasing sequence number attached to each message and acknowledged by the receiver.
- Implement something like the JGroups UNICAST3 protocol (https://github.com/belaban/JGroups/blob/master/doc/design/UNICAST3.txt)? This is what the current membership layer uses.
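As a rough illustration of the sequence-number and acknowledgment idea in the list above, the sketch below keeps unacknowledged messages on the sender and drops duplicates on the receiver. `SendWindow` and `ReceiveWindow` are hypothetical names, payloads are plain strings, and acknowledgments are assumed to be cumulative. It is a simplification and not an implementation of UNICAST3.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sender side: remembers every message until it is acknowledged.
final class SendWindow {
  private final NavigableMap<Long, String> unacked = new TreeMap<>();
  private long nextSeq = 1;

  // Assigns the next sequence number and retains the payload for retransmission.
  synchronized long enqueue(String payload) {
    long seq = nextSeq++;
    unacked.put(seq, payload);
    return seq;
  }

  // Cumulative ack: the receiver has delivered everything up to and including seq.
  synchronized void ack(long seq) {
    unacked.headMap(seq, true).clear();
  }

  // After a reconnect, these sequence numbers must be sent again, in order.
  synchronized List<Long> pendingRetransmit() {
    return new ArrayList<>(unacked.keySet());
  }
}

// Hypothetical receiver side: delivers in order and ignores duplicates.
final class ReceiveWindow {
  private final Map<Long, String> outOfOrder = new HashMap<>();
  private final List<String> delivered = new ArrayList<>();
  private long nextExpected = 1;

  // Accepts a message and returns the cumulative ack to send back.
  synchronized long receive(long seq, String payload) {
    if (seq >= nextExpected) {
      outOfOrder.putIfAbsent(seq, payload);
      String next;
      while ((next = outOfOrder.remove(nextExpected)) != null) {
        delivered.add(next);
        nextExpected++;
      }
    }
    // Anything below nextExpected was already delivered and is dropped.
    return nextExpected - 1;
  }

  synchronized List<String> delivered() {
    return new ArrayList<>(delivered);
  }
}
```

A retransmitted message that was in fact received the first time simply produces the same cumulative ack again, so the sender can safely resend its whole unacknowledged window after a reconnect.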
One concern about switching to a TCP-based protocol is that network outages may result in TCP sockets hanging on read or write operations. We need to ensure that, if a connection to one member is blocked, we still send messages to the other members. Each destination will have its own queue of messages to be sent, and adding to one queue should never block.
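One way to keep a stalled connection from holding up the others is sketched below, assuming one unbounded, lock-free queue per destination that is drained by that destination's own writer. `OutboundQueues` is a hypothetical name, and members and messages are plain strings for brevity.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper (not a Geode class): one outbound queue per destination.
final class OutboundQueues {
  private final Map<String, Queue<String>> queues = new ConcurrentHashMap<>();

  // Never blocks: the message is appended to the destination's own queue,
  // regardless of whether that destination's socket is currently writable.
  void enqueue(String memberId, String message) {
    queues.computeIfAbsent(memberId, id -> new ConcurrentLinkedQueue<String>())
        .add(message);
  }

  // Called by the writer for one destination; returns null when nothing is queued.
  String poll(String memberId) {
    Queue<String> queue = queues.get(memberId);
    return queue == null ? null : queue.poll();
  }

  // Called when a member leaves the view, discarding its undelivered messages.
  void remove(String memberId) {
    queues.remove(memberId);
  }
}
```

An unbounded queue trades memory for the non-blocking guarantee. A real implementation would likely need a bound, or would rely on failure detection removing the blocked member from the view before its queue grows too large.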
Rolling upgrade concerns
We need to be able to do a rolling upgrade from the old JGroups-based protocol to the new protocol. We will need to continue to support rolling upgrades for a certain range of versions before we can drop JGroups.
To accomplish this, a member will need to listen for connections on both protocols when it initially starts up. We will create a delegating Messenger that contains both a JGroupsMessenger and a NettyMessenger. It can install handlers in both of them and decide which Messenger to use when sending a message based on the version of the recipient. If a member receives a view that contains no old members, that is, no members that still require the JGroups protocol, it could shut down the JGroups-based Messenger.
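The delegation could look roughly like the sketch below. It is written against assumptions: `SimpleMessenger` is a reduced stand-in for Geode's Messenger interface, members are plain strings, versions are integers looked up from a map, and `firstTcpVersion` is a hypothetical threshold for the first release that supports the TCP protocol.

```java
import java.util.Collection;
import java.util.Map;

// Reduced stand-in for Geode's Messenger; the real interface has more methods.
interface SimpleMessenger {
  void send(String recipient, String message);
}

// Hypothetical delegating messenger that picks a protocol per recipient.
final class DelegatingMessenger implements SimpleMessenger {
  private final SimpleMessenger jgroups;
  private final SimpleMessenger netty;
  private final Map<String, Integer> versions;
  private final int firstTcpVersion;

  DelegatingMessenger(SimpleMessenger jgroups, SimpleMessenger netty,
      Map<String, Integer> versions, int firstTcpVersion) {
    this.jgroups = jgroups;
    this.netty = netty;
    this.versions = versions;
    this.firstTcpVersion = firstTcpVersion;
  }

  // Members with an unknown version are treated as old, which is the safe default.
  private boolean supportsTcp(String member) {
    return versions.getOrDefault(member, 0) >= firstTcpVersion;
  }

  @Override
  public void send(String recipient, String message) {
    SimpleMessenger target = supportsTcp(recipient) ? netty : jgroups;
    target.send(recipient, message);
  }

  // True when every member of the view can use TCP, so JGroups could be stopped.
  boolean canStopJGroups(Collection<String> viewMembers) {
    return viewMembers.stream().allMatch(this::supportsTcp);
  }
}
```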
There is an issue here with the need to listen on two separate ports, because InternalDistributedMember currently only has support for a single port field. There are a few options we are evaluating:
- Just use the same port for JGroups and Netty. Since one is a UDP port and the other is a TCP port, they can both be open at the same time.
- Encode the second port in some other field in InternalDistributedMember, for example by using part of the UUID bytes. This is kind of hacky.
- Pass the new membership port around outside of InternalDistributedMember. This would probably involve sending it as part of the FindCoordinatorResponse, as well as including it in the NetView.
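The first option relies on TCP and UDP having separate port namespaces. The small check below, using only JDK sockets, illustrates that a TCP listener and a UDP socket can be bound to the same port number at once. `SharedPortDemo` is a hypothetical name, and the retry loop only guards against the chosen UDP port happening to be in use by another process.

```java
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ServerSocket;

// Hypothetical demo class: binds TCP and UDP sockets to the same port number.
final class SharedPortDemo {
  // Returns the shared port number, or -1 if no attempt succeeded.
  static int bindBoth() {
    for (int attempt = 0; attempt < 5; attempt++) {
      // Port 0 asks the operating system for any free TCP port.
      try (ServerSocket tcp = new ServerSocket(0)) {
        int port = tcp.getLocalPort();
        // Bind a UDP socket to the same number while the TCP listener is open.
        try (DatagramSocket udp = new DatagramSocket(port)) {
          return port;
        }
      } catch (IOException e) {
        // The UDP port may already be taken by another process; try again.
      }
    }
    return -1;
  }
}
```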