Goals
- Exchange flow files between two NiFi environments using Site-to-Site via HTTP(S)
Background and strategic fit
Some environments, typically multi-datacenter deployments, only allow network communication through HTTP(S) ports. To exchange data between NiFi environments using Site-to-Site in such restricted deployments, we should add HTTP(S) as a transport protocol for Site-to-Site.
Assumptions
Requirements
# | Title | User Story | Importance | Notes |
---|---|---|---|---|
1 | Minimize required network ports to go through firewalls | The target NiFi server only allows access through HTTP/HTTPS. RAW Socket Site-to-Site requires an additional port (typically 9990). | Must Have | To minimize the number of required open ports, the new HTTP endpoints are added under /nifi-api/site-to-site, using the same port as the existing NiFi API. |
2 | Selectable Transport protocol | A DFM can select the transport protocol from the NiFi Web UI. Available protocols are 'RAW' and 'HTTP'. | Must Have | |
3 | Support HTTPS and auth | Network communication can be secured by HTTPS. In that case, the source NiFi sends its certificate and the target NiFi validates that it is registered in its trust store. | Must Have | |
4 | Support HTTP Proxy | To reach the target NiFi, all communication has to go through an HTTP proxy server. | Must Have | There is an existing JIRA issue, NIFI-304, to allow enabling security per port. This proposal doesn't address it and, like RAW Socket, provides security on a per-server basis. |
5 | Same level of transaction characteristics as RAW Socket | For flow-files transferred from NiFi-A to NiFi-B, the transaction should be committed on NiFi-A and NiFi-B only if NiFi-A confirms that NiFi-B received all sent data intact. The same applies to retrieving flow-files. Details are described below. | Must Have | |
6 | Same level of port availability check as RAW Socket | Data transport availability should be checked the same way as with RAW Socket. For example, if the target port cannot be validated, the peer (the host owning the port) should be penalized for a while so that other peers are used. | Must Have | |
7 | Load balancing | The same load-balancing capability as RAW Socket should be provided: when sending flow-files, a target port with more data in its queue receives less data than others, and when receiving, it is pulled from more often than others. | Must Have | |
8 | Follow target NiFi environment topology changes | If nodes are added to or removed from the target NiFi cluster, the source NiFi environment should detect the topology change automatically, so that it uses newly added nodes and stops sending requests to removed nodes. | Must Have | |
9 | Protocol version management | To provide backward compatibility in the future, the client and server components should negotiate a protocol version and downgrade their behavior when the counterpart only supports an older version. As of this writing, the RAW Socket implementation already has protocol versions 1 to 5. To let the HTTP transport protocol version evolve independently while reusing the existing logic of the Socket implementation, this proposal uses two protocol versions: the 'transport protocol version' and the 'transaction protocol version'. | Must Have | Since HTTP Site-to-Site is introduced for the first time: transport protocol version 1, transaction protocol version 5. |
10 | Batch up file transport | | Must Have | |
11 | Compression | | Must Have | |
12 | Cluster aware endpoints | | Must Have | |
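Requirement 9 says the client and server negotiate versions and downgrade when the counterpart is older, with the transport and transaction versions handled separately. The Java sketch below shows one plausible negotiation rule: walk the client's versions from newest to oldest and take the first one the server also supports. The class name `VersionNegotiator` and this exact rule are assumptions for illustration, not NiFi's implementation.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper illustrating requirement 9; not part of NiFi's API.
final class VersionNegotiator {

    private VersionNegotiator() {
    }

    /**
     * Returns the newest version supported by both sides, or empty if there is none.
     *
     * @param clientVersions versions the client supports, ordered newest first
     * @param serverVersions versions the server supports
     */
    static Optional<Integer> negotiate(List<Integer> clientVersions, Set<Integer> serverVersions) {
        for (int version : clientVersions) {
            if (serverVersions.contains(version)) {
                // Downgrading stops at the first version both sides understand.
                return Optional.of(version);
            }
        }
        return Optional.empty();
    }
}
```

The same rule would be applied twice, once to the transport protocol version and once to the transaction protocol version, so the HTTP transport can evolve without touching the transaction logic shared with RAW Socket.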
User interaction and design
This proposal adds new inputs to the Remote Process Group configuration dialog, as shown in the following image:
- Transport Protocol: defaults to RAW
- HTTP Proxy server hostname: Specify the hostname of the proxy server to use. If not specified, HTTP traffic is sent directly to the target NiFi instance.
- HTTP Proxy server port: Optional. Specify the port number of the proxy server. If not specified, the default port 80 is used.
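The defaults in the dialog can be summarized in a small Java sketch. The class and method names are hypothetical, and it assumes the proxy settings only take effect when the HTTP transport protocol is selected, since the proxy is an HTTP proxy.

```java
import java.util.Objects;

// Hypothetical model of the new Remote Process Group settings; names are illustrative.
final class RemoteGroupTransportSettings {

    enum TransportProtocol { RAW, HTTP }

    static final int DEFAULT_PROXY_PORT = 80;

    private final TransportProtocol protocol;
    private final String proxyHost;  // null means traffic goes directly to the target NiFi
    private final Integer proxyPort; // null means DEFAULT_PROXY_PORT is used

    RemoteGroupTransportSettings(TransportProtocol protocol, String proxyHost, Integer proxyPort) {
        // Transport Protocol defaults to RAW when nothing is selected.
        this.protocol = protocol == null ? TransportProtocol.RAW : protocol;
        this.proxyHost = (proxyHost == null || proxyHost.isBlank()) ? null : proxyHost.trim();
        this.proxyPort = proxyPort;
    }

    TransportProtocol protocol() {
        return protocol;
    }

    // Assumption: the proxy is only relevant for the HTTP transport protocol.
    boolean usesProxy() {
        return protocol == TransportProtocol.HTTP && proxyHost != null;
    }

    /** Returns "host:port" of the proxy, or null if traffic goes directly to the target. */
    String proxyAddress() {
        if (!usesProxy()) {
            return null;
        }
        int port = Objects.requireNonNullElse(proxyPort, DEFAULT_PROXY_PORT);
        return proxyHost + ":" + port;
    }
}
```

For example, selecting HTTP with hostname `proxy.example.com` and no port would route traffic through `proxy.example.com:80`.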
nifi.properties
This proposal uses the following properties in nifi.properties:
change | key | default value | description |
---|---|---|---|
| nifi.web.http.port | 8080 | |
| nifi.web.https.port | (blank) | |
renamed | nifi.remote.input.host | (blank) | The hostname that clients use to reach this host. Used by both RAW Socket and HTTP. |
| nifi.remote.input.socket.port | (blank) | The port number to listen on. RAW Socket Site-to-Site is enabled when this property is set. |
| nifi.remote.input.secure | true | If true, both RAW Socket and HTTP Site-to-Site are secured; for HTTP, this means HTTPS is used. |
new | nifi.remote.input.http.enabled | false | Set to true to enable HTTP Site-to-Site on this host. |
new | nifi.remote.input.http.transaction.ttl | 30 sec | How long a transaction can live on the server, measured from the time the transaction is created. |
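The following hypothetical Java sketch shows how these properties could be read and interpreted. It is not NiFi's own properties class: the class name, the fallback defaults (taken from the table) and the small duration parser, which only understands forms such as "30 sec" or "2 mins", are assumptions.

```java
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

// Hypothetical reader for the Site-to-Site properties; not NiFi's NiFiProperties class.
final class SiteToSiteSettings {

    final String inputHost;        // nifi.remote.input.host, null if blank
    final Integer socketPort;      // nifi.remote.input.socket.port, null disables RAW
    final boolean secure;          // nifi.remote.input.secure
    final boolean httpEnabled;     // nifi.remote.input.http.enabled
    final Duration transactionTtl; // nifi.remote.input.http.transaction.ttl

    SiteToSiteSettings(Properties props) {
        inputHost = blankToNull(props.getProperty("nifi.remote.input.host"));
        String port = blankToNull(props.getProperty("nifi.remote.input.socket.port"));
        socketPort = port == null ? null : Integer.valueOf(port);
        secure = Boolean.parseBoolean(props.getProperty("nifi.remote.input.secure", "true"));
        httpEnabled = Boolean.parseBoolean(props.getProperty("nifi.remote.input.http.enabled", "false"));
        transactionTtl = parseDuration(props.getProperty("nifi.remote.input.http.transaction.ttl", "30 sec"));
    }

    // RAW Socket Site-to-Site is enabled only when the socket port is set.
    boolean rawEnabled() {
        return socketPort != null;
    }

    // The secure flag also selects HTTPS for HTTP Site-to-Site.
    String httpScheme() {
        return secure ? "https" : "http";
    }

    private static String blankToNull(String value) {
        return value == null || value.isBlank() ? null : value.trim();
    }

    // Parses an assumed subset of duration formats: "<amount> <unit>", e.g. "30 sec", "2 mins".
    static Duration parseDuration(String text) {
        String[] parts = text.trim().split("\\s+");
        long amount = Long.parseLong(parts[0]);
        String unit = parts.length > 1 ? parts[1].toLowerCase() : "sec";
        TimeUnit timeUnit;
        if (unit.startsWith("ms") || unit.startsWith("milli")) {
            timeUnit = TimeUnit.MILLISECONDS;
        } else if (unit.startsWith("s")) {
            timeUnit = TimeUnit.SECONDS;
        } else if (unit.startsWith("m")) {
            timeUnit = TimeUnit.MINUTES;
        } else if (unit.startsWith("h")) {
            timeUnit = TimeUnit.HOURS;
        } else {
            throw new IllegalArgumentException("Unsupported time unit: " + unit);
        }
        return Duration.ofMillis(timeUnit.toMillis(amount));
    }
}
```

With an empty properties file this yields RAW disabled, HTTP disabled, the https scheme and a 30-second transaction TTL.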
To allow a NiFi cluster to use HTTPS for Site-to-Site but HTTP for communication within the cluster, siteToSiteHttpApiPort is added to NodeIdentifier. This is needed because apiPort is determined by whether the cluster protocol between the manager and the node is secure.
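A simplified, hypothetical illustration of why a separate port is needed: when nodes talk to each other over plain HTTP, apiPort is the HTTP port, yet a remote Site-to-Site client must be given the HTTPS port. The class below is not NiFi's NodeIdentifier; it models only the fields involved and assumes siteToSiteHttpApiPort is null when HTTP Site-to-Site is disabled on the node.

```java
// Simplified stand-in for NodeIdentifier; only the fields relevant here are modeled.
final class SimpleNodeIdentifier {

    final String address;
    final int apiPort;                   // port used for communication within the cluster
    final Integer siteToSiteHttpApiPort; // port advertised to Site-to-Site clients, may be null

    SimpleNodeIdentifier(String address, int apiPort, Integer siteToSiteHttpApiPort) {
        this.address = address;
        this.apiPort = apiPort;
        this.siteToSiteHttpApiPort = siteToSiteHttpApiPort;
    }

    /** Base API URL a remote Site-to-Site client should use; never derived from apiPort. */
    String siteToSiteApiUrl(boolean secure) {
        if (siteToSiteHttpApiPort == null) {
            throw new IllegalStateException("HTTP Site-to-Site is not enabled on " + address);
        }
        String scheme = secure ? "https" : "http";
        return scheme + "://" + address + ":" + siteToSiteHttpApiPort + "/nifi-api";
    }
}
```

For a node with apiPort 8080 for cluster traffic and siteToSiteHttpApiPort 8443, clients would be directed to `https://<host>:8443/nifi-api` rather than to port 8080.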
- PeerSelector.java
- PeerStatusProvider.java
- EndpointConnectionPool
- HttpClient: fetchRemotePeerStatuses
- HttpClient.java: resolveNodeApiUrl from PeerDescription
NodeIdentifier.java is instantiated by
- WebClusterManager.resolveProposedNodeIdentifier
- StandardFlowService
- ?
DataFlowManagementServiceImpl holds nodeIds.
WebClusterManager holds nodes, and each Node holds a NodeIdentifier.
The NodeIdentifier is added when a node joins the cluster: the node sends a ConnectionRequest to join.
StandardFlowService.load -> connect
REST endpoints
The following REST endpoints, relative to /nifi-api, will be added by this proposal:
- /site-to-site/
- GET: Returns the Site-to-Site information required by the source NiFi environment, representing the Controller of the target NiFi environment.
- /site-to-site/peers/
- GET: Returns the available peers of this NiFi environment.
- /site-to-site/input-ports/{portId}/transactions/
- POST: Initiates a new transaction to send data from the source to the target NiFi. A new transaction ID is issued and returned.
- /site-to-site/input-ports/{portId}/transactions/{transactionId}
- DELETE: Commits the transaction held on the server side.
- /site-to-site/input-ports/{portId}/transactions/{transactionId}/flow-files
- POST: Transfers data from the source to the target NiFi. Instead of committing immediately, the server holds the transaction to provide a two-phase style commit. Returns a checksum calculated on the server side.
- /site-to-site/output-ports/{portId}/transactions/
- POST: Initiates a new transaction to receive data from the target to the source NiFi. A new transaction ID is issued and returned.
- /site-to-site/output-ports/{portId}/transactions/{transactionId}
- DELETE: Commits the transaction held on the server side. The client sends a checksum calculated on the client side.
- /site-to-site/output-ports/{portId}/transactions/{transactionId}/flow-files
- GET: Transfers data from the target to the source NiFi. Instead of committing immediately, the server holds the transaction to provide a two-phase style commit.
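The endpoint list follows a regular pattern per port type, which the hypothetical Java helper below captures. The class and method names are illustrative only; the paths themselves come from the list, prefixed with /nifi-api as stated in requirement 1. Port and transaction IDs are inserted as-is, assuming they are URL-safe (for example UUIDs).

```java
// Hypothetical helper that builds the Site-to-Site REST paths; not part of NiFi's API.
final class SiteToSitePaths {

    enum PortType {
        INPUT("input-ports"), OUTPUT("output-ports");

        final String segment;

        PortType(String segment) {
            this.segment = segment;
        }
    }

    static final String BASE = "/nifi-api/site-to-site";

    private SiteToSitePaths() {
    }

    static String controller() {
        return BASE + "/";
    }

    static String peers() {
        return BASE + "/peers/";
    }

    /** POST here to create a transaction. */
    static String transactions(PortType type, String portId) {
        return BASE + "/" + type.segment + "/" + portId + "/transactions/";
    }

    /** DELETE here to commit the transaction. */
    static String transaction(PortType type, String portId, String transactionId) {
        return BASE + "/" + type.segment + "/" + portId + "/transactions/" + transactionId;
    }

    /** POST (input ports) or GET (output ports) here to stream flow files. */
    static String flowFiles(PortType type, String portId, String transactionId) {
        return transaction(type, portId, transactionId) + "/flow-files";
    }
}
```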
Refer to the sequence diagrams and to 'REST interactions and scenarios' below for details of how these REST endpoints are used.
REST interaction sequence diagrams
The following sequence diagrams describe the complete HTTP request/response sequence of the successful scenario for both input-ports and output-ports. Other semi-normal and error cases are described in 'REST interactions and scenarios'.
A_component in the diagrams is a component that uses the SiteToSiteClient class, such as:
- NiFiReceiver for Apache Spark
- NiFiBolt and NiFiSpoutReceiver for Apache Storm
- StandardRemoteGroupPort, the port implementation used by a Remote Process Group in a NiFi data flow
input-ports/
The input-ports endpoint is used to send data from the source NiFi to a remote target NiFi. Once a transaction is created, the client starts an HTTP POST request to the flow-files URL under that transactionId. While there are more data packets to send, the same HTTP POST request is reused to stream them. When the client has written all data packets to the output stream, it flushes the stream and receives the checksum calculated on the server side. Finally, the client sends a DELETE request to the transactionId URL to complete the transaction.
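The two-phase handshake for input ports depends on both sides agreeing on what was sent. The in-memory Java sketch below simulates it without any networking: the client writes packets through a checksummed stream that stands in for the POST body, the server computes its own checksum over the bytes it received, and the client only proceeds to the committing DELETE when both values match. The use of CRC32, the raw byte framing of packets, and all class and method names are assumptions for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

// In-memory simulation of the input-port checksum handshake; no HTTP is involved.
final class InputPortHandshake {

    private InputPortHandshake() {
    }

    /** Writes all packets to the (simulated) POST body and returns the client-side CRC32. */
    static long streamPackets(List<byte[]> packets, OutputStream requestBody) throws IOException {
        CheckedOutputStream out = new CheckedOutputStream(requestBody, new CRC32());
        for (byte[] packet : packets) {
            out.write(packet); // the same stream is reused for every packet
        }
        out.flush(); // flush once all packets have been written
        return out.getChecksum().getValue();
    }

    /** Stands in for the server computing a checksum over the received body. */
    static long serverChecksum(byte[] receivedBody) {
        CRC32 crc = new CRC32();
        crc.update(receivedBody, 0, receivedBody.length);
        return crc.getValue();
    }

    /** True if the client should send DELETE to commit; false if it should cancel instead. */
    static boolean shouldCommit(long clientChecksum, long serverChecksum) {
        return clientChecksum == serverChecksum;
    }
}
```

A mismatch corresponds to the BAD_CHECKSUM scenario, in which the transaction must not be committed on either side.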
output-ports/
The output-ports endpoint is used to receive data from a remote target NiFi into the source NiFi. Once a transaction is created, the client starts an HTTP GET request to the flow-files URL under that transactionId. While there are more data packets to receive, the same HTTP GET request is reused to stream them. When the client has consumed all data packets from the input stream and the confirm() method is called, the client sends a DELETE request with a checksum calculated on the client side to complete the transaction.
For output ports, the complete() method does nothing other than update the transaction state to TRANSACTION_COMPLETED.
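For output ports, the commit happens in confirm() rather than in complete(). The Java sketch below models that client-side sequencing: HTTP calls are only recorded as strings, not sent. The class, the state names other than TRANSACTION_COMPLETED, the use of CRC32 and the `checksum` query parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical client-side model of an output-port transaction; requests are only recorded.
final class OutputPortTransaction {

    enum State { TRANSACTION_STARTED, DATA_EXCHANGED, TRANSACTION_CONFIRMED, TRANSACTION_COMPLETED }

    private final String transactionUrl;
    private final CRC32 crc = new CRC32();
    final List<String> sentRequests = new ArrayList<>();
    State state = State.TRANSACTION_STARTED;

    OutputPortTransaction(String transactionUrl) {
        this.transactionUrl = transactionUrl;
        // A single streaming GET is used for all packets.
        sentRequests.add("GET " + transactionUrl + "/flow-files");
    }

    /** Consumes one packet from the GET response stream and updates the checksum. */
    void receive(byte[] packet) {
        crc.update(packet, 0, packet.length);
        state = State.DATA_EXCHANGED;
    }

    /** Commits on the server by sending DELETE with the client-side checksum. */
    void confirm() {
        if (state != State.DATA_EXCHANGED) {
            throw new IllegalStateException("Nothing received yet: " + state);
        }
        sentRequests.add("DELETE " + transactionUrl + "?checksum=" + crc.getValue());
        state = State.TRANSACTION_CONFIRMED;
    }

    /** Sends no request; only the local state changes. */
    void complete() {
        if (state != State.TRANSACTION_CONFIRMED) {
            throw new IllegalStateException("Not confirmed: " + state);
        }
        state = State.TRANSACTION_COMPLETED;
    }
}
```

After receive(), confirm() and complete(), exactly two requests have been recorded: the streaming GET and the committing DELETE.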
REST interactions and scenarios
input-ports/
Scenario Type | {portId}/transactions | {portId}/transactions/{transactionId}/flow-files | {portId}/transactions/{transactionId} |
---|---|---|---|
Transaction Initiation Failure | | N/A | N/A |
Normal Case | | | confirm(), then complete() |
Normal Case - Destination becomes full | (after above interactions) | (after above interactions) | (branched from above interactions) |
BAD_CHECKSUM | (after above interactions) | (after above interactions) | confirm() |
Cancel transaction | (after above interactions) | (after above interactions) | cancel() |
Defunct transaction | (after above interactions) | (after above interactions) | |
Expired transaction | (after above interactions) | (after above interactions) | complete() |
Transaction Initiation Failure | (after above interactions) | (after above interactions) | complete() |
Defunct transaction | (after above interactions) | | N/A |
Expired transaction | (after above interactions) | | N/A |
Transaction Initiation Failure | (after above interactions) | | N/A |
output-ports/
Scenario Type | {portId}/transactions | {portId}/transactions/{transactionId}/flow-files | {portId}/transactions/{transactionId} |
---|---|---|---|
Transaction Initiation Failure | | N/A | N/A |
Normal Case | | | confirm(), then complete() |
Normal Case - Destination becomes full | (after above interactions) | (after above interactions) | (branched from above interactions) |
BAD_CHECKSUM | (after above interactions) | (after above interactions) | confirm() |
Cancel transaction | (after above interactions) | (after above interactions) | cancel() |
Defunct transaction | (after above interactions) | (after above interactions) | |
Expired transaction | (after above interactions) | (after above interactions) | confirm() |
Transaction Initiation Failure | (after above interactions) | (after above interactions) | confirm() |
Defunct transaction | (after above interactions) | | N/A |
Expired transaction | (after above interactions) | | N/A |
Transaction Initiation Failure | (after above interactions) | | N/A |
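The Expired transaction scenarios assume that the server discards transactions older than nifi.remote.input.http.transaction.ttl, measured from creation. A minimal Java sketch of such a server-side holder follows; the class name, the use of random UUIDs as transaction IDs, lazy expiry on access, and the injectable time source (used to keep the example testable) are all assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical server-side holder for HTTP Site-to-Site transactions; not NiFi's implementation.
final class TransactionHolder<T> {

    private static final class Entry<V> {
        final V transaction;
        final Instant createdAt;

        Entry(V transaction, Instant createdAt) {
            this.transaction = transaction;
            this.createdAt = createdAt;
        }
    }

    private final ConcurrentMap<String, Entry<T>> transactions = new ConcurrentHashMap<>();
    private final Duration ttl;
    private final Supplier<Instant> now; // e.g. Instant::now in production

    TransactionHolder(Duration ttl, Supplier<Instant> now) {
        this.ttl = ttl;
        this.now = now;
    }

    /** Called on POST .../transactions/; returns the new transaction ID. */
    String create(T transaction) {
        String id = UUID.randomUUID().toString();
        transactions.put(id, new Entry<>(transaction, now.get()));
        return id;
    }

    /** Returns the transaction, or empty if it is unknown or has expired. */
    Optional<T> get(String transactionId) {
        Entry<T> entry = transactions.get(transactionId);
        if (entry == null) {
            return Optional.empty();
        }
        if (isExpired(entry)) {
            transactions.remove(transactionId, entry); // expired entries are dropped lazily
            return Optional.empty();
        }
        return Optional.of(entry.transaction);
    }

    /** Called on DELETE .../transactions/{transactionId}; returns the transaction only if still valid. */
    Optional<T> remove(String transactionId) {
        Entry<T> entry = transactions.remove(transactionId);
        if (entry == null || isExpired(entry)) {
            return Optional.empty();
        }
        return Optional.of(entry.transaction);
    }

    private boolean isExpired(Entry<T> entry) {
        // The TTL is measured from transaction creation, not from the last access.
        return now.get().isAfter(entry.createdAt.plus(ttl));
    }
}
```

With the default TTL of 30 sec, a DELETE arriving 31 seconds after the transaction was created would find nothing to commit, which matches the Expired transaction rows above.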
Questions
Below is a list of questions to be addressed as a result of this requirements document:
Question | Outcome |
---|---|