Goals

  1. Describe the current (Ignite 2.17) design of the communication protocol.
  2. Propose a way to make it compatible between versions, as needed to implement Rolling Upgrade (IEP-132).

Classes

Interface CommunicationSpi - responsible for sending generic messages MSG between nodes:

  • void sendMessage(ClusterNode dest, MSG msg)
  • void setListener(CommunicationListener<MSG> lsnr)
  • metrics methods - bytes, messages.

It has a single implementation - TcpCommunicationSpi implements CommunicationSpi<Message>

TcpCommunicationSpi - provides TCP/IP and Java NIO services to send Message.

  • dumpStats, dumpNodeStatistics - not part of the interface.
  • openChannel, checkConnection, nodeAddresses - look like candidates for refactoring.
  • Instead of the interface sendMessage, it exposes sendMessage(ClusterNode, Message, IgniteInClosure<IgniteException> ackC) with an acknowledgment closure.
  • Static methods writeMessageType, makeMessageType - should be refactored.

Message - base class for all Communication messages

  • short directType() - type (class) of the Message. The list of types is declared in GridIoMessageFactory.
  • byte fieldsCount() - number of fields; it is skipped by MessageWriter (DirectMessageWriter#writeHeader).
  • boolean writeTo(ByteBuffer, MessageWriter) - writes the message field by field into the buffer using the writer; returns true once the message is fully written.
  • boolean readFrom(ByteBuffer, MessageReader) - reads the message field by field from the buffer using the reader; returns true once the message is fully read.
  • onAckReceived() - used ONLY by GridDhtAtomicAbstractUpdateRequest to clean up values (https://issues.apache.org/jira/browse/IGNITE-2466). Must be removed.

Message classes are generated with MessageCodeGenerator. There are also HandshakeMessage and HandshakeMessage2, which don't use MessageWriter.

GridIoManager

GridNioServer - responsible for:

  • acceptor - accepts incoming connections and assigns each of them to a client worker.
  • client workers (grid-nio-worker-tcp-comm) - create a GridSelectorNioSessionImpl for the client worker and remote address, and register it for read/write operations.

Send message protocol

TcpCommunicationConfiguration - in case of RU, the nodes must have the same values for these configuration settings:

  • usePairedConnections - use 2 channels (IN and OUT) between nodes.
  • connectionsPerNode - number of connections between 2 nodes.
  • connectionRequester - special protocol for requesting a connection from a remote node through discovery.

Some of the params (usePairedConnections) are written to node attributes and sent by discovery.

Handshake

Initiator (node0) - the node that sends a message to another node.

TCP handshake | SSL handshake | Ignite handshake | Message
  1. node0 initiates the handshake.
  2. node1 sends NodeIdMessage (if initialized) or HandshakeWaitMessage (if not initialized yet).
  3. If node0 receives HandshakeWaitMessage, it retries the handshake after a timeout.
  4. If node0 receives NodeIdMessage, it sends HandshakeMessage2 (locNodeId, connectCount, messageReceivedCount, connectionIdx):
    1. connectCount - counter of connections with the same connIdx. The receiver should remove sessions with a counter lower than the received one.
    2. recvCount (messageReceivedCount) - used for aligning message counters on the nodes.
    3. connIdx (connectionIdx) - index of the connection. Depends on connectionsPerNode. If connectionsPerNode is 1, then connIdx always equals 0.
  5. node1 responds with RecoveryLastReceivedMessage (rcvCount - used for status and messages counter).
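
The decisions the initiator and the receiver make in the steps above can be sketched as follows. This is a simplified model, not the code of TcpHandshakeExecutor: the class HandshakeSketch, its enums and method names are hypothetical, the REJECT_UNKNOWN_NODE outcome is an assumption about what happens when the node ID check fails, and validConnIdx assumes connIdx ranges from 0 to connectionsPerNode - 1.

```java
import java.util.Set;
import java.util.UUID;

/** Hypothetical model of the handshake decisions; not the real Ignite classes. */
final class HandshakeSketch {
    /** First reply of the remote node (step 2). */
    enum FirstReply { NODE_ID, HANDSHAKE_WAIT }

    /** What the initiator does next (steps 3 and 4). */
    enum Action { RETRY_AFTER_TIMEOUT, SEND_HANDSHAKE, REJECT_UNKNOWN_NODE }

    /** Initiator side: react to the first reply of the remote node. */
    static Action onFirstReply(FirstReply reply, UUID remoteNodeId, Set<UUID> topology) {
        if (reply == FirstReply.HANDSHAKE_WAIT)
            return Action.RETRY_AFTER_TIMEOUT;

        // The received node ID must be known and be part of the topology.
        return topology.contains(remoteNodeId) ? Action.SEND_HANDSHAKE : Action.REJECT_UNKNOWN_NODE;
    }

    /** Receiver side: a session with a lower connect counter than the received one is dropped. */
    static boolean isStaleSession(long sesConnectCnt, long rcvdConnectCnt) {
        return sesConnectCnt < rcvdConnectCnt;
    }

    /** Assumption: connIdx is in [0, connectionsPerNode), so it is always 0 for a single connection. */
    static boolean validConnIdx(int connIdx, int connectionsPerNode) {
        return connIdx >= 0 && connIdx < connectionsPerNode;
    }
}
```

For example, a HANDSHAKE_WAIT reply always leads to RETRY_AFTER_TIMEOUT, regardless of the topology passed in.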

Goal of Ignite handshake:

  1. The remote node is initialized and ready to accept incoming messages.
  2. The remote node responds with its node ID. The initiator validates this ID: the received ID must be known and be part of the topology.
  3. The remote node is in a consistent state with the local node - number of sent and acknowledged messages (in case of reconnect?).
  4. [For RU?] TcpConfiguration hashCode or some params (see above)? TcpCommunication protocol version (IgniteFeatures)?

Step by step handshake

GridNioServerWrapper#createNioSession:

→ (node0: exchange-worker) TcpCommunicationSpi#sendMessage - sends GridDhtPartitionsSingleMessage.

→ Here it creates a TCP client to communicate with the remote node: creates a local Socket and connects it to the remote address (blocking).

→ (node1: nio-acceptor-tcp-comm) Accepts the new connection and balances it to a client worker by sending a SessionChangeRequest to that worker (through the selector).

→ (node1: grid-nio-worker-tcp-comm) Processes the SessionChangeRequest and registers a new session GridSelectorNioSessionImpl (GridNioServer$AbstractNioClientWorker#register).

→ After opening the session, it sends NodeIdMessage to the remote node: changes the session state to REQUIRE_WRITE, then sends the message.

→ (node0: exchange-worker) TcpHandshakeExecutor#tcpHandshake: receives NodeIdMessage, verifies the nodeId, and sends back HandshakeMessage2 with the local nodeId.

→ (node1: grid-nio-worker-tcp-comm) Receives HandshakeMessage2 and stores the connection info with ConnectionKey. Responds with RecoveryLastReceivedMessage.

→ (node0: exchange-worker) Receives the acknowledgment, validates the response code, and finishes the handshake (stores ConnectionKey, creates GridNioSession and the client, similar to the process on node1).

→ Ready to use the session to send GridDhtPartitionsSingleMessage.

Write part

(exchange-worker) TcpCommunicationSpi#sendMessage

→ (exchange-worker) GridNioServer#send: wraps the Message into a SessionWriteRequest and puts it into the session worker queue (grid-nio-worker-tcp-comm).

→ (grid-nio-worker-tcp-comm) GridNioServer$DirectNioClientWorker#processWrite: invokes Message#writeTo on the session write buffer (sized by Socket#sendBufferSize), then writes the buffer to the socket channel.

→ It writes the message to the buffer until Message#writeTo returns true (first comes Message#directType, then the output of Message#writeTo).

If a message is bigger than the session write buffer, it is sent in multiple parts. It's represented as a continuous byte stream (no delimiters between parts). The status of sending the message is stored in the current GridSelectorNioSessionImpl:

  • GridNioSessionImpl#meta(NIO_OPERATION) is re-filled with SessionWriteRequest
  • GridNioSessionImpl#meta(MSG_WRITER) stores state of writing (field index).
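
A minimal sketch of the multi-part write described above, under simplifying assumptions: the toy message ChunkedWriteSketch (a hypothetical class, not an Ignite Message) consists of a 2-byte direct type followed by a raw payload, keeps its own write state instead of using session meta, and the send helper stands in for the NIO worker loop by flushing each filled buffer to an in-memory stream.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Toy message with resumable write state; hypothetical, not an Ignite class. */
final class ChunkedWriteSketch {
    private final short directType;
    private final byte[] payload;

    /** Write state that survives between writeTo calls (the role of MSG_WRITER meta). */
    private boolean typeWritten;
    private int payloadOff;

    ChunkedWriteSketch(short directType, byte[] payload) {
        this.directType = directType;
        this.payload = payload;
    }

    /** Writes as much as fits into the buffer; returns true once the message is fully written. */
    boolean writeTo(ByteBuffer buf) {
        if (!typeWritten) {
            if (buf.remaining() < 2)
                return false;

            buf.putShort(directType);
            typeWritten = true;
        }

        int n = Math.min(buf.remaining(), payload.length - payloadOff);
        buf.put(payload, payloadOff, n);
        payloadOff += n;

        return payloadOff == payload.length;
    }

    /** Calls writeTo with a fixed-size buffer until it returns true; returns the number of parts sent. */
    static int send(ChunkedWriteSketch msg, int bufSize, ByteArrayOutputStream out) {
        if (bufSize < 2)
            throw new IllegalArgumentException("Buffer must fit the 2-byte direct type");

        ByteBuffer buf = ByteBuffer.allocate(bufSize);
        int parts = 0;
        boolean done = false;

        while (!done) {
            done = msg.writeTo(buf);

            buf.flip();
            out.write(buf.array(), 0, buf.limit()); // Parts are concatenated without delimiters.
            buf.clear();

            parts++;
        }

        return parts;
    }
}
```

With a 10-byte payload and a 4-byte buffer the message leaves in three parts, and the receiver sees a single 12-byte stream.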

Read part

(grid-nio-worker-tcp-comm)

→ (grid-nio-worker-tcp-comm) GridNioServer$DirectNioClientWorker#processRead (reads bytes to the session read buffer).

→ (grid-nio-worker-tcp-comm) GridNioCodecFilter#onMessageReceived (decodes the message: reads Message#directType, then reads the message with Message#readFrom until readFrom returns true).

→ Reader = GridDirectMessageReader (singleton instance). Status of reading is stored in the session meta key MSG_META_KEY.

  • If readFrom returns true, the Message is returned and processed.
  • If readFrom returns false, the Message is stored in the session meta. If the reader expects more fields than were sent, it hangs and throws an exception (wrong message type).
  • If readFrom returns true and the reader expects fewer fields than were sent, it hangs and throws an exception (assertion error).

In case of a big message, it just waits for the next byte array and reads it as a continuation of the previous message.
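
The resumable read can be illustrated with a toy decoder. This is a sketch under assumptions, not GridDirectMessageReader: the class ChunkedReadSketch is hypothetical, its message layout (2-byte direct type plus three int fields) is made up for the example, and the read state lives in the object rather than in the session meta.

```java
import java.nio.ByteBuffer;

/** Toy message with resumable read state; hypothetical, not an Ignite class. */
final class ChunkedReadSketch {
    /** Hypothetical fixed layout: three int fields after the direct type. */
    static final int FIELDS = 3;

    short directType;
    final int[] vals = new int[FIELDS];

    /** Read state that survives between readFrom calls. */
    private boolean typeRead;
    private int fieldIdx;

    /** Reads field by field; returns false if the buffer ran out before the last field. */
    boolean readFrom(ByteBuffer buf) {
        if (!typeRead) {
            if (buf.remaining() < 2)
                return false;

            directType = buf.getShort();
            typeRead = true;
        }

        while (fieldIdx < FIELDS) {
            if (buf.remaining() < 4)
                return false; // Wait for the next bytes; unread bytes stay in the buffer.

            vals[fieldIdx++] = buf.getInt();
        }

        return true;
    }

    /** Feeds the wire bytes in chunks of chunkSize; returns the number of chunks consumed. */
    static int receive(ChunkedReadSketch msg, byte[] wire, int chunkSize) {
        // Extra 4 bytes keep the tail of a field that was split between two chunks.
        ByteBuffer buf = ByteBuffer.allocate(chunkSize + 4);
        int off = 0;
        int chunks = 0;
        boolean done = false;

        while (!done && off < wire.length) {
            int n = Math.min(chunkSize, wire.length - off);
            buf.put(wire, off, n);
            off += n;
            chunks++;

            buf.flip();
            done = msg.readFrom(buf);
            buf.compact(); // Keep unread bytes for the next chunk.
        }

        if (!done)
            throw new IllegalStateException("Stream ended before the message was fully read");

        return chunks;
    }
}
```

Note that the decoder has no way to detect a sender that wrote fewer fields than FIELDS: it would keep waiting and then consume bytes of the next message, which is the failure mode listed above.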

It looks like fieldsCount could serve as a marker for when to expect fewer fields than in the current version?

Compatibility for Communication protocol

The communication protocol consists of 2 parts:

  1. Data - set of declared Message classes, including the ser/des algorithm.
  2. Transport - algorithm of transporting the Messages to a remote node.

Introduce a Communication Protocol Version (ProtoVer) that depends on changes in these parts:

  • Version is Major.Minor, initially 1.0.
  • Minor version is incremented when new fields are added to any Message. The version is updated in the same commit that introduces the change.
  • Major version is incremented on breaking changes in the protocol (changes in the handshake or ser/des algorithm, adding a new Message).

Rules to support compatibility:

  1. It's prohibited to remove fields or change the types of fields in Messages. Only adding new fields is allowed.
  2. Breaking changes are introduced as communication features (FeatureMask).
    1. Every communication feature must be declared with an @since Ignite version.
    2. An Ignite release must disable a communication feature when communicating with an Ignite node of version (@since - 1).
    3. On release, Ignite should check the @since version and notify the release manager to drop support of old versions.

ProtoVer is fixed for an Ignite release version. Nodes exchange their ProtoVer and FeatureMask through discovery when a node joins:

  1. If ProtoVer.Major is different, then FeatureMask is validated. Let the cluster version be V and the joining node version be V+2. If the joining node mask contains features since V+1, then this node must not join the cluster.
  2. If ProtoVer.Minor is different, then Rolling upgrade mode is enabled for the communication protocol between nodes with different versions.
  3. Otherwise the communication protocol runs in Stable mode.
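
The three rules above can be read as a small decision function. The sketch below is one possible reading, not a proposed API: ProtoVerSketch, Mode, mode and canJoin are hypothetical names, Ignite versions are modeled as plain ints, and the FeatureMask is modeled as the list of @since versions of the features enabled on the joining node.

```java
/** Hypothetical model of ProtoVer comparison on node join. */
final class ProtoVerSketch {
    enum Mode { STABLE, ROLLING_UPGRADE, VALIDATE_FEATURES }

    /** Compares local and remote Major.Minor and picks the rule that applies. */
    static Mode mode(int locMajor, int locMinor, int rmtMajor, int rmtMinor) {
        if (locMajor != rmtMajor)
            return Mode.VALIDATE_FEATURES; // Rule 1: FeatureMask must be checked.

        if (locMinor != rmtMinor)
            return Mode.ROLLING_UPGRADE;   // Rule 2: only fields were added.

        return Mode.STABLE;                // Rule 3: identical protocol.
    }

    /**
     * Rule 1 check, assuming versions are ints: the node must not join if any of its
     * enabled features was introduced after the cluster version.
     */
    static boolean canJoin(int clusterVer, int[] enabledFeatureSince) {
        for (int since : enabledFeatureSince) {
            if (since > clusterVer)
                return false;
        }

        return true;
    }
}
```

For a cluster at version 10, a joining node whose enabled features are all since 10 or earlier passes canJoin, while a node with an enabled feature since 11 is rejected.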

Communication handshake

The handshake algorithm is extended with a new step - validating TcpCommunicationConfiguration consistency. Settings that affect both communicating nodes must be the same:

  1. usePairedConnections
  2. connectionsPerNode

Stable mode

In Stable mode the logic of exchanging messages is not changed. A channel is opened between the nodes, and messages are written to the channel one by one as a byte stream. There are no delimiters between messages; each message starts with its direct type. The reader knows that it has read a full message from the Message#readFrom logic.

Message header | Message
2 bytes: direct type | fields

Rolling upgrade mode 

In Rolling upgrade mode the logic is different, because a node can't know whether it has read a full message - Message#readFrom can't guarantee that.

  • Sender splits every message into frames. One frame cannot contain more than one message.
  • Frame header contains:
    • continuation bit - flag that shows whether this frame is a part of the previous message or the start of a new message.
    • frame size - can be a default for varlen messages, or defined by the Message class for fixed-size messages.
  • Message header contains direct type and fields count:
    • Suppose the receiver expects N fields, but the sender sends (N-1) fields. The fields count protects against reading random bytes into the Nth field.
  • After the message is fully written, the writer fills the remaining frame buffer with zeros.
  • Reader reads the message from frames until Message#readFrom returns true or the fields count is reached. Then the message is processed.
    • After that, the reader reads to /dev/null until the continuation bit is unset.

Frame header | Message header | Message
1 byte: 1bit - continuation bit, 2-7bit - reserved | 2 bytes: direct type | fields
2 bytes: unsigned frame size (frame size limit 64Kb) | 1 byte: fields count (max 255 fields) | trailing zeros
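
A minimal sketch of this framing, to make the layout concrete. Several details are assumptions that the proposal does not fix: the continuation bit is taken to be the highest bit (0x80) of the first byte, the frame size counts the payload bytes that follow the 3-byte frame header, all frames of a message have the same size, the message header appears only in the first frame, and multi-byte values are big-endian. FrameSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical encoder/decoder for the Rolling upgrade frame layout. */
final class FrameSketch {
    static final int CONT_BIT = 0x80; // Assumed position of the continuation bit.
    static final int FRAME_HDR = 3;   // 1 flags byte + 2 bytes unsigned frame size.
    static final int MSG_HDR = 3;     // 2 bytes direct type + 1 byte fields count.

    /** Splits one message into frames of payloadSize bytes; the last frame is zero-padded. */
    static List<byte[]> encode(short directType, int fieldsCnt, byte[] fields, int payloadSize) {
        if (payloadSize < MSG_HDR || payloadSize > 0xFFFF || fieldsCnt < 0 || fieldsCnt > 255)
            throw new IllegalArgumentException("Unsupported frame size or fields count");

        ByteBuffer msg = ByteBuffer.allocate(MSG_HDR + fields.length);
        msg.putShort(directType).put((byte)fieldsCnt).put(fields);
        msg.flip();

        List<byte[]> frames = new ArrayList<>();
        boolean first = true;

        while (msg.hasRemaining()) {
            // A freshly allocated buffer is zero-filled, which gives the trailing zeros.
            ByteBuffer frame = ByteBuffer.allocate(FRAME_HDR + payloadSize);
            frame.put((byte)(first ? 0 : CONT_BIT)).putShort((short)payloadSize);

            byte[] part = new byte[Math.min(payloadSize, msg.remaining())];
            msg.get(part);
            frame.put(part);

            frames.add(frame.array());
            first = false;
        }

        return frames;
    }

    /** Reassembles message bytes (header, fields, padding) from a stream of whole frames. */
    static List<byte[]> decode(ByteBuffer stream) {
        List<byte[]> msgs = new ArrayList<>();
        ByteArrayOutputStream cur = null;

        while (stream.remaining() >= FRAME_HDR) {
            boolean cont = (stream.get() & CONT_BIT) != 0;
            byte[] payload = new byte[stream.getShort() & 0xFFFF];
            stream.get(payload);

            if (!cont) { // Unset bit: the previous message is over, a new one starts.
                if (cur != null)
                    msgs.add(cur.toByteArray());

                cur = new ByteArrayOutputStream();
            }

            if (cur == null)
                throw new IllegalStateException("Continuation frame without a start frame");

            cur.write(payload, 0, payload.length);
        }

        if (cur != null)
            msgs.add(cur.toByteArray());

        return msgs;
    }

    static short directType(byte[] msg) {
        return (short)(((msg[0] & 0xFF) << 8) | (msg[1] & 0xFF));
    }

    static int fieldsCount(byte[] msg) {
        return msg[2] & 0xFF;
    }
}
```

The decoder here keeps the padding and any extra fields in the returned bytes. A real reader would stop parsing once Message#readFrom returns true or the fields count is reached, and skip the rest of the frames until a frame with the continuation bit unset arrives.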