...
- Transport Protocol: defaults to RAW
- HTTP Proxy server hostname: Specify the proxy server's hostname to use. If not specified, HTTP traffic is sent directly to the target NiFi instance.
- HTTP Proxy server port: Specify the proxy server's port number (optional). If not specified, the default port 80 is used.
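The fallback rules for the two proxy properties above can be sketched as follows. This is only an illustration of the described behavior: the function name `resolve_proxy` and its signature are hypothetical and not part of the proposal.

```python
from typing import Optional, Tuple

# Default port taken from the property description above.
DEFAULT_PROXY_PORT = 80


def resolve_proxy(hostname: Optional[str],
                  port: Optional[int] = None) -> Optional[Tuple[str, int]]:
    """Hypothetical helper: return (host, port) for the proxy to use,
    or None when HTTP traffic should go directly to the target NiFi."""
    if hostname is None or not hostname.strip():
        # No proxy hostname configured: send traffic directly.
        return None
    # Hostname configured: fall back to port 80 when no port is given.
    return (hostname.strip(), port if port is not None else DEFAULT_PROXY_PORT)
```

For example, `resolve_proxy("proxy.example.com")` falls back to port 80, while `resolve_proxy(None)` signals a direct connection.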
nifi.properties
This proposal uses the following configurations in nifi.properties:
REST endpoints
The following REST endpoints will be added by this proposal:
...
The input-ports endpoint is used for sending data from the source NiFi to a remote target NiFi. When a transaction is created, an HTTP POST request is initiated against the transaction URL (transactionId-1). While there are more data packets to send, the same HTTP POST request is used to send them in a streaming manner. The POST request also returns the final transaction location, transactionId-2. When the client finishes writing all data packets to the output stream, it flushes the stream. Finally, the client sends a DELETE request to the transactionId-2 URL to complete the transaction.
PlantUML

    actor A_component
    actor HttpClient
    actor HttpClientTransaction
    actor SiteToSiteRestApiUtil
    actor SiteToSiteResource
    ' comment: initialize
    A_component -> HttpClient: createTransaction
    HttpClient -> SiteToSiteRestApiUtil: initiateTransaction
    SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/input-ports/{portId}/transactions
    SiteToSiteRestApiUtil <-- SiteToSiteResource: transactionUrl-1, transactionProtocolVersion
    HttpClient <-- SiteToSiteRestApiUtil
    HttpClient -> HttpClientTransaction: new
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
    HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForSend
    SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/input-ports/{portId}/transactions/{transactionId-1}
    HttpClient <-- HttpClientTransaction
    A_component <-- HttpClient: Transaction
    ' comment: send
    alt while there is data packet to send
    A_component -> HttpClientTransaction: send
    HttpClientTransaction -> SiteToSiteResource: writes data to outputstream
    HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
    A_component <-- HttpClientTransaction
    end
    ' comment: confirm
    A_component -> HttpClientTransaction: confirm
    HttpClientTransaction -> SiteToSiteRestApiUtil: finishTransferFlowFiles
    SiteToSiteRestApiUtil <-- SiteToSiteResource: 202 Accepted: transactionId-2, returns serverChecksum
    HttpClientTransaction -> HttpClientTransaction: validate server Checksum
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_CONFIRMED
    A_component <-- HttpClientTransaction
    ' comment: complete
    A_component -> HttpClientTransaction: complete
    HttpClientTransaction -> SiteToSiteRestApiUtil: commitTransferFlowFiles
    SiteToSiteRestApiUtil -> SiteToSiteResource: DELETE /site-to-site/input-ports/{portId}/transactions/{transactionId-2}
    SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
    HttpClientTransaction <-- SiteToSiteRestApiUtil
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
    A_component <-- HttpClientTransaction
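The state transitions in the send sequence above can be sketched as a small state machine. This is a minimal sketch written in Python for brevity, not the proposal's `HttpClientTransaction`: the class `SendTransactionSketch` is hypothetical, no HTTP calls are made, and raising an error on an out-of-order call is an assumption rather than something the proposal specifies.

```python
from enum import Enum, auto


class TransactionState(Enum):
    # State names as they appear in the sequence diagram.
    TRANSACTION_STARTED = auto()
    DATA_EXCHANGED = auto()
    TRANSACTION_CONFIRMED = auto()
    TRANSACTION_COMPLETED = auto()


class SendTransactionSketch:
    """Hypothetical stand-in for the sending-side transaction."""

    def __init__(self) -> None:
        self.state = TransactionState.TRANSACTION_STARTED
        self.sent = []  # packets "written" to the simulated output stream

    def send(self, packet: bytes) -> None:
        # Sending is allowed right after start and while data is flowing.
        if self.state not in (TransactionState.TRANSACTION_STARTED,
                              TransactionState.DATA_EXCHANGED):
            raise RuntimeError(f"cannot send in state {self.state.name}")
        self.sent.append(packet)
        self.state = TransactionState.DATA_EXCHANGED

    def confirm(self, server_checksum: str, client_checksum: str) -> None:
        # The client validates the checksum returned by the server.
        if self.state is not TransactionState.DATA_EXCHANGED:
            raise RuntimeError(f"cannot confirm in state {self.state.name}")
        if server_checksum != client_checksum:
            raise ValueError("BAD_CHECKSUM: server and client checksums differ")
        self.state = TransactionState.TRANSACTION_CONFIRMED

    def complete(self) -> None:
        # In the real flow this is where the DELETE request would be sent.
        if self.state is not TransactionState.TRANSACTION_CONFIRMED:
            raise RuntimeError(f"cannot complete in state {self.state.name}")
        self.state = TransactionState.TRANSACTION_COMPLETED
```

A caller would invoke `send()` once per data packet, then `confirm()` with both checksums, then `complete()`.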
...
The output-ports endpoint is used for receiving data from a remote target NiFi to the source NiFi. When a transaction is created, an HTTP GET request is initiated against the transaction URL (transactionId-1). While there are more data packets to receive, the same HTTP GET request is used to receive them in a streaming manner. The GET request also returns the final transaction location, transactionId-2. When the client has consumed all data packets from the input stream and the confirm() method is called, the client sends a DELETE request, carrying a checksum calculated on the client side, to complete the transaction.
The complete() method doesn't do anything other than update state to TRANSACTION_COMPLETED.
PlantUML

    actor A_component
    actor HttpClient
    actor HttpClientTransaction
    actor SiteToSiteRestApiUtil
    actor SiteToSiteResource
    ' comment: initialize
    A_component -> HttpClient: createTransaction
    HttpClient -> SiteToSiteRestApiUtil: initiateTransaction
    SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/output-ports/{portId}/transactions
    SiteToSiteRestApiUtil <-- SiteToSiteResource: transactionUrl-1, transactionProtocolVersion
    HttpClient <-- SiteToSiteRestApiUtil
    HttpClient -> HttpClientTransaction: new
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
    HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForReceive
    SiteToSiteRestApiUtil -> SiteToSiteResource: GET /site-to-site/output-ports/{portId}/transactions/{transactionId-1}
    SiteToSiteRestApiUtil <-- SiteToSiteResource: 202 Accepted: transactionUrl-2
    HttpClientTransaction <-- SiteToSiteRestApiUtil
    HttpClient <-- HttpClientTransaction
    A_component <-- HttpClient: Transaction
    ' comment: receive
    alt while there is data packet to receive
    A_component -> HttpClientTransaction: receive
    HttpClientTransaction <-- SiteToSiteResource: read from inputstream
    HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
    A_component <-- HttpClientTransaction: data packet
    end
    ' comment: confirm
    A_component -> HttpClientTransaction: confirm
    HttpClientTransaction -> SiteToSiteRestApiUtil: commitReceivingFlowFiles(checksum)
    SiteToSiteRestApiUtil -> SiteToSiteResource: DELETE /site-to-site/output-ports/{portId}/transactions/{transactionId-2}
    SiteToSiteResource -> SiteToSiteResource: validate client Checksum
    SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
    HttpClientTransaction <-- SiteToSiteRestApiUtil
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_CONFIRMED
    A_component <-- HttpClientTransaction
    ' comment: complete
    A_component -> HttpClientTransaction: complete
    HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
    A_component <-- HttpClientTransaction
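On the receiving side, the client computes a checksum over the packets it has read and the server validates it when handling the DELETE request. The sketch below illustrates that idea only. The proposal text here does not name the checksum algorithm, so the use of CRC32 is an assumption, and both function names are hypothetical.

```python
import zlib
from typing import Iterable


def client_checksum(packets: Iterable[bytes]) -> str:
    """Hypothetical helper: running CRC32 over all received packets.
    CRC32 is an assumption; the proposal only says 'Checksum'."""
    crc = 0
    for packet in packets:
        # zlib.crc32 accepts the previous value to continue a running checksum.
        crc = zlib.crc32(packet, crc)
    return str(crc)


def server_accepts(client_value: str, server_value: str) -> bool:
    """Hypothetical server-side check performed on the DELETE request."""
    return client_value == server_value
```

Because the checksum is computed incrementally, the result does not depend on how the stream was split into packets, only on the bytes themselves.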
...
Scenario Type | {portId}/transactions | {portId}/transactions/{transactionId} | {portId}/transactions/{transactionId} |
---|---|---|---|
Transaction Initiation Failure | | N/A | N/A |
Normal Case | | | -- confirm(); -- complete() |
Normal Case - Destination becomes full | (after above interactions) | (after above interactions) | (branched from above interactions) |
BAD_CHECKSUM | (after above interactions) | (after above interactions) | -- confirm() |
Cancel transaction | (after above interactions) | (after above interactions) | -- cancel() |
Defunct transaction | (after above interactions) | (after above interactions) | |
Expired transaction | (after above interactions) | (after above interactions) | -- complete() |
Transaction Initiation Failure | (after above interactions) | (after above interactions) | -- complete() |
Defunct transaction | (after above interactions) | | N/A |
Expired transaction | (after above interactions) | | N/A |
Transaction Initiation Failure | (after above interactions) | | N/A |
...
Scenario Type | {portId}/transactions | {portId}/transactions/{transactionId} | {portId}/transactions/{transactionId} |
---|---|---|---|
Transaction Initiation Failure | | N/A | N/A |
Normal Case | | | -- confirm(); -- complete() |
Normal Case - Destination becomes full | (after above interactions) | (after above interactions) | (branched from above interactions) |
BAD_CHECKSUM | (after above interactions) | (after above interactions) | -- confirm() |
Cancel transaction | (after above interactions) | (after above interactions) | -- cancel() |
Defunct transaction | (after above interactions) | (after above interactions) | |
Expired transaction | (after above interactions) | (after above interactions) | -- confirm() |
Transaction Initiation Failure | (after above interactions) | (after above interactions) | -- confirm() |
Defunct transaction | (after above interactions) | | N/A |
Expired transaction | (after above interactions) | | N/A |
Transaction Initiation Failure | (after above interactions) | | N/A |
...