Goals

  • Allow a NiFi cluster to continue processing and distributing data even when the node where that data lives is lost

Assumptions

Requirements

#TitleUser StoryImportanceNotes
1Make data available to 1 or more 'replica nodes' (user configurable number of replicas) MUSTThis means that the content of the FlowFiles, as well as the Attributes and the state within the flow must be replicated.
Essentially, this is replication of both the Content Repository and the FlowFile Repository.
2Replicate inline / synchronously MUST 

We must replicate the FlowFile Content, Attributes, and state within the flow inline / synchronously.
We cannot replicate in a background thread, as doing so would mean that if a node is lost, some of its data may not have been
replicated to other nodes in the cluster yet. 

3Optionally compress data SHOULDAdministrators should be able to control whether or not data replication uses compression. We could potentially be smart about
this, by inspecting MIME type to determine that we know data already is compressed or by detecting the compression rate that
we are getting and not compressing the data if we get less than 5% compression, etc.
4Elect a new 'owner' for each piece of data if the owner drops from the cluster MUSTWe must ensure that whenever a node is disconnected from the cluster that each piece of data that it was processing gets
handed off to exactly one other node. We cannot have more than one node pick up processing of the data, as it will result in
duplicates, while failure to have any node process the data will result in data loss or delay. 
5Prevent disconnected node from processing its data MUSTIf a node is disconnected from a cluster, it must immediately stop processing the data. Otherwise, another node may have already
been assigned to take over ownership of the data, resulting in data replication. Upon restart, the node that created the data cannot
pick up where it left off without re-establishing itself as the 'owner'. 

User interaction and design

  • The basic unit of content storage in NiFi is the Content Claim. Many Content Claims make up a Resource Claim. The Content Repository, then, operates on Resource Claims. It makes sense, then, to ensure that all FlowFiles whose content belongs to the same Resource Claim are all replicated to the same nodes.

  • When a Process Session requests an OutputStream from the Content Repository (via the write() method), the provided OutputStream should be responsible for replicating the data to all replica nodes.

 

Data Buckets

One or more Resource Claims (and the FlowFiles associated with those Resource Claims) will be grouped together into a "Data Bucket".
The Data Bucket Resource will be created and replicated to all 'relevant' nodes in the cluster via the REST API.  A node is considered 'relevant'
if it owns the bucket or is a replica of the bucket. The Data Bucket Resource will consist simply of a Bucket Identifier, the Owner Node, and one
or more Replica Nodes. It will not contain any of the data itself. Once created, a Data Bucket can be referenced by its identifier.
The FlowFile class will be augmented to include a Bucket Identifier to denote this.

The URI to a Data Bucket will be along the lines of /nifi-api/nodes/<node id>/buckets/<bucket id>. This allows a GET request to merge the responses
so that we can determine all of the buckets that belong to a node.

When a POST request is received for the Buckets Endpoint, a Data Bucket Manager is updated to keep track of which buckets exist.

When Node A is disconnected from the cluster, the active Cluster Coordinator is responsible for determining which buckets are associated with Node A

via a GET request to /nifi-api/nodes/Node-A/buckets. Once this has completed, it is responsible for determining which of the replica nodes will be responsible
for each Bucket. Once a determination has been made for the new owner, a PUT request will update the resource (perhaps the API is not correct here because
the resource is now changing so that its owner is different so the URI now needs to be different. Perhaps just a query param.)

If a node receives a PUT request for a Bucket such that it is the new owner, it should immediately begin processing the data. If the PUT request fails, the owner
should be changed and another PUT request should be made. 

Once all data that is held by a Data Bucket has been removed from the flow, the Data Bucket can be deleted by issuing the corresponding DELETE
web request.

In order to create a FlowFile, we must first have a bucket to assign it to. This means that if there are less than N nodes in the cluster (where N is the number of
replicas), we cannot process data. Or we cannot replicate. How to handle this is unclear. Perhaps user-configurable? If ZooKeeper not available, we are also
in the same situation. Either cannot process or cannot replicate.

Once a bucket is 'filled' (is too old or contains too many flowfiles or too much data or whatever the criteria may be), a new bucket is created and new FlowFiles
are assigned to that bucket. This allows the data to be split up into small chunks so that the load of the disconnected node can be effectively spread across
the cluster. 

 

 

Questions

Below is a list of questions to be addressed as a result of this requirements document:

QuestionOutcome
 
  

JIRA Issues

 

 

Not Doing

  • Replication of Data Provenance. This may be tackled later in a different Feature Proposal but is not a goal of this Feature Proposal.

 

  • No labels