
Target release: 0.5.0
Epic: NIFI-259
Document status: DRAFT
Document owner: Mark Payne
Designer:
Developers:
QA:

 

Goals

  • Provide Processors and other components a simple way to store local or cluster-wide state
  • Provide components a mechanism for retrieving the state that has been stored by other nodes in the cluster
  • Provide a mechanism for the framework to communicate information between nodes
  • Allow Processors to be easily 'partitionable', so that work is easily split among nodes in a cluster

Background and strategic fit

NiFi provides two basic operating modes: standalone and clustered. When operating in a cluster, Processors often need some mechanism for coordinating state between nodes. One Processor that implements such a capability is ListHDFS, but the implementation required to accomplish this is laborious and error prone. Many other Processors could benefit greatly from this approach, but the work has not been done for the same reason. Implementing a feature in the framework to share state across the cluster would remove that burden from individual Processors.

Additionally, there are several processors (GetHTTP, for example) that persist state locally. This is done in an ad-hoc manner, creating local files to store the information. This will become far easier and more consistent if a mechanism is provided for storing local state as well.

If we make this capability to store clustered state available to the framework as well, it can provide a mechanism to more easily store information about which nodes are available in the cluster, which can ease the development of "partitionable" Processors. Many protocols, such as SFTP, do not provide a way to easily spread the work across a cluster of NiFi nodes. If we devise a mechanism by which the work can be partitioned across the nodes, and then expose this information to components running on the nodes, we could much more easily spread this processing across the cluster. This may take the form of an AbstractPartitionedProcessor, or some utility class, or perhaps just a well-written example of how to interact with remote resources in a partitioned environment.
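
As a rough illustration of the partitioning idea, the sketch below shows how a component might decide which remote items belong to its node. It assumes the framework can give every node the same list of connected node identifiers along with its own identifier; `WorkPartitioner`, `ownsItem`, and the node names are hypothetical and not part of any proposed API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: decides which remote items this node should process. */
final class WorkPartitioner {
    private final int nodeCount;
    private final int nodeIndex;

    /**
     * @param clusterNodes identifiers of all connected nodes (assumed identical on every node)
     * @param localNode    identifier of the node running this code
     */
    WorkPartitioner(List<String> clusterNodes, String localNode) {
        List<String> sorted = new ArrayList<>(clusterNodes);
        Collections.sort(sorted); // same order on every node, regardless of input order
        this.nodeCount = sorted.size();
        this.nodeIndex = sorted.indexOf(localNode);
        if (nodeIndex < 0) {
            throw new IllegalArgumentException("Local node is not part of the cluster: " + localNode);
        }
    }

    /** An item (for example a remote file path) is owned by exactly one node. */
    boolean ownsItem(String itemName) {
        // String.hashCode() is specified by the JDK, so all nodes compute the same value.
        return Math.floorMod(itemName.hashCode(), nodeCount) == nodeIndex;
    }
}
```

One caveat of this simple modulo scheme is that ownership shifts whenever a node joins or leaves the cluster, so a real implementation would need to account for rebalancing.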

The implementation details of how and where this state is stored should not be exposed to the components. The most likely implementation, though, would be a ZooKeeper-backed implementation. We do not want to require that an external ZooKeeper be available in order to run NiFi in clustered mode, so it would make sense to embed ZooKeeper in NiFi while still allowing an external ZooKeeper instance to be used if an administrator configures one.

Assumptions

Requirements

# | Title | User Story | Importance | Notes
1 | Provide Local State Storage | Need ability for components to store and retrieve local state | Must Have |
2 | Provide Distributed State Storage | Extensions need the ability to share state across all nodes in a cluster | Must Have | State will be available only to the component that set the state, but will be available to all nodes. If a component wants to share state across multiple nodes and multiple components, that state should be stored by a Controller Service using the distributed state mechanism and that Controller Service can then be shared across components.
3 | Provide Partitioning of Work | Processors need to be instructed on how to partition their work across nodes in a cluster. | Must Have |

User interaction and design

I have made a significant amount of progress toward this Feature Proposal and expect it to be rolled into the 0.5.0 version, with the exception of #3 above (Provide Partitioning of Work). That can come in a later release, as it is fairly independent from the work of storing state locally and clustered.

From an API perspective, I have updated the ProcessContext to contain a new method:

/**
 * @return the StateManager that can be used to store and retrieve state for this component
 */
StateManager getStateManager();
 

The StateManager, then, is defined as such:

StateManager.java
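
The StateManager.java listing is not present in this copy of the page. As a stand-in, the following is a minimal sketch of what such an interface might look like, based only on the description in this document (store and retrieve state, choosing local or cluster-wide scope). The method signatures, the `StateMap` type, the `Scope` enum, and the `InMemoryStateManager` class are assumptions for illustration, not the original listing.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Assumed scopes: state visible to one node only, or to every node in the cluster. */
enum Scope { LOCAL, CLUSTER }

/** Assumed read-only snapshot of a component's state. */
interface StateMap {
    long getVersion();            // -1 if no state has been stored yet
    String get(String key);
    Map<String, String> toMap();
}

/** Sketch of the component-facing API; signatures are assumptions. */
interface StateManager {
    void setState(Map<String, String> state, Scope scope) throws IOException;
    StateMap getState(Scope scope) throws IOException;
    /** Replaces the state only if it is unchanged since oldValue was read. */
    boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException;
    void clear(Scope scope) throws IOException;
}

/** In-memory stand-in, present only so the sketch can be exercised. */
final class InMemoryStateManager implements StateManager {
    private static final class Snapshot implements StateMap {
        private final long version;
        private final Map<String, String> values;
        Snapshot(long version, Map<String, String> values) {
            this.version = version;
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }
        public long getVersion() { return version; }
        public String get(String key) { return values.get(key); }
        public Map<String, String> toMap() { return values; }
    }

    private final Map<Scope, Snapshot> states = new HashMap<>();

    public synchronized void setState(Map<String, String> state, Scope scope) {
        long next = getState(scope).getVersion() + 1;
        states.put(scope, new Snapshot(next, state));
    }

    public synchronized StateMap getState(Scope scope) {
        Snapshot s = states.get(scope);
        return s != null ? s : new Snapshot(-1L, Collections.<String, String>emptyMap());
    }

    public synchronized boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) {
        if (getState(scope).getVersion() != oldValue.getVersion()) {
            return false; // someone else updated the state in the meantime
        }
        setState(newValue, scope);
        return true;
    }

    public synchronized void clear(Scope scope) { states.remove(scope); }
}
```

A version-checked `replace` is included here because it gives components an optimistic way to update shared state without overwriting a concurrent change; whether the actual interface offers this is part of the assumption.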

 

This provides a simple, consistent API for component developers to use to store and retrieve state, simply indicating whether they are interested in local or cluster-wide state. If a developer uses the CLUSTER scope but NiFi is not connected to a cluster, the framework is responsible for delegating to the Local provider instead. Processor developers therefore do not have to worry about whether they are running in a clustered environment.
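
The fallback from CLUSTER to local storage could be sketched roughly as follows. `ScopeAwareStore` and its methods are hypothetical names, and the two maps merely stand in for the real local and cluster-wide back ends.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical framework-side store that hides whether the node is clustered. */
final class ScopeAwareStore {
    enum Scope { LOCAL, CLUSTER }

    private final Map<String, String> localState = new HashMap<>();
    private final Map<String, String> clusterState = new HashMap<>(); // stands in for a shared store
    private final boolean connectedToCluster;

    ScopeAwareStore(boolean connectedToCluster) {
        this.connectedToCluster = connectedToCluster;
    }

    /** CLUSTER requests fall back to the local store when there is no cluster. */
    private Map<String, String> resolve(Scope requested) {
        boolean useCluster = requested == Scope.CLUSTER && connectedToCluster;
        return useCluster ? clusterState : localState;
    }

    synchronized void put(String key, String value, Scope scope) {
        resolve(scope).put(key, value);
    }

    synchronized String get(String key, Scope scope) {
        return resolve(scope).get(key);
    }
}
```

With this shape, a component can always request `Scope.CLUSTER`; on a standalone instance the value simply ends up in local storage.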

From the framework perspective, the work is performed by a simple StateManager class that delegates to a StateProvider. The StateProvider has essentially the same methods as the StateManager but is intended to perform the actual storage and retrieval of state using some underlying mechanism. The nifi.properties file is then configured to provide a Local State Provider and a Clustered State Provider. There is a single implementation of each at this time. The Local State Provider is built atop the write-ahead log that is used by the FlowFile repository, so that it is very efficient to save state many times. Each component stores its data in a separate file/directory, which allows an administrator to manually delete the state for a single component if needed. This may be done, for example, if a bug is found in a Processor such that state cannot be read, or if framework state needs to be cleared (for instance, if we are copying the state from one node to another but want to remove the cluster information held for that node).
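
The delegation described above might look something like the sketch below, in which a manager bound to one component forwards each call to a provider and passes the component's identifier along. All type and method names here (`StateProvider`, `MapBackedProvider`, `DelegatingStateManager`) are assumptions for illustration; the map-backed provider is a toy replacement for the write-ahead-log and ZooKeeper implementations.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical storage back end; every call names the component whose state is touched. */
interface StateProvider {
    void setState(Map<String, String> state, String componentId);
    Map<String, String> getState(String componentId);
    void clear(String componentId);
}

/** Toy provider keeping one entry per component, so one component's state can be removed alone. */
final class MapBackedProvider implements StateProvider {
    private final Map<String, Map<String, String>> byComponent = new HashMap<>();

    public synchronized void setState(Map<String, String> state, String componentId) {
        byComponent.put(componentId, new HashMap<>(state));
    }

    public synchronized Map<String, String> getState(String componentId) {
        Map<String, String> state = byComponent.get(componentId);
        if (state == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(new HashMap<>(state));
    }

    public synchronized void clear(String componentId) {
        byComponent.remove(componentId);
    }
}

/** Thin manager bound to one component; it only picks a provider and forwards the call. */
final class DelegatingStateManager {
    enum Scope { LOCAL, CLUSTER }

    private final String componentId;
    private final StateProvider localProvider;
    private final StateProvider clusterProvider;

    DelegatingStateManager(String componentId, StateProvider localProvider, StateProvider clusterProvider) {
        this.componentId = componentId;
        this.localProvider = localProvider;
        this.clusterProvider = clusterProvider;
    }

    private StateProvider providerFor(Scope scope) {
        return scope == Scope.CLUSTER ? clusterProvider : localProvider;
    }

    void setState(Map<String, String> state, Scope scope) {
        providerFor(scope).setState(state, componentId);
    }

    Map<String, String> getState(Scope scope) {
        return providerFor(scope).getState(componentId);
    }

    void clear(Scope scope) {
        providerFor(scope).clear(componentId);
    }
}
```

Keeping the component identifier out of the component-facing API means a component cannot read or overwrite another component's state by accident.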

The Clustered State Provider is implemented by communicating with a ZooKeeper instance. The ZooKeeper state provider is configured with a connect string and the root node in ZooKeeper where data should be stored. This allows multiple NiFi instances to store state in different ZooKeeper nodes.

We want to avoid requiring that an administrator already have a ZooKeeper instance installed and maintained in order to use NiFi, however. As such, a NiFi node can be configured to start an embedded ZooKeeper server. This way, an administrator can start an embedded server on as many nodes as they wish.

 

Questions

Below is a list of questions to be addressed as a result of this requirements document:

Question | Outcome
How do we handle local state that already has been stored, such as for GetHTTP? | The @OnScheduled method of this Processor has been updated to read the existing state file (if it exists), store the information into the State Manager, and delete the existing file.
How do we handle cluster-wide state that already was stored, such as with ListHDFS? | The @OnScheduled methods of these Processors have been updated to read the existing state file (if it exists) and/or retrieve state from the configured DistributedMapCacheClientService, store the information into the State Manager, and delete the existing file / clear the info in the DistributedMapCacheClientService.
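
The migration steps in the outcomes above (read the legacy file if it exists, store its contents, then delete it) can be sketched as follows. The key=value file format, the `LegacyStateMigration` class, and the `StateSink` interface are assumptions made for this example; the actual Processors may store their legacy state differently.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical one-time migration of a legacy key=value state file. */
final class LegacyStateMigration {

    /** Minimal stand-in for the component's state store. */
    interface StateSink {
        void setState(Map<String, String> state) throws IOException;
    }

    /**
     * Intended to be called when the component is scheduled.
     *
     * @return true if a legacy file was found and migrated
     */
    static boolean migrate(Path legacyFile, StateSink sink) throws IOException {
        if (!Files.exists(legacyFile)) {
            return false; // nothing to migrate (fresh install or already migrated)
        }
        Map<String, String> state = new HashMap<>();
        List<String> lines = Files.readAllLines(legacyFile, StandardCharsets.UTF_8);
        for (String line : lines) {
            int eq = line.indexOf('=');
            if (eq > 0) {
                state.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
            }
        }
        sink.setState(state);     // store first
        Files.delete(legacyFile); // delete only after the store succeeded
        return true;
    }
}
```

Storing before deleting means a failure partway through leaves the legacy file in place, so the migration can simply be retried the next time the component is scheduled.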

 

 
  
  

JIRA Issues

 

Not Doing
