Goals
- Allow user to indicate that data in a Connection should be spread across the cluster instead of just queuing up on the node
Background and strategic fit
It is a very common use case for a user to want to distribute the data in a flow across all nodes in the cluster. Typically, this is done by creating a Remote Process Group that points back to the cluster. While this works when there are only one or a few such instances, it was not the original intended use case for Site-to-Site / Remote Process Groups and is quite cumbersome for users. The rise of the List* Processors has made this pattern very common, as users often want to perform a listing of data on a remote system on the Primary Node and then distribute that listing across the cluster so that the data can be pulled and processed in parallel.
This is typically accomplished by creating a ListSFTP processor, for example, and then sending its results to a Remote Process Group. First, though, the user must create a Root-Group Port to send the data to. The user must then grant appropriate permissions to that Root-Group Port. Next, the ListSFTP processor can be connected to a Remote Process Group. The Port within the Remote Process Group must then be configured to send in batches of 1 FlowFile. Then, the Root-Group Port must be connected to the FetchSFTP processor. This presents the following problems:
- It is not clear from the UI what the intent is, because the flow is split into two disconnected pieces: data going from ListSFTP to an RPG, and a separate flow that connects an Input Port to FetchSFTP.
- It is cumbersome to create all of these components and to track the data as it moves through the system.
- This results in a lot of complexity in the UI, as there are now at least 4 components needed just to List and Fetch data. If the ListSFTP and FetchSFTP processors are buried many Process Groups deep, even more components are required and the flow becomes far more indirect. When this becomes a typical pattern, there can be hundreds of such cases.
- In a multi-tenant environment, users often do not have permissions to create Ports at the Root Group. An administrator must then be involved in order to create that Port, assign appropriate permissions, and connect it to the appropriate Local Port so that the user can continue their flow.
- Remote Process Groups are able to spread the load around the cluster but are not able to provide 'partitioning' of the data or 'node affinity'.
This Feature Proposal aims to simplify the configuration of the flow by allowing the user to configure a Connection and choose a Load Balancing Strategy: "Do Not Load Balance" (the default), "Round Robin" (which automatically fails over if unable to communicate with a node), "Single Node" (all data goes to a single node in the cluster), or "Partition by Attribute" (more details below). By enabling the user to configure the flow this way, a user will be able to simply create a flow as ListSFTP → FetchSFTP and configure the connection to use the "Round Robin" Load Balancing Strategy. This would automatically spread the listing throughout the cluster and would require no Remote Process Groups or Ports to be created. The resulting flow is much more self-contained, clear, and concise.
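For illustration only, the strategies above could be represented in the framework as a simple per-connection setting. The sketch below mirrors the strategy names listed in this proposal; the class name and comments are assumptions of this sketch, not the implemented API.

```java
/**
 * Hypothetical representation of the per-connection Load Balancing Strategy described above.
 * The strategy names mirror this proposal; the class itself is illustrative only.
 */
public enum LoadBalanceStrategy {
    DO_NOT_LOAD_BALANCE,    // default: the connection behaves exactly as connections do today
    ROUND_ROBIN,            // distribute across all nodes, failing over automatically
    SINGLE_NODE,            // send all FlowFiles to one node chosen at the framework's discretion
    PARTITION_BY_ATTRIBUTE  // same attribute value always goes to the same node
}
```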
Assumptions
Requirements
# | Title | Importance | Notes |
---|---|---|---|
1 | Provide protocol for distributing data between nodes in the cluster | MUST | |
2 | Support secure data transfer between nodes | MUST | Inter-node data transfer should be performed via SSL/TLS if a Keystore and Truststore are configured. |
3 | Expose configuration of load balancing properties in nifi.properties | MUST | |
4 | Expose configuration in user interface for selecting a Load Balancing Strategy | MUST | |
5 | Update User Guide images to depict new Connection Configuration dialog | MUST | |
6 | Update Admin Guide to explain new nifi.properties properties | MUST | |
7 | Update User Guide to explain how the feature works | MUST | |
8 | Persist state about the nodes in the cluster and restore this state upon restart | SHOULD | When the cluster topology changes, data that is partitioned via attribute will have to be rebalanced. Upon a restart of the entire cluster, this can result in a lot of data being pushed around. To avoid this, we should persist the nodes in the cluster and restore this information upon restart. If the cluster is shut down and one node is not intended to be brought back up, the user will be responsible for removing the node from the cluster via the "Cluster" screen in the UI. |
9 | Provide a visual indicator in the UI as to how many FlowFiles are awaiting transfer to another node | SHOULD | If the UI does not easily provide an indicator of how many FlowFiles are waiting to be transferred, users may well be confused about why the connection shows X number of FlowFiles but the destination is not processing them. |
10 | Allow user to configure whether or not compression is used | SHOULD | For some data, it will not make sense to compress the data during transfer, such as data that is known to already be GZIP-compressed. For other data, compression can be very helpful. Additionally, even when the FlowFile content cannot be usefully compressed, the attributes likely can, so an option should be provided to compress Nothing, FlowFile Attributes Only, or FlowFile Attributes and Content. This may well be implemented at a later time. |
User interaction and design
Users will be presented with an option when configuring a connection to specify a "Load Balancing Strategy." The user will choose from one of four options:
- Do Not Load Balance: This will be the default value and will cause the connection to behave as all Connections do currently.
- Round Robin: FlowFiles will be distributed across the cluster in a round-robin fashion. If a node is disconnected from the cluster, or cannot be communicated with, the data that is queued for that node will be automatically redistributed to the other node(s).
- Single Node: All FlowFiles will be distributed to a single node in the cluster. If that node is disconnected from the cluster, or cannot be communicated with, the data that is queued for it will remain queued until the node is available again. Which node the data goes to will not be configurable. All connections that are configured with this strategy will send data to the same node. There was some consideration given to allowing the user to specify which node data should go to, in order to allow the data to be spread across the cluster at the user's discretion. However, this has some significant downsides: as clusters become more elastic, nodes may be added or removed more arbitrarily, making the selected value invalid and requiring manual intervention. Additionally, storing this value in a Flow Registry or a template would make the configuration invalid in any other environment. A better approach that was also considered was to send all data to the Primary Node. This has the downside, however, of requiring that all data be transferred any time that the Primary Node changes. To avoid this, we will simply choose a node at the framework's discretion and send all data to that node.
- Partition by Attribute: The user will provide the name of a FlowFile Attribute to partition the data by. All FlowFiles that have the same value for the specified attribute will be distributed to the same node in the cluster. If a FlowFile does not have a value for that attribute, the absence of the attribute (i.e., the value of `null`) will itself be considered a value. So all FlowFiles that do not have that attribute will be sent to the same node. If the destination node is disconnected from the cluster or is otherwise unable to communicate, the data should not fail over to another node. The data should queue, waiting for the node to be made available again. If the topology of the cluster is changed, this will result in a rebalancing of the data. Consistent Hashing should be used in order to avoid having to redistribute all of the data when a node joins or leaves the cluster.
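To make the node-selection behavior concrete, here is a minimal, self-contained Java sketch of how a queue might pick a destination node under the "Round Robin" and "Partition by Attribute" strategies. It is illustrative only: the class name, the node representation, and the hashing scheme (a simple consistent-hash ring built on SHA-256 with virtual nodes) are assumptions of this sketch, not the proposed implementation.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative destination-node selector for a load-balanced connection (names are hypothetical). */
public class NodeSelector {
    private final List<String> nodeIds;                        // connected nodes, e.g. "node1:6342"
    private final AtomicLong roundRobinCounter = new AtomicLong();
    private final SortedMap<Long, String> hashRing = new TreeMap<>();

    public NodeSelector(final List<String> nodeIds) {
        this.nodeIds = List.copyOf(nodeIds);
        // Place each node on the ring several times ("virtual nodes") so that adding or removing
        // a node only remaps a small fraction of the attribute values.
        for (final String nodeId : nodeIds) {
            for (int replica = 0; replica < 16; replica++) {
                hashRing.put(hash(nodeId + "#" + replica), nodeId);
            }
        }
    }

    /** Round Robin: spread FlowFiles evenly across all nodes. */
    public String selectRoundRobin() {
        final int index = (int) (roundRobinCounter.getAndIncrement() % nodeIds.size());
        return nodeIds.get(index);
    }

    /** Partition by Attribute: the same attribute value (including null) always maps to the same node. */
    public String selectByAttribute(final String attributeValue) {
        final String key = attributeValue == null ? "<missing-attribute>" : attributeValue;
        final SortedMap<Long, String> tail = hashRing.tailMap(hash(key));
        return tail.isEmpty() ? hashRing.get(hashRing.firstKey()) : tail.get(tail.firstKey());
    }

    private static long hash(final String value) {
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            long result = 0;
            for (int i = 0; i < 8; i++) {
                result = (result << 8) | (digest[i] & 0xFF);
            }
            return result;
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The value of the consistent-hash ring is that when a node joins or leaves, only the attribute values whose ring position falls between the changed node and its neighbor move to a new node; a naive `hash(value) % nodeCount` scheme would instead remap nearly every value whenever the topology changes.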
When data is transferred from one node to another, the Data Provenance trail should work the same as it does with Site-to-Site. I.e., if data is transferred from Node A to Node B, then Node A should provide a SEND and a DROP Provenance Event. Node B should provide a RECEIVE Provenance Event, and this RECEIVE event should populate the Source System Identifier so that users are able to correlate the two FlowFiles. The receiving system should not assign the same UUID as the sending system.
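As a rough illustration of the provenance contract described above, the sketch below shows the events each side would emit for one transferred FlowFile. The `ProvenanceEmitter` interface, method names, and transit URIs here are hypothetical stand-ins rather than NiFi's provenance API; the point is only the SEND/DROP/RECEIVE sequence, the Source System Identifier linkage, and the fact that the receiver assigns a new UUID.

```java
import java.util.UUID;

/** Hypothetical provenance hooks, used only to illustrate the event sequence described above. */
interface ProvenanceEmitter {
    void emitSend(String flowFileUuid, String transitUri);
    void emitDrop(String flowFileUuid, String reason);
    void emitReceive(String newFlowFileUuid, String transitUri, String sourceSystemIdentifier);
}

class LoadBalanceProvenanceExample {

    /** Node A: record that the FlowFile was sent to Node B and then dropped locally. */
    static void onSendComplete(final ProvenanceEmitter provenance, final String flowFileUuid) {
        final String transitUri = "nifi://nodeB.example.com:6342/" + flowFileUuid; // illustrative URI
        provenance.emitSend(flowFileUuid, transitUri);
        provenance.emitDrop(flowFileUuid, "Transferred to another node for load balancing");
    }

    /** Node B: record receipt, pointing back at Node A's FlowFile, under a brand-new UUID. */
    static String onReceiveComplete(final ProvenanceEmitter provenance, final String senderFlowFileUuid) {
        final String newUuid = UUID.randomUUID().toString();   // never reuse the sender's UUID
        final String transitUri = "nifi://nodeA.example.com:6342/" + senderFlowFileUuid;
        // The source system identifier lets a user correlate the receiver's FlowFile with the sender's.
        provenance.emitReceive(newUuid, transitUri, senderFlowFileUuid);
        return newUuid;
    }
}
```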
This will necessitate adding the following properties to nifi.properties:
Proposed Property Name | Description | Default Value |
---|---|---|
nifi.cluster.load.balance.port | The port to listen on for incoming connections for load balancing data | 6342 |
nifi.cluster.load.balance.host | The hostname to listen on for incoming connections for load balancing data | The same as the `nifi.cluster.node.address` property, if specified, else localhost |
nifi.cluster.load.balance.connections.per.node | The number of TCP connections to make to each node in the cluster for load balancing | 4 |
nifi.cluster.load.balance.max.threads | The max size of the thread pool to use for load balancing data across the cluster | 8 |
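For reference, with the defaults from the table above, the new block in nifi.properties might look like the following. The hostname value is illustrative; by default it would fall back to `nifi.cluster.node.address` or localhost.

```
# Load-balanced connection settings (defaults shown; host value is illustrative)
nifi.cluster.load.balance.host=nifi-node1.example.com
nifi.cluster.load.balance.port=6342
nifi.cluster.load.balance.connections.per.node=4
nifi.cluster.load.balance.max.threads=8
```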
It will be important to avoid heap exhaustion now that a single Connection may maintain multiple internal queues of FlowFiles. While the default is to swap out FlowFiles once a queue holds more than 20,000, in increments of 10,000, we cannot do this for each internal queue. If we did, then a Connection in a cluster of 10 nodes could end up holding 10 * 20,000 = 200,000 FlowFiles in heap, which would quickly exhaust it. As a result, we will swap out data more aggressively in the queues that are responsible for distributing FlowFiles to other nodes - likely after 1,000 FlowFiles have been reached, in increments of 1,000. The "local partition," however, can continue to swap out at 20,000 / 10,000.
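A quick back-of-the-envelope check of the proposed thresholds, under the assumption of one local partition plus nine remote partitions per Connection on a 10-node cluster (the arithmetic below is illustrative only):

```java
/** Worst-case in-heap FlowFile counts per Connection on an N-node cluster (illustrative arithmetic only). */
public class SwapThresholdMath {
    public static void main(String[] args) {
        final int nodes = 10;

        // If every internal partition used the existing 20,000-FlowFile swap threshold:
        final int naive = nodes * 20_000;                   // 200,000 FlowFiles in heap per Connection

        // With the proposed thresholds: the local partition keeps up to 20,000 in heap,
        // and each of the (nodes - 1) remote partitions swaps out after 1,000 FlowFiles.
        final int proposed = 20_000 + (nodes - 1) * 1_000;  // 29,000 FlowFiles in heap per Connection

        System.out.printf("naive=%d, proposed=%d%n", naive, proposed);
    }
}
```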
Questions
Below is a list of questions to be addressed as a result of this requirements document:
Question | Outcome |
---|---|
What permissions should be required/checked when receiving data from a load balancing protocol? | Rather than relying on explicit permissions, we will ensure that data that is sent in a secure cluster comes from a node whose certificate is trusted by the configured TrustStore and, moreover, that the certificate's Distinguished Name or one of its Subject Alternative Names maps to the Node Identity of one of the nodes in the cluster. In this way, we ensure that the data comes from a node that is in fact part of the cluster, and this scales nicely even as clustering becomes more elastic. A sketch of this check appears below the table. |
How should Disconnected nodes be handled? | A node that has been disconnected from the cluster should continue to load balance the data in its queue to all nodes in the cluster. However, the nodes that are connected should not send data to the disconnected node. This approach mimics how Site-to-Site works today and lays the groundwork for later work to decommission a node in a cluster. If a node is disconnected, then it will not receive any new notifications about cluster topology updates. Therefore, it is possible that it will send data to the wrong node when using the "Partition by Attribute" strategy. This needs to be acceptable, and the node that receives such data needs to be responsible for determining which node in the cluster should receive the data and distributing it appropriately. |
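Below is a minimal Java sketch of the certificate-based check described in the first Question above: the peer's certificate is already trusted via the TLS handshake against the configured TrustStore, and its Distinguished Name or one of its Subject Alternative Names must additionally match the Node Identity of a node that is currently part of the cluster. The class and method names are assumptions of this sketch, not the implemented protocol code; only the standard Java certificate APIs are used.

```java
import java.security.cert.Certificate;
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;

/** Illustrative check that a load-balance peer maps to a known cluster Node Identity. */
public class PeerIdentityCheck {

    /**
     * @param session               the established TLS session (trust is already verified by the handshake)
     * @param clusterNodeIdentities the Node Identities of the nodes currently known to the cluster
     * @return true if the peer certificate's DN or one of its SANs matches a cluster Node Identity
     */
    public static boolean isClusterNode(final SSLSession session, final Set<String> clusterNodeIdentities)
            throws SSLPeerUnverifiedException, CertificateParsingException {

        final Certificate[] peerChain = session.getPeerCertificates();
        final X509Certificate peerCert = (X509Certificate) peerChain[0];

        // Check the Distinguished Name first.
        final String dn = peerCert.getSubjectX500Principal().getName();
        if (clusterNodeIdentities.contains(dn)) {
            return true;
        }

        // Fall back to the Subject Alternative Names (type 2 = DNS name).
        final Collection<List<?>> sans = peerCert.getSubjectAlternativeNames();
        if (sans != null) {
            for (final List<?> san : sans) {
                final Integer type = (Integer) san.get(0);
                if (type == 2 && clusterNodeIdentities.contains(String.valueOf(san.get(1)))) {
                    return true;
                }
            }
        }
        return false;
    }
}
```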