ID | IEP-23 |
Author | |
Sponsor | |
Created | 18 Jun 2018 |
Status | COMPLETED |
Currently, cache operations in thin clients suffer from increased latency, which can also result in throughput issues when synchronous operations are used. The cause is an additional network hop from client to server: a thin client is currently not able to determine the physical location of the data, so it sends every cache request to a random server node, which re-routes the request to the right server.
The proposed solution is to implement "Affinity Awareness" for our thin clients, so that they send cache requests directly to the server node that contains the data. With this idea in mind, we have the potential to dramatically improve mean latency when using thin clients.
Below is a description of how the solution can be implemented.
Connecting to all nodes helps to identify the available nodes, but it can lead to a significant delay when the thin client is used on a large cluster with a long IP list provided by the user. To lower this delay, connections can be established asynchronously.
The format of the handshake messages can be found here. Only the successful response message is changed.
You can find the changes below. Added fields are in bold green.
Field type | Field description |
---|---|
int | Success message length, 1. |
byte | Success flag, 1. |
UUID | UUID of the server node. |
Map<UUID, TcpConnection> connect(ClientConfig cfg) {
    Map<UUID, TcpConnection> nodeMap = new Map();

    // Synchronous case here, as it is easier to read, but the same operation
    // can be performed asynchronously.
    for (addr : cfg.GetAddresses()) {
        TcpConnection conn = establishConnection(addr);

        if (conn.isGood()) {
            HandshakeResult handshakeRes = handshake(conn);

            if (handshakeRes.isOk()) {
                UUID nodeUuid = handshakeRes.nodeUuid();

                if (nodeMap.contains(nodeUuid)) {
                    // This can happen if the same node has several IPs.
                    // It makes sense to keep the more recent connection alive.
                    nodeMap.get(nodeUuid).disconnect();
                }

                nodeMap.put(nodeUuid, conn);
            }
            else
                conn.disconnect();
        }
    }

    if (nodeMap.isEmpty()) {
        // This is the only case which leads to the failure of the whole operation.
        // If at least one connection has been established, we can work with the cluster.
        reportFailure("Can not establish connection to a cluster");
    }

    return nodeMap;
}
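To illustrate the asynchronous variant mentioned above, here is a minimal sketch in the same pseudo-code style: connections to all configured addresses are attempted in parallel, and the node map is assembled once all attempts complete. The establishConnectionAsync() helper and the future API are assumptions for illustration only, not part of the proposal.

Map<UUID, TcpConnection> connectAsync(ClientConfig cfg) {
    // Connections are attempted in parallel to avoid waiting for slow or
    // unreachable addresses one by one.
    Map<UUID, TcpConnection> nodeMap = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> attempts = new ArrayList<>();

    for (addr : cfg.GetAddresses()) {
        attempts.add(establishConnectionAsync(addr).thenAccept(conn -> {
            if (!conn.isGood())
                return;

            HandshakeResult handshakeRes = handshake(conn);

            if (!handshakeRes.isOk()) {
                conn.disconnect();
                return;
            }

            // The same node may be reachable through several IPs;
            // keep only the most recent connection.
            TcpConnection prev = nodeMap.put(handshakeRes.nodeUuid(), conn);

            if (prev != null)
                prev.disconnect();
        }));
    }

    // Wait until every attempt has either succeeded or failed.
    CompletableFuture.allOf(attempts.toArray(new CompletableFuture[0])).join();

    if (nodeMap.isEmpty())
        reportFailure("Can not establish connection to a cluster");

    return nodeMap;
}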
To be able to route a request to the primary node for a key, the client should know the partition mapping for the cache. There are several cases when a client may want to request affinity mappings for one or several caches, so it makes sense to allow requesting affinity mappings for several caches in a single request. Also, partition mappings for several caches are often the same, so as an optimization it makes sense to include in the response, together with each partition mapping, the list of caches to which it applies. Thus, the partition mappings request can be described by the following steps:
See proposed Cache Partitions Request and Response message format below.
Field type | Description |
---|---|
Header | Request header. Format details can be found here. |
int | Number of caches N to get partition mappings for. |
int | Cache ID #1 |
int | Cache ID #2 |
... | ... |
int | Cache ID #N |
Field type | Description |
---|---|
Header | Response header. Format details can be found here. |
long | Affinity Topology Version. |
int | Minor Affinity Topology Version. |
int | Number J of cache mappings that describe all the caches listed in the request. |
Partition Mapping | Partition mapping #1. [[cacheId] => [nodeUuid => partition]]. See format below. |
Partition Mapping | Partition mapping #2 |
... | ... |
Partition Mapping | Partition mapping #J |
Field type | Description |
---|---|
bool | Applicable. Flag that shows whether the standard affinity is used for these caches. |
int | Number K of caches for which this mapping is applicable |
int | Cache ID #1 |
Cache key configuration | Key configuration for cache #1. Present only if Applicable is true. |
int | Cache ID #2 |
Cache key configuration | Key configuration for cache #2. Present only if Applicable is true. |
... | ... |
int | Cache ID #K |
Cache key configuration | Key configuration for cache #K. Present only if Applicable is true. |
int | Number L of nodes. Present only if Applicable is true. |
Node Partitions | Partitions of the node #1. Present only if Applicable is true. |
Node Partitions | Partitions of the node #2. Present only if Applicable is true. |
... | ... |
Node Partitions | Partitions of the node #L. Present only if Applicable is true. |
Field type | Description |
---|---|
int | Number R of key configurations |
int | Key type ID #1 |
int | Affinity Key Field ID #1 |
int | Key type ID #2 |
int | Affinity Key Field ID #2 |
... | ... |
int | Key type ID #R |
int | Affinity Key Field ID #R |
Field type | Description |
---|---|
UUID | UUID of the node |
int | Number of partitions M associated with node |
int | Partition #1 for node. |
int | Partition #2 for node. |
... | ... |
int | Partition #M for node. |
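To make the nested format above easier to follow, here is a minimal sketch, in the same pseudo-code style, of how a client could fold one Partition Mapping entry into the local structures used later for request routing (distributionMap: cache ID to partition map, cacheKeyMap: cache ID to affinity key configuration). The reader API is an assumption for illustration.

void readPartitionMapping(Reader in) {
    boolean applicable = in.readBool();
    int cachesCnt = in.readInt();

    List<Integer> cacheIds = new ArrayList<>();

    for (int i = 0; i < cachesCnt; i++) {
        int cacheId = in.readInt();
        cacheIds.add(cacheId);

        if (applicable) {
            // Cache key configuration: keyTypeId -> affinityKeyFieldId pairs.
            Map<Integer, Integer> keyCfg = new HashMap<>();
            int keyCfgCnt = in.readInt();

            for (int j = 0; j < keyCfgCnt; j++)
                keyCfg.put(in.readInt(), in.readInt());

            cacheKeyMap.put(cacheId, keyCfg);
        }
    }

    if (!applicable)
        return; // Requests for these caches keep going to a random node.

    // Node partitions are inverted into a partition -> nodeUuid map.
    Map<Integer, UUID> partToNode = new HashMap<>();
    int nodesCnt = in.readInt();

    for (int i = 0; i < nodesCnt; i++) {
        UUID nodeUuid = in.readUuid();
        int partsCnt = in.readInt();

        for (int j = 0; j < partsCnt; j++)
            partToNode.put(in.readInt(), nodeUuid);
    }

    for (int cacheId : cacheIds)
        distributionMap.put(cacheId, partToNode);
}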
When the user performs a key-based cache operation, the thin client makes a best effort to send the request to the node that stores the data.
Response sendRequest(CacheKey key, Message msg) {
    UUID nodeUuid = null;
    Connection nodeConnection = null;

    if (!distributionMap.contains(cacheId))
        updateCachePartitions(cacheId); // See "Cache instance acquiring"

    PartitionMap partitionMap = distributionMap.get(cacheId);

    if (!partitionMap.empty()) {
        Object affinityKey = key;

        Map<int, int> keyAffinityMap = cacheKeyMap.get(cacheId);
        Integer affinityKeyId = keyAffinityMap.get(key.typeId());

        if (affinityKeyId != null)
            affinityKey = key.getFieldById(affinityKeyId);

        int partition = RendezvousAffinityFunction(affinityKey);

        nodeUuid = partitionMap.get(partition);
        nodeConnection = nodeMap.get(nodeUuid);
    }

    // Fall back to a random node if the primary node is unknown or not connected.
    if (nodeConnection == null)
        nodeUuid, nodeConnection = nodeMap.getRandom();

    while (true) {
        try {
            return nodeConnection.send(msg);
        }
        catch (err) {
            logWarning(err);

            nodeConnection.disconnect();
            nodeMap.remove(nodeUuid);

            if (nodeMap.isEmpty())
                reportErrorToUser("Cluster is unavailable");

            // Retry on another node.
            nodeUuid, nodeConnection = nodeMap.getRandom();
        }
    }
}
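The RendezvousAffinityFunction call above hides the actual key-to-partition step. Assuming the default affinity function without custom configuration, the mapping reduces to taking the key's hash code modulo the number of partitions; below is a minimal sketch (the helper name and the way the partition count is obtained are assumptions for illustration).

int partitionForKey(Object affinityKey, int partitionCount) {
    // Default RendezvousAffinityFunction behaviour: hash code modulo the
    // number of partitions, with the sign stripped. A real thin client would
    // hash the binary form of the key to match the server-side calculation.
    int part = affinityKey.hashCode() % partitionCount;

    return part < 0 ? -part : part;
}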
It is important for the client to keep the partition mapping up to date. To ensure this, the following changes are proposed:
This mechanism allows the client to passively detect changes of the Affinity Topology. It also adds an overhead of only 1 bit of additional information in most cases, indicating that the Affinity Topology has not changed.
Please note that every server node tracks its own Last Reported Affinity Topology Version. This means that every change of the Affinity Topology version is likely to be reported to the client multiple times - once by every server node. However, since the standard response header contains the Affinity Topology Version, it is trivial for the client to detect whether it needs to update its mappings or whether an already known change is being reported.
Once the client detects a change of the Affinity Topology, it does the following:
The format of the standard response messages can be found here.
You can find the changes below. Added and changed fields are in bold green.
Field type | Description |
---|---|
int | Length of the response message. |
long | Request ID. |
short | Flags. ("Error" and "Affinity Topology Changed" flags are proposed) |
long | Affinity Topology Version. (present only when the "Affinity Topology Changed" flag is set) |
int | Minor Affinity Topology Version. (present only when the "Affinity Topology Changed" flag is set) |
int | Status Code (present only when "Error" flag is set) |
String | Error message (present only when "Error" flag is set) |
As shown above, it is proposed to add a new "Flags" field to reduce the size of the success response message (the most common case).
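As a rough illustration, the following sketch reads such a header on the client side; the concrete flag bit values and the reader API are assumptions, not part of the proposal.

// Assumed bit masks for the proposed "Flags" field (illustrative only).
short FLAG_ERROR = 1;
short FLAG_AFFINITY_TOPOLOGY_CHANGED = 2;

Response readResponseHeader(Reader in) {
    int length = in.readInt();
    long requestId = in.readLong();
    short flags = in.readShort();

    if ((flags & FLAG_AFFINITY_TOPOLOGY_CHANGED) != 0) {
        long affinityTopVer = in.readLong();
        int minorAffinityTopVer = in.readInt();

        // Trigger a partition mappings update if this version is newer
        // than the last one known to the client.
        onAffinityTopologyChanged(affinityTopVer, minorAffinityTopVer);
    }

    if ((flags & FLAG_ERROR) != 0) {
        int statusCode = in.readInt();
        String errorMessage = in.readString();

        return Response.error(requestId, statusCode, errorMessage);
    }

    return Response.success(requestId);
}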
Benchmarks performed on thin clients show good performance improvements for the 3-node use case of the C++ thin client with a prototype implementation.
Initial proposal discussion: http://apache-ignite-developers.2346864.n4.nabble.com/Best-Effort-Affinity-for-thin-clients-td31574.html