Interface CommunicationSpi
- responsible for sending generic messages MSG between nodes. It has a single implementation, TcpCommunicationSpi, which extends CommunicationSpi<Message>.
TcpCommunicationSpi
- provides TCP/IP and Java NIO services to send Message via sendMessage(ClusterNode, Message, IgniteInClosure<IgniteException> ackC), which takes an acknowledgment closure.
Message
- base type for all communication messages.
GridIoMessageFactory
MessageWriter (DirectMessageWriter#writeHeader).
Message classes are generated with MessageCodeGenerator. There are also HandshakeMessage and HandshakeMessage2, which don't use MessageWriter.
GridIoManager
-
GridNioServer
- responsible for:
GridSelectorNioSessionImpl for the client worker and remote address. Registers it for read/write operations.
TcpCommunicationConfiguration - in case of RU (rolling upgrade), multiple nodes must have the same configuration settings:
Some of the params (usePairedConnections) are written to node attributes and sent by discovery.
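A minimal sketch of such a consistency check, assuming each node exposes the relevant settings as a name-to-value map (for example, built from node attributes received via discovery). The class name and the exact set of checked keys are hypothetical; only usePairedConnections is named in this note.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical check: settings that affect both ends of a connection must match. */
final class CommConfigCheck {
    /** Assumed list of shared settings; not Ignite's actual list. */
    static final Set<String> SHARED_KEYS = Set.of("usePairedConnections", "connectionsPerNode");

    /** Returns the sorted names of shared settings whose values differ or are missing on one side. */
    static Set<String> mismatches(Map<String, Object> local, Map<String, Object> remote) {
        Set<String> diff = new TreeSet<>();
        for (String key : SHARED_KEYS) {
            if (!Objects.equals(local.get(key), remote.get(key)))
                diff.add(key);
        }
        return diff;
    }
}
```

A non-empty result would mean the nodes must not communicate (or must not join) until the configuration is aligned.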
Initiator (node0) - the node that sends a message to another node.
TCP handshake | SSL handshake | Ignite handshake | Message |
---|---|---|---|
Goal of the Ignite handshake:
GridNioServerWrapper#createNioSession:
→ (node0: exchange-worker) TcpCommunicationSpi#sendMessage - sends GridDhtPartitionsSingleMessage.
→ Here it creates a TCP client to communicate with the remote node: creates a local Socket and connects it to the remote address (blocking).
→ (node1: nio-acceptor-tcp-comm) Accepts the new connection and balances it to a client worker by sending a SessionChangeRequest to the chosen client worker (through its selector).
→ (node1: grid-nio-worker-tcp-comm) Processes the SessionChangeRequest and registers a new session GridSelectorNioSessionImpl (GridNioServer$AbstractNioClientWorker#register).
→ After opening the session, it sends NodeIdMessage to the remote node: changes the session state to REQUIRE_WRITE, then sends the message.
→ (node0: exchange-worker) TcpHandshakeExecutor#tcpHandshake: receives NodeIdMessage, verifies the nodeId, and sends back HandshakeMessage2 with the local nodeId.
→ (node1: grid-nio-worker-tcp-comm) Receives HandshakeMessage2 and stores the connection info with a ConnectionKey. Responds with RecoveryLastReceivedMessage.
→ (node0: exchange-worker) Receives the acknowledgment, validates the response code, and finishes the handshake (stores the ConnectionKey, creates a GridNioSession and a client, similar to node1).
→ The session is ready to send GridDhtPartitionsSingleMessage.
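The initiator side of the Ignite handshake above can be sketched as a small state machine. The records are simplified stand-ins for NodeIdMessage, HandshakeMessage2 and RecoveryLastReceivedMessage, not the real classes, and treating a negative code as rejection is an assumption made for illustration.

```java
import java.util.UUID;

/** Hypothetical sketch of node0's handshake steps; not Ignite's TcpHandshakeExecutor. */
final class InitiatorHandshake {
    enum State { AWAIT_NODE_ID, AWAIT_ACK, READY, FAILED }

    record NodeId(UUID nodeId) {}            // stand-in for NodeIdMessage
    record Handshake2(UUID localNodeId) {}   // stand-in for HandshakeMessage2
    record RecoveryAck(long code) {}         // stand-in for RecoveryLastReceivedMessage

    private final UUID localId;
    private final UUID expectedRemoteId;
    private State state = State.AWAIT_NODE_ID;

    InitiatorHandshake(UUID localId, UUID expectedRemoteId) {
        this.localId = localId;
        this.expectedRemoteId = expectedRemoteId;
    }

    State state() { return state; }

    /** Receive NodeIdMessage, verify the remote id, answer with HandshakeMessage2 (null on failure). */
    Handshake2 onNodeId(NodeId msg) {
        if (state != State.AWAIT_NODE_ID || !expectedRemoteId.equals(msg.nodeId())) {
            state = State.FAILED;
            return null;
        }
        state = State.AWAIT_ACK;
        return new Handshake2(localId);
    }

    /** Receive the acknowledgment; a negative code is assumed to mean the remote node refused. */
    void onAck(RecoveryAck ack) {
        state = (state == State.AWAIT_ACK && ack.code() >= 0) ? State.READY : State.FAILED;
    }
}
```

Only in READY would the session be handed over for sending GridDhtPartitionsSingleMessage.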
(exchange-worker) TcpCommunicationSpi#sendMessage
→ (exchange-worker) GridNioServer#send: wraps the Message into a SessionWriteRequest and puts it into the session worker queue (grid-nio-worker-tcp-comm).
→ (grid-nio-worker-tcp-comm) GridNioServer$DirectNioClientWorker#processWrite: invokes Message#writeTo on the session write buffer (its size is Socket#sendBufferSize), then writes the buffer to the socket channel.
→ It keeps writing the message to the buffer until Message#writeTo returns true (the first 2 bytes are Message#directType, followed by the fields written by Message#writeTo).
If a message is bigger than the session write buffer, it is sent in multiple parts. The parts form a continuous byte stream (no delimiters between parts). The status of the message being sent is stored in the current GridSelectorNioSessionImpl:
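A minimal sketch of how one message can be written across several buffer fills, modelled on the generated writeTo style: the message remembers the index of the next field, writes the 2-byte direct type first, and returns false when the buffer has no room. PingMessage, its two fields, direct type 42 and the buffer size are hypothetical; unlike the real stream, a single field is never split across buffers here.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical illustration of a resumable writeTo and a processWrite-like loop. */
final class ResumableWrite {
    static final class PingMessage {
        static final short DIRECT_TYPE = 42; // hypothetical direct type
        final int a;
        final long b;
        private int state; // index of the next part to write (0 = header)

        PingMessage(int a, long b) { this.a = a; this.b = b; }

        /** Returns true once the header and all fields have been written. */
        boolean writeTo(ByteBuffer buf) {
            switch (state) {
                case 0:
                    if (buf.remaining() < Short.BYTES) return false;
                    buf.putShort(DIRECT_TYPE); // header: direct type goes first
                    state++;
                case 1: // fall through: continue with the next field
                    if (buf.remaining() < Integer.BYTES) return false;
                    buf.putInt(a);
                    state++;
                case 2:
                    if (buf.remaining() < Long.BYTES) return false;
                    buf.putLong(b);
                    state++;
            }
            return true;
        }
    }

    /** Fill a small write buffer, flush it to the "channel", repeat until writeTo returns true. */
    static byte[] send(PingMessage msg, int writeBufSize) {
        ByteBuffer buf = ByteBuffer.allocate(writeBufSize);
        ByteArrayOutputStream channel = new ByteArrayOutputStream(); // stands in for the socket channel
        boolean done = false;
        while (!done) {
            buf.clear();
            done = msg.writeTo(buf);
            buf.flip();
            if (!done && !buf.hasRemaining())
                throw new IllegalStateException("write buffer too small for a single field");
            channel.write(buf.array(), 0, buf.limit());
        }
        return channel.toByteArray();
    }
}
```

With an 8-byte buffer the 14-byte message goes out in two parts (6 + 8 bytes), and the receiver sees one continuous stream with no delimiter between them.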
(grid-nio-worker-tcp-comm)
→ (grid-nio-worker-tcp-comm) GridNioServer$DirectNioClientWorker#processRead (reads bytes into the session read buffer).
→ (grid-nio-worker-tcp-comm) GridNioCodecFilter#onMessageReceived: decodes the message. It reads Message#directType, then reads the full message with Message#readFrom until readFrom returns true.
→ Reader = GridDirectMessageReader (singleton instance). The reading status is stored in the session meta under the key MSG_META_KEY.
For a big message, it just waits for the next byte array and reads it as a continuation of the previous message.
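The read side can be sketched the same way: a per-session decoder keeps the unread bytes and the partially read message between chunks (the role the session meta plays above), so messages split at arbitrary byte boundaries and packed back to back without delimiters are still reassembled. All names, direct type 42 and the fixed 1024-byte read buffer are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of resumable decoding; not GridNioCodecFilter itself. */
final class ResumableRead {
    static final short PING_TYPE = 42; // hypothetical direct type

    static final class PingMessage {
        int a;
        long b;
        private int state; // index of the next field to read

        /** Returns true once all fields are read; false means "wait for more bytes". */
        boolean readFrom(ByteBuffer buf) {
            switch (state) {
                case 0:
                    if (buf.remaining() < Integer.BYTES) return false;
                    a = buf.getInt();
                    state++;
                case 1: // fall through: continue with the next field
                    if (buf.remaining() < Long.BYTES) return false;
                    b = buf.getLong();
                    state++;
            }
            return true;
        }
    }

    /** Per-session decoder state: unread bytes plus the in-progress message. */
    static final class Decoder {
        private final ByteBuffer readBuf = ByteBuffer.allocate(1024); // sketch: assumes chunks fit
        private PingMessage current; // analogue of the message kept under MSG_META_KEY
        final List<PingMessage> received = new ArrayList<>();

        void onBytes(byte[] chunk) {
            readBuf.put(chunk);
            readBuf.flip();
            while (true) {
                if (current == null) {
                    if (readBuf.remaining() < Short.BYTES) break;
                    short type = readBuf.getShort(); // every message starts with its direct type
                    if (type != PING_TYPE) throw new IllegalStateException("unknown direct type " + type);
                    current = new PingMessage();
                }
                if (!current.readFrom(readBuf)) break; // continue with the next chunk
                received.add(current);
                current = null;
            }
            readBuf.compact(); // keep unread bytes for the next call
        }
    }
}
```

Feeding two 14-byte messages in 5-byte chunks yields both messages intact, even though several chunks end in the middle of a field.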
It looks like fieldsCount is a marker for when to expect fewer fields than in the current version?
The communication protocol consists of 2 parts:
Message classes, including the ser/des algorithm.
Messages to remote node.
Introduce a Communication Protocol Version (ProtoVer) that depends on changes in the parts:
Message. The version is updated along with the commit that introduces a change.
Message).
Rules to support compatibility:
Messages. Only adding new fields is allowed.
FeatureMask).
@since Ignite version.
(@since - 1) version.
@since version and notify the release manager to drop support of old versions.
ProtoVer is fixed for each Ignite release version. Nodes exchange their ProtoVer and FeatureMask on discovery when a node joins:
If ProtoVer.Major is different, then FeatureMask is validated. Let the cluster version be V and the joining node version be V+2. If the joining node's mask contains features since V+1, then this node must not join the cluster.
If ProtoVer.Minor is different, then Rolling upgrade mode is enabled for the communication protocol between nodes with different versions.
Stable mode.
The handshake algorithm is extended with a new step: validating TcpCommunicationConfiguration consistency. Settings that affect both communicating nodes must be the same:
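The join-time rules can be sketched as two small functions. ProtoVer, Mode and the feature-to-since map are hypothetical. The mask check is one reading of the V / V+2 example (reject any feature introduced after the cluster version), and returning Rolling upgrade mode after a successful major-version check is an assumption the note does not state.

```java
import java.util.Map;

/** Hypothetical sketch of the ProtoVer / FeatureMask decision on node join. */
final class ProtoVerCheck {
    record ProtoVer(int major, int minor) {}

    enum Mode { REJECT, ROLLING_UPGRADE, STABLE }

    /** joiningFeatures: feature name -> release version it was introduced in (@since). */
    static boolean maskAllowsJoin(int clusterRelease, Map<String, Integer> joiningFeatures) {
        for (int since : joiningFeatures.values()) {
            if (since > clusterRelease)
                return false; // feature is newer than the cluster, e.g. since V+1 for a V cluster
        }
        return true;
    }

    static Mode decide(ProtoVer cluster, ProtoVer joining, boolean maskOk) {
        if (cluster.major() != joining.major())
            return maskOk ? Mode.ROLLING_UPGRADE : Mode.REJECT; // assumption: mixed versions use RU mode
        if (cluster.minor() != joining.minor())
            return Mode.ROLLING_UPGRADE;
        return Mode.STABLE;
    }
}
```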
In Stable mode, the logic of exchanging messages is unchanged. A channel is opened between the nodes, and messages are written to it one by one as a byte stream. There are no delimiters between messages; each message starts with its direct type. The reader knows it has read the full message from the Message#readFrom logic.
Message header | Message |
---|---|
2 bytes: direct type | fields |
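In Rolling upgrade mode, the logic is different, because a node can't know whether it has read the full message: Message#readFrom can't guarantee that (a message from a node with a newer protocol version may carry fields that the reader's Message#readFrom does not know about).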
Message#readFrom returns true or the fields count is reached. Then the message is processed.
Frame header | | Message header | | Message | |
---|---|---|---|---|---|
1 byte: bit 1 - continuation bit, bits 2-7 - reserved | 2 bytes: unsigned frame size (frame size limit 64 KB) | 2 bytes: direct type | 1 byte: fields count (max 255 fields) | fields | trailing zeros |
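A minimal sketch of this frame layout for a message that fits into a single frame, showing why the frame size matters: a reader that knows fewer fields reads what it understands and skips the rest of the frame. Assumptions not stated in the note: the continuation flag is bit 0x01 of the first byte, the frame size counts the bytes after the 3-byte frame header, every field is a 4-byte int, and multi-frame messages are not handled.

```java
import java.nio.ByteBuffer;

/** Hypothetical single-frame encoder/decoder for the Rolling upgrade layout. */
final class RuFrame {
    static final int CONTINUATION = 0x01; // assumed bit position of the continuation flag
    static final int MAX_FRAME = 0xFFFF;  // 2-byte unsigned frame size

    static byte[] encode(short directType, int[] fields, int trailingZeros) {
        if (fields.length > 255) throw new IllegalArgumentException("max 255 fields");
        int size = Short.BYTES + 1 + fields.length * Integer.BYTES + trailingZeros;
        if (size > MAX_FRAME) throw new IllegalArgumentException("frame too large, would need continuation");
        ByteBuffer buf = ByteBuffer.allocate(3 + size);
        buf.put((byte) 0);             // frame header: flags, no continuation
        buf.putShort((short) size);    // frame header: unsigned frame size
        buf.putShort(directType);      // message header: direct type
        buf.put((byte) fields.length); // message header: fields count
        for (int f : fields) buf.putInt(f);
        return buf.array();            // trailing bytes stay zero in a fresh buffer
    }

    /** Reads at most knownFields fields, then jumps to the end of the frame. */
    static int[] decode(ByteBuffer in, int knownFields) {
        int flags = in.get() & 0xFF;
        if ((flags & CONTINUATION) != 0) throw new UnsupportedOperationException("multi-frame not sketched");
        int size = Short.toUnsignedInt(in.getShort());
        int frameEnd = in.position() + size;
        in.getShort();                               // direct type (dispatch omitted)
        int count = Byte.toUnsignedInt(in.get());
        int[] read = new int[Math.min(count, knownFields)];
        for (int i = 0; i < read.length; i++) read[i] = in.getInt();
        in.position(frameEnd);                       // skip unknown fields and trailing zeros
        return read;
    }
}
```

An older reader that knows only 2 of 3 fields still ends exactly at the next frame boundary, so the following message in the stream is decoded correctly.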