
...

  • Transport Protocol: defaults to RAW
  • HTTP Proxy server hostname: The hostname of the proxy server to use. If not specified, HTTP traffic is sent directly to the target NiFi instance.
  • HTTP Proxy server port: The proxy server's port number (optional). If not specified, the default port 80 is used.
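Below is a minimal sketch of how a client might turn the two optional proxy settings into a `java.net.Proxy`. It assumes the settings arrive as a nullable hostname and a nullable port. `ProxySettings` and `resolve` are hypothetical names, not part of NiFi's API.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;

// Hypothetical helper (not NiFi code): maps the optional proxy settings to a java.net.Proxy.
final class ProxySettings {
    // Default used when a proxy hostname is given without a port.
    static final int DEFAULT_PROXY_PORT = 80;

    private ProxySettings() {}

    static Proxy resolve(String proxyHost, Integer proxyPort) {
        if (proxyHost == null || proxyHost.isBlank()) {
            // No proxy hostname: HTTP traffic goes directly to the target NiFi instance.
            return Proxy.NO_PROXY;
        }
        int port = (proxyPort == null) ? DEFAULT_PROXY_PORT : proxyPort;
        // createUnresolved defers DNS resolution of the proxy host until connection time.
        return new Proxy(Proxy.Type.HTTP, InetSocketAddress.createUnresolved(proxyHost, port));
    }
}
```

For example, `ProxySettings.resolve("proxy.example.com", null)` yields an HTTP proxy on port 80, and `resolve(null, null)` yields a direct connection.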

nifi.properties

This proposal uses the following configurations in nifi.properties:

REST endpoints

The following REST endpoints are added by this proposal:

...

The input-ports endpoint is used to send data from the source NiFi to a remote target NiFi. When a transaction is created, an HTTP POST request is initiated towards the transactionId-1 URL. While there are more data packets to send, the same HTTP POST request is used to send the data in a streaming manner. The POST request also returns the final transaction location, transactionId-2. When the client has written all data packets to the output stream, it flushes the stream. Finally, the client sends a DELETE request to the transactionId-2 URL to complete the transaction.
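The HTTP calls described above can be sketched as plain data. `InputPortSendPlan`, `Call` and the transaction ID arguments are hypothetical. The paths follow the `/site-to-site/input-ports/...` URLs in the sequence diagram. The two transaction IDs are passed in directly instead of being parsed from server responses.

```java
import java.util.List;

// Hypothetical sketch (not NiFi code) of the calls a client makes to send data to an input port.
final class InputPortSendPlan {
    private static final String BASE = "/site-to-site/input-ports/";

    record Call(String method, String path) {}

    private InputPortSendPlan() {}

    static List<Call> calls(String portId, String transactionId1, String transactionId2) {
        String transactions = BASE + portId + "/transactions";
        return List.of(
                // 1. Create the transaction; the response carries transactionUrl-1.
                new Call("POST", transactions),
                // 2. One POST whose request body streams every data packet.
                new Call("POST", transactions + "/" + transactionId1),
                // 3. Complete the transaction at its final location, transactionId-2.
                new Call("DELETE", transactions + "/" + transactionId2));
    }
}
```

Only three requests are made regardless of the number of data packets, because all packets share the body of the second request.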

 

PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource

' comment: initialize
A_component -> HttpClient: createTransaction
HttpClient -> SiteToSiteRestApiUtil: initiateTransaction
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/input-ports/{portId}/transactions
SiteToSiteRestApiUtil <-- SiteToSiteResource: transactionUrl-1, transactionProtocolVersion
HttpClient <-- SiteToSiteRestApiUtil
HttpClient -> HttpClientTransaction: new
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForSend
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/input-ports/{portId}/transactions/{transactionId-1}
HttpClient <-- HttpClientTransaction
A_component <-- HttpClient: Transaction

'comment: send
loop while there are data packets to send
    A_component -> HttpClientTransaction: send
    HttpClientTransaction -> SiteToSiteResource: write data to output stream
    HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
    A_component <-- HttpClientTransaction
end
 
'comment: confirm
A_component -> HttpClientTransaction: confirm
HttpClientTransaction -> SiteToSiteRestApiUtil: finishTransferFlowFiles
SiteToSiteRestApiUtil <-- SiteToSiteResource: 201 Created / 202 Accepted: transactionId-2, serverChecksum
HttpClientTransaction -> HttpClientTransaction: validate server Checksum
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_CONFIRMED
A_component <-- HttpClientTransaction

'comment: complete
A_component -> HttpClientTransaction: complete
HttpClientTransaction -> SiteToSiteRestApiUtil: commitTransferFlowFiles
SiteToSiteRestApiUtil -> SiteToSiteResource: DELETE /site-to-site/input-ports/{portId}/transactions/{transactionId-2}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
A_component <-- HttpClientTransaction
 

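The client-side state changes in the diagram can be summarized as a small state machine. This is a sketch only. The state names come from the diagrams, but the allowed transitions, the `cancel()` rule and the class name `ClientTransactionState` are assumptions, not NiFi's `HttpClientTransaction`.

```java
import java.util.EnumSet;
import java.util.Set;

// State names taken from the sequence diagrams.
enum TransactionState {
    TRANSACTION_STARTED,
    DATA_EXCHANGED,
    TRANSACTION_CONFIRMED,
    TRANSACTION_COMPLETED,
    TRANSACTION_CANCELED
}

// Hypothetical class that only tracks state; the transition rules are assumptions.
final class ClientTransactionState {
    private TransactionState state = TransactionState.TRANSACTION_STARTED;

    TransactionState state() {
        return state;
    }

    // send()/receive() may be called repeatedly while data packets remain.
    ClientTransactionState exchangeData() {
        require(EnumSet.of(TransactionState.TRANSACTION_STARTED, TransactionState.DATA_EXCHANGED));
        state = TransactionState.DATA_EXCHANGED;
        return this;
    }

    // confirm(): reached once the checksums have been validated.
    ClientTransactionState confirm() {
        require(EnumSet.of(TransactionState.DATA_EXCHANGED));
        state = TransactionState.TRANSACTION_CONFIRMED;
        return this;
    }

    // complete(): only a confirmed transaction can be completed.
    ClientTransactionState complete() {
        require(EnumSet.of(TransactionState.TRANSACTION_CONFIRMED));
        state = TransactionState.TRANSACTION_COMPLETED;
        return this;
    }

    // cancel(): assumed to be allowed at any point before the transaction ends.
    ClientTransactionState cancel() {
        require(EnumSet.complementOf(EnumSet.of(
                TransactionState.TRANSACTION_COMPLETED, TransactionState.TRANSACTION_CANCELED)));
        state = TransactionState.TRANSACTION_CANCELED;
        return this;
    }

    private void require(Set<TransactionState> allowed) {
        if (!allowed.contains(state)) {
            throw new IllegalStateException("Operation not allowed in state " + state);
        }
    }
}
```

Rejecting out-of-order calls with an exception makes mistakes such as calling `complete()` before `confirm()` fail fast.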
...

The output-ports endpoint is used by the source NiFi to receive data from a remote target NiFi. When a transaction is created, an HTTP GET request is initiated towards the transactionId-1 URL. While there are more data packets to receive, the same HTTP GET request is used to receive the data in a streaming manner. The GET request also returns the final transaction location, transactionId-2. When the client has consumed all data packets from the input stream and the confirm() method is called, the client sends a DELETE request with a checksum calculated on the client side to complete the transaction.

The complete() method doesn't do anything other than update state to TRANSACTION_COMPLETED.
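The client-side checksum described above could be computed incrementally while packets are read from the input stream. This is a sketch only: CRC32 (`java.util.zip.CRC32`) is an assumed algorithm and `ReceivedDataChecksum` is a hypothetical name. How the value is encoded in the DELETE request is not specified here.

```java
import java.util.zip.CRC32;

// Hypothetical sketch: running checksum over every byte the client reads from the GET response.
// CRC32 is an assumption, not something this proposal specifies.
final class ReceivedDataChecksum {
    private final CRC32 crc = new CRC32();

    // Feed each chunk as it is read from the input stream, in arrival order.
    ReceivedDataChecksum update(byte[] chunk) {
        crc.update(chunk, 0, chunk.length);
        return this;
    }

    // The value the client would send with the DELETE request in confirm().
    long value() {
        return crc.getValue();
    }
}
```

Feeding the data in chunks gives the same value as feeding it all at once, so the checksum can be updated as each packet arrives.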

PlantUML
actor A_component
actor HttpClient
actor HttpClientTransaction
actor SiteToSiteRestApiUtil
actor SiteToSiteResource

' comment: initialize
A_component -> HttpClient: createTransaction
HttpClient -> SiteToSiteRestApiUtil: initiateTransaction
SiteToSiteRestApiUtil -> SiteToSiteResource: POST /site-to-site/output-ports/{portId}/transactions
SiteToSiteRestApiUtil <-- SiteToSiteResource: transactionUrl-1, transactionProtocolVersion
HttpClient <-- SiteToSiteRestApiUtil
HttpClient -> HttpClientTransaction: new
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_STARTED
HttpClientTransaction -> SiteToSiteRestApiUtil: openConnectionForReceive
SiteToSiteRestApiUtil -> SiteToSiteResource: GET /site-to-site/output-ports/{portId}/transactions/{transactionId-1}
SiteToSiteRestApiUtil <-- SiteToSiteResource: 201 Created: transactionUrl-2 / 202 Accepted
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClient <-- HttpClientTransaction
A_component <-- HttpClient: Transaction

'comment: receive
loop while there are data packets to receive
    A_component -> HttpClientTransaction: receive
    HttpClientTransaction <-- SiteToSiteResource: read from input stream
    HttpClientTransaction -> HttpClientTransaction: state = DATA_EXCHANGED
    A_component <-- HttpClientTransaction: data packet
end
 
'comment: confirm
A_component -> HttpClientTransaction: confirm
HttpClientTransaction -> SiteToSiteRestApiUtil: commitReceivingFlowFiles(checksum)
SiteToSiteRestApiUtil -> SiteToSiteResource: DELETE /site-to-site/output-ports/{portId}/transactions/{transactionId-2}
SiteToSiteResource -> SiteToSiteResource: validate client Checksum
SiteToSiteRestApiUtil <-- SiteToSiteResource: 200 OK
HttpClientTransaction <-- SiteToSiteRestApiUtil
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_CONFIRMED
A_component <-- HttpClientTransaction
 
'comment: complete
A_component -> HttpClientTransaction: complete
HttpClientTransaction -> HttpClientTransaction: state = TRANSACTION_COMPLETED
A_component <-- HttpClientTransaction
 

...

Scenario Type

{portId}/transactions

{portId}/transactions/{transactionId}

{portId}/transactions/{transactionId}
Transaction Initiation Failure
  • Client sends a POST with handshake parameters
  • Server responds :
    • 400 Bad Request: if the protocol version is not specified or not supported
    • 401 Unauthorized: if the request is not authorized to access the port
    • 403 Forbidden: if the server is a NiFi Cluster Manager
    • 404 Not Found: if no port is found with the given portId
    • 503 Service Unavailable: if the port is not running, is not in a valid state, or the port's destination is full
N/A

N/A
Normal Case
  • Client sends POST with handshake parameters
  • Server creates a receiving transaction
  • Server responds:
    • 201 Created: The created transaction's URL is returned with 'Location' header
  • Client sends a POST to the receiving transaction
  • Client streams data packets
  • Client status -> DATA_EXCHANGED
  • Server removes the receiving transaction
  • Server consumes the input stream to receive incoming data packets
  • Server creates a holding transaction
  • Server responds 201 Created (the created transaction's URL is returned with 'Location' header) / 202 Accepted (a Checksum is returned in the response body)

-- confirm()

  • Client compares the client Checksum with the server Checksum
  • If Checksums are identical
  • Client state -> TRANSACTION_CONFIRMED

-- complete()

  • Client sends a DELETE to the holding transaction with CONFIRM_TRANSACTION
  • Server commits its session
  • Server responds 200 OK with TRANSACTION_FINISHED
  • Client state -> TRANSACTION_COMPLETED
Normal Case - Destination becomes full
(after above interactions)

(after above interactions)

(branched from above interactions)

  • Server responds 200 OK with TRANSACTION_FINISHED_BUT_DESTINATION_FULL if no relationships are available
  • Client state -> TRANSACTION_COMPLETED
  • Client penalizes the peer. TODO: currently only implemented for the Socket transport, in EndpointConnectionPool.java
BAD_CHECKSUM
(after above interactions)

(after above interactions)

-- confirm()

  • Client compares the client Checksum with the server Checksum
  • If Checksums are not identical
  • Client sends a DELETE to the holding transaction with BAD_CHECKSUM
  • Client rolls back its session
  • Server rolls back its session
  • Server responds 200 OK with CANCEL_TRANSACTION
Cancel transaction
(after above interactions)

(after above interactions)

-- cancel()

  • Client cancels the transaction for some reason (as of this writing, no component cancels a transaction)
  • TODO: not implemented yet
  • Client sends a DELETE to the holding transaction with CANCEL_TRANSACTION
  • Client status -> TRANSACTION_CANCELED
Defunct transaction
(after above interactions)

(after above interactions)
  • Client stops working
  • After a certain amount of time, the receiving transaction is removed from the server side
  • The transaction on the client side might be rolled back, but this cannot be guaranteed because the client is not working properly
  • The transaction on the server should be rolled back when it expires

Expired transaction
(after above interactions)

(after above interactions)

-- complete()

  • Client sends a DELETE to the holding transaction
  • Server responds 404 Not Found if the receiving transaction has already been removed because it expired
  • The transaction on the client should be rolled back
  • The transaction on the server has already been rolled back
Transaction Initiation Failure
(after above interactions)

(after above interactions)

-- complete()

  • Client sends a DELETE to the holding transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
  • Client rolls back its session
  • The transaction on the server should be rolled back when it expires
Defunct transaction
(after above interactions)
  • Client stops working
  • After a certain amount of time, the receiving transaction is removed from the server side
N/A
Expired transaction
(after above interactions)
  • Client sends a POST to the receiving transaction
  • Server responds 404 Not Found if the receiving transaction has already been removed because it expired
N/A
Transaction Initiation Failure
(after above interactions)
  • Client sends a POST to the receiving transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
  • Client rolls back its session
  • The server session has not started yet
N/A
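The client-side decisions in the input-ports scenarios can be sketched as two pure functions: which code to send in the DELETE, and whether to penalize the peer. The enum lists only the code names that appear in the table, and their numeric wire values are omitted. `InputPortConfirm` and its methods are hypothetical names.

```java
// Code names from the scenario table; numeric wire values are deliberately omitted.
enum ResponseCode {
    CONFIRM_TRANSACTION,
    BAD_CHECKSUM,
    CANCEL_TRANSACTION,
    TRANSACTION_FINISHED,
    TRANSACTION_FINISHED_BUT_DESTINATION_FULL
}

// Hypothetical sketch of the client's choices for an input-port transaction.
final class InputPortConfirm {
    private InputPortConfirm() {}

    // The code the client eventually sends in its DELETE to the holding transaction:
    // CONFIRM_TRANSACTION from complete() when checksums match, BAD_CHECKSUM from confirm() otherwise.
    static ResponseCode deleteCodeFor(long clientChecksum, long serverChecksum) {
        return clientChecksum == serverChecksum
                ? ResponseCode.CONFIRM_TRANSACTION
                : ResponseCode.BAD_CHECKSUM;
    }

    // The client penalizes the peer only when the server reports a full destination.
    static boolean shouldPenalize(ResponseCode serverResponse) {
        return serverResponse == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL;
    }
}
```

Keeping these decisions free of I/O makes the normal, BAD_CHECKSUM and destination-full rows easy to unit test.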

...

Scenario Type

{portId}/transactions

{portId}/transactions/{transactionId}

{portId}/transactions/{transactionId}
Transaction Initiation Failure
  • Same implementation as input-ports
N/A

N/A
Normal Case
  • Client sends POST with handshake parameters
  • Server creates a receiving transaction
  • Server responds:
    • 201 Created: The created transaction's URL is returned with 'Location' header
  • Client sends a GET to the transferring transaction
  • Client status -> DATA_EXCHANGED
  • Server removes the transferring transaction
  • Server creates a holding transaction
    • 201 Created: The created transaction's URL is returned with 'Location' header
  • Client sends a GET to the transaction
  • Client status -> DATA_EXCHANGED
  • Server responds 202 Accepted
  • Client consumes input stream to receive incoming data packets
  • Server writes data packets into output stream

-- confirm()

  • Client calculates the Checksum of the received data
  • Client sends a DELETE with the Checksum to the holding transaction
  • Server compares the client Checksum with the server Checksum
  • If checksums are identical
  • Server commits its session
  • Server responds 200 OK with TRANSACTION_FINISHED
  • Client state -> TRANSACTION_CONFIRMED

-- complete()

  • Client state -> TRANSACTION_COMPLETED
Normal Case - Destination becomes full
(after above interactions)

(after above interactions)

(branched from above interactions)

  • TODO: The destination-full case is not implemented yet. Is another round trip required? There is no client-side implementation that sends TRANSACTION_FINISHED_BUT_DESTINATION_FULL. Besides, the server side does not need to penalize a peer, does it?
BAD_CHECKSUM
(after above interactions)

(after above interactions)

-- confirm()

  • Client calculates the Checksum of the received data
  • Client sends a DELETE with the Checksum to the holding transaction
  • Server compares the client Checksum with the server Checksum
  • If Checksums are not identical
  • Server rolls back its session
  • Server responds 400 Bad Request with BAD_CHECKSUM
  • Client throws an IOException
  • Client rolls back its session
Cancel transaction
(after above interactions)

(after above interactions)

-- cancel()

  • Client cancels the transaction for some reason (as of this writing, no component cancels a transaction)
  • TODO: not implemented yet
  • Client sends a DELETE to the holding transaction with CANCEL_TRANSACTION
  • Client status -> TRANSACTION_CANCELED
Defunct transaction
(after above interactions)

(after above interactions)
  • Client stops working
  • After a certain amount of time, the receiving transaction is removed from the server side
  • The transaction on the client side might be rolled back, but this cannot be guaranteed because the client is not working properly
  • The transaction on the server should be rolled back when it expires

Expired transaction
(after above interactions)

(after above interactions)

-- confirm()

  • Client sends a DELETE to the holding transaction
  • Server responds 404 Not Found if the receiving transaction has already been removed because it expired
  • The transaction on the client should be rolled back
  • The transaction on the server has already been rolled back
Transaction Initiation Failure
(after above interactions)

(after above interactions)

-- confirm()

  • Client sends a DELETE to the holding transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
  • Client rolls back its session
  • The transaction on the server should be rolled back when it expires
Defunct transaction
(after above interactions)
  • Client stops working
  • After a certain amount of time, the receiving transaction is removed from the server side
N/A
Expired transaction
(after above interactions)
  • Client sends a GET to the transferring transaction
  • Server responds 404 Not Found if the transferring transaction has already been removed because it expired
N/A
Transaction Initiation Failure
(after above interactions)
  • Client sends a GET to the transferring transaction
  • It's possible for the server to respond with 400, 401, 403, 404 or 503 if its state has changed since the previous request
  • Client rolls back its session
  • The server session has not started yet
N/A
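On the output-ports side, the checksum check moves to the server. Below is a hedged sketch of the server's decision when it receives the DELETE, covering the normal, BAD_CHECKSUM and expired-transaction rows. `OutputPortCommit` and `Outcome` are hypothetical names, and session commit or rollback is reduced to a boolean.

```java
// Hypothetical sketch of the server's handling of
// DELETE /site-to-site/output-ports/{portId}/transactions/{transactionId}.
final class OutputPortCommit {
    // httpStatus and responseCode are what the server returns; commitSession = false means roll back.
    record Outcome(int httpStatus, String responseCode, boolean commitSession) {}

    private OutputPortCommit() {}

    static Outcome onDelete(boolean transactionExists, long clientChecksum, long serverChecksum) {
        if (!transactionExists) {
            // Expired: the transaction was already removed and its session rolled back.
            return new Outcome(404, null, false);
        }
        if (clientChecksum != serverChecksum) {
            // Mismatch: roll back, and the client raises an IOException on this response.
            return new Outcome(400, "BAD_CHECKSUM", false);
        }
        // Checksums match: commit the session and finish the transaction.
        return new Outcome(200, "TRANSACTION_FINISHED", true);
    }
}
```

Checking existence before comparing checksums mirrors the table: an expired transaction yields 404 regardless of the checksum sent.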

...