
IDIEP-23
Author
Sponsor
Created: 18 Jun 2018
Status: COMPLETED


Motivation

Currently, thin client cache operations suffer from increased latency, which can also limit throughput when synchronous operations are used. The cause is an additional network hop from client to server: a thin client currently cannot determine the physical location of the data, so it sends every cache request to a random server node, which re-routes the request to the right server.

The proposed solution is to implement "Affinity Awareness" for our thin clients, so that they send each cache request directly to the server that contains the data. With this idea in mind, we have the potential to dramatically improve mean latency when using thin clients.

Description

Here you can find a description of how the solution can be implemented.

Connection

  1. On startup, the thin client connects to all node addresses provided by the user in the client configuration.
  2. Upon handshake, the server returns its UUID to the client.
  3. By the end of the startup procedure, the client has open connections to all available server nodes and the following mapping (nodeMap): [UUID => Connection].

Connecting to all nodes helps to identify the available nodes, but can introduce a significant delay when the thin client is used with a large cluster and a long IP list provided by the user. To reduce this delay, connections can be established asynchronously.

Changes in format of handshake protocol message

The format of the handshake messages can be found here. Only the successful response message is changed.

...

Field type | Field description
int        | Success message length, 1.
byte       | Success flag, 1.
UUID       | UUID of the server node.
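As an illustration, a client-side parser for this success response might look as follows. This is only a sketch: the buffer layout details (little-endian ordering, UUID encoded as two longs) and all names are assumptions for illustration, not part of the message format above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.UUID;

public class HandshakeResponseParser {
    /** Parses a successful handshake response and returns the server node UUID. */
    static UUID parseSuccess(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw).order(ByteOrder.LITTLE_ENDIAN);

        int len = buf.getInt();    // Success message length.
        byte flag = buf.get();     // Success flag, 1.

        if (flag != 1)
            throw new IllegalStateException("Handshake failed, message length: " + len);

        // UUID of the server node, assumed to be encoded as two longs.
        long msb = buf.getLong();
        long lsb = buf.getLong();

        return new UUID(msb, lsb);
    }
}
```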

Connection algorithm written in pseudo-code

Code Block (java): Pseudo code
Map<UUID, TcpConnection> connect(ClientConfig cfg)
{
	Map<UUID, TcpConnection> nodeMap = new HashMap<>();

	// Synchronous case here, as it is easier to read, but the same operation
	// can be performed asynchronously.
	for (Address addr : cfg.GetAddresses()) {
		TcpConnection conn = establishConnection(addr);

		if (conn.isGood()) {
			HandshakeResult handshakeRes = handshake(conn);

			if (handshakeRes.isOk()) {
				UUID nodeUuid = handshakeRes.nodeUuid();

				if (nodeMap.containsKey(nodeUuid)) {
					// This can happen if the same node has several IPs.
					// It makes sense to keep the most recent connection alive.
					nodeMap.get(nodeUuid).disconnect();
				}

				nodeMap.put(nodeUuid, conn);
			} else {
				conn.disconnect();
			}
		}
	}

	if (nodeMap.isEmpty()) {
		// This is the only case which leads to the failure of the whole operation.
		// If at least one connection has been established, we can work with the cluster.
		reportFailure("Cannot establish connection to the cluster");
	}

	return nodeMap;
}

Cache affinity mapping acquiring

To route a request to the primary node for a key, the client needs to know the partition mapping of the cache. There are several cases in which the client may want to request the affinity mapping for one or more caches, so it makes sense to allow requesting affinity mappings for several caches in a single request. Also, the partition mappings of several caches are often identical, so as an optimization the response includes, together with each partition mapping, the list of caches to which it applies. The partitions request can thus be described by the following steps:

...

See proposed Cache Partitions Request and Response message format below.

Cache Partitions Request

Field type | Description
Header     | Request header. Format details can be found here.
int        | Number of caches N to get partition mappings for.
int        | Cache ID #1
int        | Cache ID #2
...        | ...
int        | Cache ID #N
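The request body above is straightforward to serialize. A hedged sketch follows; the request header is omitted since its format is defined elsewhere, and little-endian ordering plus all names are assumptions:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class PartitionsRequestWriter {
    /** Serializes the body of a Cache Partitions Request: cache count, then cache IDs. */
    static byte[] writeBody(int[] cacheIds) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 * cacheIds.length)
            .order(ByteOrder.LITTLE_ENDIAN);

        buf.putInt(cacheIds.length); // Number of caches N to get mappings for.

        for (int cacheId : cacheIds)
            buf.putInt(cacheId);     // Cache ID #1 .. #N.

        return buf.array();
    }
}
```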

Cache Partitions Response

Field type        | Description
Header            | Response header. Format details can be found here.
long              | Topology Affinity Version.
int               | Minor Topology Affinity Version.
int               | Number of cache mappings J, describing all the caches listed in the request.
Partition Mapping | Partition mapping #1. [[cacheId] => [nodeUuid => partition]]. See format below.
Partition Mapping | Partition mapping #2
...               | ...
Partition Mapping | Partition mapping #J

Partition Mapping

Field type              | Description
bool                    | Applicable. Flag that shows whether the standard affinity function is used for these caches.
int                     | Number K of caches for which this mapping is applicable.
int                     | Cache ID #1
Cache key configuration | Key configuration for cache #1. Present only if Applicable is true.
int                     | Cache ID #2
Cache key configuration | Key configuration for cache #2. Present only if Applicable is true.
...                     | ...
int                     | Cache ID #K
Cache key configuration | Key configuration for cache #K. Present only if Applicable is true.
int                     | Number L of nodes. Present only if Applicable is true.
Node Partitions         | Partitions of node #1. Present only if Applicable is true.
Node Partitions         | Partitions of node #2. Present only if Applicable is true.
...                     | ...
Node Partitions         | Partitions of node #L. Present only if Applicable is true.

Cache key configuration

Field type | Description
int        | Number R of key configurations.
int        | Key type ID #1
int        | Affinity Key Field ID #1
int        | Key type ID #2
int        | Affinity Key Field ID #2
...        | ...
int        | Key type ID #R
int        | Affinity Key Field ID #R

Node Partitions

Field type | Description
UUID       | UUID of the node.
int        | Number of partitions M associated with the node.
int        | Partition #1 for the node.
int        | Partition #2 for the node.
...        | ...
int        | Partition #M for the node.
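Reading a Node Partitions record back could be sketched like this; as before, the two-longs UUID encoding and all names are assumptions for illustration:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public class NodePartitionsRecord {
    final UUID nodeUuid;     // UUID of the node.
    final int[] partitions;  // The M partitions associated with the node.

    NodePartitionsRecord(UUID nodeUuid, int[] partitions) {
        this.nodeUuid = nodeUuid;
        this.partitions = partitions;
    }

    /** Reads one Node Partitions record from a buffer positioned at its start. */
    static NodePartitionsRecord read(ByteBuffer buf) {
        UUID uuid = new UUID(buf.getLong(), buf.getLong()); // UUID of the node.

        int m = buf.getInt();           // Number of partitions M.
        int[] parts = new int[m];

        for (int i = 0; i < m; i++)
            parts[i] = buf.getInt();    // Partition #i for the node.

        return new NodePartitionsRecord(uuid, parts);
    }
}
```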

Changes to cache operations with single key

When the user makes a key-based cache operation, the thin client makes a best effort to send the request to the node that stores the data.

  1. The client determines the partitionMap for the cache using the cacheId and the distributionMap: distributionMap(cacheId) => partitionMap. For details about partitionMap and distributionMap see section Cache instance acquiring.
  2. If there is no partitionMap for the cache, the feature is not applicable for this cache. Go to step 6.
  3. Given a key by the user, the client checks its cacheKeyMap (see section Cache instance acquiring, 4.a) to find out whether the cache is configured to calculate the partition from a specific field of the key.
  4. The client uses the whole key, or the affinity key field if present, with its internal implementation of the rendezvous affinity function to calculate the partition for the key.
  5. Using the partitionMap, the client gets the nodeUuid of the primary node for the key.
  6. Using the nodeMap described in section Connection, the client checks whether there is an active connection nodeConnection to the required node, associated with the nodeUuid.
  7. If there is no connection to the primary node for the key, or if it cannot be determined, the client gets a random nodeConnection from the nodeMap.
  8. The connection nodeConnection is used to make the request.
  9. If an error happens when sending the request, another random connection is used to send it, and the failed connection is excluded from the nodeMap. This is not an error for the user, though a warning log message may be a good idea in this case.
  10. If no nodes are left, an error is reported to the user.

Request Sending Algorithm in pseudo code

Code Block (java): Pseudo code
Response sendRequest(int cacheId, CacheKey key, Message msg)
{
	UUID nodeUuid = null;
	Connection nodeConnection = null;

	if (!distributionMap.contains(cacheId))
		updateCachePartitions(cacheId); // See "Cache instance acquiring"

	PartitionMap partitionMap = distributionMap.get(cacheId);

	if (!partitionMap.empty()) {
		Object affinityKey = key;

		Map<int, int> keyAffinityMap = cacheKeyMap.get(cacheId);
		int affinityKeyId = keyAffinityMap.get(key.typeId());

		if (affinityKeyId != null)
			affinityKey = key.getFieldById(affinityKeyId);

		int partition = RendezvousAffinityFunction(affinityKey);

		nodeUuid = partitionMap.get(partition);

		nodeConnection = nodeMap.get(nodeUuid);
	}

	if (nodeConnection == null)
		nodeUuid, nodeConnection = nodeMap.getRandom();

	while (true) {
		try {
			Response rsp = nodeConnection.send(msg);

			return rsp;
		}
		catch(err) {
			logWarning(err);

			nodeConnection.disconnect();

			nodeMap.remove(nodeUuid);

			if (nodeMap.isEmpty())
				reportErrorToUser("Cluster is unavailable");

			nodeUuid, nodeConnection = nodeMap.getRandom();
		}
	}
}
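The RendezvousAffinityFunction(affinityKey) call in the pseudo code above boils down to a key-to-partition mapping on the client side. Below is a hedged sketch of the usual hash-modulo form of that step; the actual hash function must match the one used on the server side, which is not specified here, so treat this as illustrative only:

```java
public class AffinitySketch {
    /** Maps an affinity key to a partition in [0, partitionCount). */
    static int partition(Object affinityKey, int partitionCount) {
        int hash = affinityKey.hashCode();

        // |hash % partitionCount| < partitionCount, so abs() cannot overflow here.
        return Math.abs(hash % partitionCount);
    }
}
```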

Updating partition mapping

It is important for the client to keep the partition mapping updated. To ensure this, the following changes are proposed:

...

  1. Updates the distributionMap and partitionMap (preferably asynchronously) for all active caches. This may also be done "on demand", i.e. on the first call to the Cache instance;
  2. Tries to establish connections to hosts that are not yet connected. This is required because the topology change may be caused by a new node joining the cluster.
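The two reactions above could be sketched as follows. All class, field, and method names here are hypothetical; the IEP does not prescribe an API:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

public class TopologyChangeHandler {
    // cacheId -> partition map. Cleared on topology change so that the next
    // cache operation re-requests the mapping "on demand".
    final Map<Integer, Object> distributionMap = new ConcurrentHashMap<>();

    // Addresses from the client configuration that are not connected yet.
    final Set<String> pendingAddresses = new CopyOnWriteArraySet<>();

    /** Invoked when the client detects that the cluster topology has changed. */
    void onTopologyChanged() {
        // 1. Invalidate partition mappings for all active caches; they are
        //    then refreshed lazily on the first call to each Cache instance.
        distributionMap.clear();

        // 2. Retry hosts that are not yet connected: the topology change may
        //    be caused by a new node joining the cluster.
        for (String addr : pendingAddresses)
            tryConnect(addr);
    }

    void tryConnect(String addr) {
        // Connection establishment elided in this sketch.
    }
}
```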

Standard response message header changes

The format of the standard response messages can be found here.

...

As shown above, it is proposed to add a new "Flags" field to reduce the size of the success response message (the most common case).

Benchmarks

Benchmarks performed on thin clients show good performance improvements for the 3-node use case of the C++ thin client with the prototype implementation.

Attachment: thin_benches_v8.pdf

Risks and Assumptions

  1. The proposed algorithm can introduce a significant delay on startup when the thin client is used with a large cluster and a long IP list provided by the user. To reduce this delay, connections can be established asynchronously.
  2. There may be a need to limit the maximum number of open connections to use the feature on large clusters. However, this will reduce the efficiency of the proposed enhancement for large clusters.
  3. The enhancement does not improve performance on a cluster with a non-standard affinity function. However, it does not reduce performance in this case either.
  4. The feature has not yet been tested with large clusters.

Discussion Links

Initial proposal discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Best-Effort-Affinity-for-thin-clients-td31574.html

Tickets

Jira
server: ASF JIRA
query: project = Ignite AND labels IN (iep-23) ORDER BY status