Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Page properties
Target release
Epic
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyNIFI-1857
Document status
Status
titleDRAFT
Document owner

Koji Kawamura

Designer
Developers
QA

Table of Contents

Goals

  • Exchange flow files between two NiFi environments using Site-to-Site via HTTP(S) 

...

  • /site-to-site/
    • GET: Returns required information of Site-to-Site for the source NiFi environment. Representing Controller of target NiFi environment.
  • /site-to-site/peers/
    • GET: Returns available peers of this NiFi environment.
  • /site-to-site/input-ports/{portId}/transactions/
    • POST: Initiate new transaction to send data from source to target NiFi. A new transaction id is published and returned.
  • /site-to-site/input-ports/{portId}/transactions/{transactionId}
    • POST: Transfer data from source to target NiFi. The transaction will be held on server side instead of commit it immediately, in order to provide 2-phase style commit. Returns Checksum calculated on server side.
    • DELETE: Commit the transaction which is held on server side.
  • /site-to-site/output-ports/{portId}/transactions/
    • POST: Initiate new transaction to receive data from target to source NiFi. A new transaction id is published and returned.
  • /site-to-site/output-ports/{portId}/transactions/{transactionId}
    • GET: Transfer data from target to source NiFi.  The transaction will be held on server side instead of commit it immediately, in order to provide 2-phase style commit.
    • DELETE: Commit the transaction which is held on server side. Client sends a Checksum calculated on client side.

 

Refer sequence diagrams 'REST interactions and scenarios' below for the details of how these REST endpoints are used.

REST interaction sequence diagrams

Here are sequence diagrams describing complete HTTP request/response sequence of successful scenario for both input-ports and output-ports. Other semi-normal and error cases are described in 

input-ports/

PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource

' comment: initialize
A_component -> HttpClient: createTransaction
HttpClient
PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource

' comment: initialize
A_component -> HttpClient: createTransaction
HttpClient -> SiteToSiteRestApiUtil: initiateTransaction
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/output-ports/{portId}/transactions
SiteToSiteRestApiUtil <-- SiteToSiteResource: transactionUrl-1, transactionProtocolVersion
HttpClient <-- SiteToSiteRestApiUtil
HttpClient -> HttpClientTransaction: new
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForReceiveinitiateTransaction
SiteToSiteRestApiUtil -> SiteToSiteResource: GETPOST /site-to-site/outputinput-ports/{portId}/transactions/{transactionId-1}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 201 Created: transactionUrl-2
HttpClientTransaction <-- SiteToSiteRestApiUtil1, transactionProtocolVersion
HttpClient <-- HttpClientTransaction
A_component <-- HttpClient: Transaction

'comment: receive
alt while there is data packet to receive
    A_component -> HttpClientTransaction: receive
	HttpClientTransaction <-- SiteToSiteResource: read from inputstream
	HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
	-- SiteToSiteRestApiUtil
HttpClient -> HttpClientTransaction: new
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForSend
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/input-ports/{portId}/transactions/{transactionId-1}
HttpClient <-- HttpClientTransaction
A_component <-- HttpClientTransactionHttpClient: data packetTransaction
end
 
'comment: confirm
A_component -> HttpClientTransaction: confirm
HttpClientTransaction -> SiteToSiteRestApiUtil: commitReceivingFlowFiles(checksum)
SiteToSiteRestApiUtilreceive
alt while there is data packet to send
    A_component -> SiteToSiteResource: DELETE /site-to-site/output-ports/{portId}/transactions/{transactionId-2}
SiteToSiteRestApiUtil <--HttpClientTransaction: send
	HttpClientTransaction -> SiteToSiteResource: 200writes OK
HttpClientTransaction <-- SiteToSiteRestApiUtil
data to outputstream
	HttpClientTransaction -> HttpClientTransaction: state = TRANSACTIONDATA_CONFIRMEDEXCHANGED
	A_component <-- HttpClientTransaction
 end
 
'comment: completeconfirm
A_component -> HttpClientTransaction: confirm
HttpClientTransaction -> SiteToSiteRestApiUtil: complete finishTransferFlowFiles
SiteToSiteRestApiUtil <-- SiteToSiteResource: 201 Created: transactionId-2, serverChecksum
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETEDCONFIRMED
A_component <-- HttpClientTransaction
PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource-- HttpClientTransaction

' comment: initializecomplete
A_component -> HttpClientHttpClientTransaction: createTransactioncomplete
HttpClientHttpClientTransaction -> SiteToSiteRestApiUtil: initiateTransactioncommitTransferFlowFiles
SiteToSiteRestApiUtil -> SiteToSiteResource: POSTDELETE /site-to-site/input-ports/{portId}/transactions
SiteToSiteRestApiUtil/transactions/{transactionId-2}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
A_component <-- HttpClientTransaction
 

output-ports/

PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource

' comment: initialize
A_componentSiteToSiteResource: transactionUrl-1, transactionProtocolVersion
HttpClient <-- SiteToSiteRestApiUtil
HttpClient -> HttpClientTransaction: new
HttpClientTransaction -> HttpClientTransactionHttpClient: state = TRANSACTION_STARTED
HttpClientTransactioncreateTransaction
HttpClient -> SiteToSiteRestApiUtil: openConnectionForSendinitiateTransaction
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/inputoutput-ports/{portId}/transactions/{transactionId-1}
HttpClient
SiteToSiteRestApiUtil <-- HttpClientTransaction
A_componentSiteToSiteResource: transactionUrl-1, transactionProtocolVersion
HttpClient <-- SiteToSiteRestApiUtil
HttpClient: Transaction

'comment -> HttpClientTransaction: receivenew
altHttpClientTransaction while-> thereHttpClientTransaction: isstate data packet to send
    A_component= TRANSACTION_STARTED
HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForReceive
SiteToSiteRestApiUtil -> HttpClientTransaction: send
	HttpClientTransaction -> SiteToSiteResource: writes data to outputstream
	HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
	SiteToSiteResource: GET /site-to-site/output-ports/{portId}/transactions/{transactionId-1}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 201 Created: transactionUrl-2
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClient <-- HttpClientTransaction
A_component <-- HttpClient: HttpClientTransactionTransaction
end
 
'comment: confirm
receive
alt while there is data packet to receive
    A_component -> HttpClientTransaction: confirm
HttpClientTransaction -> SiteToSiteRestApiUtil: finishTransferFlowFiles
SiteToSiteRestApiUtilreceive
	HttpClientTransaction <-- SiteToSiteResource: 201read Created: transactionId-2, serverChecksum
from inputstream
	HttpClientTransaction -> HttpClientTransaction: state = TRANSACTIONDATA_CONFIRMEDEXCHANGED
	A_component <-- HttpClientTransaction: data packet
end
 
'comment: completeconfirm
A_component -> HttpClientTransaction: completeconfirm
HttpClientTransaction -> SiteToSiteRestApiUtil: commitTransferFlowFilescommitReceivingFlowFiles(checksum)
SiteToSiteRestApiUtil -> SiteToSiteResource: DELETE /site-to-site/inputoutput-ports/{portId}/transactions/{transactionId-2}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
HttpClientTransaction
SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_CONFIRMED
A_component <-- SiteToSiteRestApiUtilHttpClientTransaction
 
'comment: complete
A_component -> HttpClientTransaction: complete
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
A_component <-- HttpClientTransaction
 

Refer 'REST interactions and scenarios' below for the details of how these REST endpoints are used. 

REST interactions and scenarios

input-ports/

Scenario Type

{portId}/transactions

{portId}/transactions/{transactionId}{portId}/transactions/{transactionId}
Transaction Initiation Failure
  • Client sends a POST with handshake parameters
  • Server responds :
    • 400 Bad Request: if the protocol version is not specified, or not supported
    • 401 Unauthorized : If request is not authorized to the port
    • 403 Forbidden: if it is a NiFi Cluster Manager
    • 404 Not Found: if port is not found with portId
    • 503 Service unavailable : If the port is not running, not valid state, or port's destination is full
N/AN/A
Normal Case
  • Client sends POST with handshake parameters
  • Server creates a receiving transaction
  • Server responds:
    • 201 Created: The created transaction's URL is returned with 'Location' header
  • Client sends a POST to a receiving transaction
  • Client streams data packets
  • Client status -> DATA_EXCHANGED
  • Server removes receiving transaction
  • Server consumes input stream to receive incoming data packets
  • Server creates a holding transaction
  • Server responds 201 Created: The created transaction's URL is returned with 'Location' header, and Checksum is returned with response body
  • Client checks client Checksum and server Checksum
  • If Checksums are identical
  • Client state -> TRANSACTION_CONFIRMED
  • Client sends a DELETE to a holding transaction with CONFIRM_TRANSACTION
  • Server commits its session
  • Server responds 200 OK with TRANSACTION_FINISHED
  • Client state -> TRANSACTION_COMPLETED
Normal Case - Destination becomes full(after above interactions)

(after above interactions)

(branched from above interactions)

  • Server responds 200 OK with TRANSACTION_FINISHED _BUT_DESTINATION_FULL if there's no available relationships
  • Client state -> TRANSACTION_COMPLETED
  • Client panalize peer. TODO: only implemented in Socket, EndpointConnectionPool.java
BAD_CHECKSUM(after above interactions)

(after above interactions)

  • Client checks client Checksum and server Checksum
  • If Checksums are not identical
  • Client sends a DELETE to a holding transaction with BAD_CHECKSUM
  • Client rollbacks its session
  • Server rollbacks its session
  • Server responds 200 OK with CANCEL_TRANSACTION
Cancel transaction(after above interactions)

(after above interactions)

  • Client cancel the transaction for some reason (There's no component which cancels a transaction as of this writing, though)
  • TODO: not implemented yet
  • Client sends a DELETE to a holding transaction with CANCEL_TRANSACTION
  • Client status -> TRANSACTION_CANCELED
Defunct transaction(after above interactions)(after above interactions)
  • Client stops working
  • After passing certain amount of time, the receiving transaction will be removed from server side
  • Transaction on client side might be rollbacked since it's not working properly
  • Transaction on server should be rollbacked

Expired transaction

(after above interactions)(after above interactions)
  • Client sends a DELETE to a holding transaction
  • Server responds 404 Not Found, if the receiving transaction is already removed because it's expired
  • Transaction on client should be rollbacked
  • Transaction on server has already rollbacked
Transaction Initiation Failure(after above interactions)(after above interactions)
  • Client sends a DELETE to a holding transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed from the previous request
  • Client rollbacks its session
  • Transaction on server should be rollbacked
Defunct transaction(after above interactions)
  • Client stops working
  • After passing certain amount of time, the receiving transaction will be removed from server side
N/A
Expired transaction(after above interactions)
  • Client sends a POST to a receiving transaction
  • Server responds 404 Not Found, if the receiving transaction is already removed because it's expired
N/A
Transaction Initiation Failure(after above interactions)
  • Client sends a POST to a receiving transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed from the previous request
  • Client rollbacks its session
  • Server session hasn't started yet
N/A

 

output-ports/

Scenario Type

{portId}/transactions

{portId}/transactions/{transactionId}{portId}/transactions/{transactionId}
Transaction Initiation Failure
  • same implemetation with input-ports
N/AN/A
Normal Case
  • Client sends POST with handshake parameters
  • Server creates a receiving transaction
  • Server responds:
    • 201 Created: The created transaction's URL is returned with 'Location' header
  • Client sends a GET to a transferring transaction
  • Client status -> DATA_EXCHANGED
  • Server removes transferring transaction
  • Server creates a holding transaction
  • Server responds 201 Created: The created transaction's URL is returned with 'Location' header
  • Client consumes input stream to receive incoming data packets
  • Server writes data packets into output stream

-- confirm()

  • Client calculate Checksum of received data
  • Client sends a DELETE to a holding transaction with Checksum
  • Server checks client Checksum and server Checksum
  • If checksums are identical
  • Server commits its session
  • Server responds 200 OK with TRANSACTION_FINISHED
  • Client state -> TRANSACTION_CONFIRMED

-- complete()

  • Client state -> TRANSACTION_COMPLETED
  • TODO Destination Full case is not implemented yet. Another roundtrip is required??? but there's no client side impl sending TRANSACTION_FINISHED _BUT_DESTINATION_FULL. Besides that, server side desn't have to penalize a peer, does it?
Normal Case - Destination becomes full(after above interactions)

(after above interactions)

(branched from above interactions)

  • Server responds 200 OK with TRANSACTION_FINISHED _BUT_DESTINATION_FULL if there's no available relationships
  • Client state -> TRANSACTION_COMPLETED
  • Client panalize peer. TODO: only implemented in Socket, EndpointConnectionPool.java
BAD_CHECKSUM(after above interactions)

(after above interactions)

  • Client checks client Checksum and server Checksum
  • If Checksums are not identical
  • Client sends a DELETE to a holding transaction with BAD_CHECKSUM
  • Client rollbacks its session
  • Server rollbacks its session
  • Server responds 200 OK with CANCEL_TRANSACTION
Cancel transaction(after above interactions)

(after above interactions)

  • Client cancel the transaction for some reason (There's no component which cancels a transaction as of this writing, though)
  • TODO: not implemented yet
  • Client sends a DELETE to a holding transaction with CANCEL_TRANSACTION
  • Client status -> TRANSACTION_CANCELED
Defunct transaction(after above interactions)(after above interactions)
  • Client stops working
  • After passing certain amount of time, the receiving transaction will be removed from server side
  • Transaction on client side might be rollbacked since it's not working properly
  • Transaction on server should be rollbacked

Expired transaction

(after above interactions)(after above interactions)
  • Client sends a DELETE to a holding transaction
  • Server responds 404 Not Found, if the receiving transaction is already removed because it's expired
  • Transaction on client should be rollbacked
  • Transaction on server has already rollbacked
Transaction Initiation Failure(after above interactions)(after above interactions)
  • Client sends a DELETE to a holding transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed from the previous request
  • Client rollbacks its session
  • Transaction on server should be rollbacked
Defunct transaction(after above interactions)
  • Client stops working
  • After passing certain amount of time, the receiving transaction will be removed from server side
N/A
Expired transaction(after above interactions)
  • Client sends a GET to a transferring transaction
  • Server responds 404 Not Found, if the transfering transaction is already removed because it's expired
N/A
Transaction Initiation Failure(after above interactions)
  • Client sends a POST to a transferring transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed from the previous request
  • Client rollbacks its session
  • Server session hasn't started yet
N/A

...