(This page is still under construction)
Streaming 2.0
The streaming protocol was redesigned from the ground up in Apache Cassandra 2.0. This page gives an overview of the protocol design and implementation.
Design goal
- Better control
    - One API for all (bootstrap, move, bulkload, repair...)
    - Sending/receiving data in the same session
- Better performance
    - Pipelined stream
    - Persistent connection per host
- Better reporting
    - Better logging/tracing
    - Event notification
    - More metrics
Highlight
Stream Plan
The previous version handled sending and receiving data separately from each other and from the operation that triggered them. Streaming 2.0 instead groups all related stream sessions under a single "Stream Plan".
Stream Plan for repair, bootstrap, bulkload, etc.
|- Stream session with Endpoint 1
|   |- Stream receiving tasks
|   |- Stream transfer tasks
|
|- Stream session with Endpoint 2
.
.
.
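The hierarchy above can be pictured as a small object model. The following is only an illustrative sketch, not Cassandra's implementation: the names PlanSketch, SessionSketch, request and transfer are hypothetical, and tasks are reduced to plain strings. It shows one plan owning one session per endpoint, with each session holding its receive and transfer tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the hierarchy shown above; none of these names are Cassandra classes.
class SessionSketch {
    final String endpoint;
    final List<String> receiveTasks = new ArrayList<>();  // what we request from the endpoint
    final List<String> transferTasks = new ArrayList<>(); // what we send to the endpoint

    SessionSketch(String endpoint) {
        this.endpoint = endpoint;
    }
}

class PlanSketch {
    final String description;
    // One session per endpoint, kept in insertion order.
    private final Map<String, SessionSketch> sessions = new LinkedHashMap<>();

    PlanSketch(String description) {
        this.description = description;
    }

    // Reuses the existing session for an endpoint, or creates it on first use.
    private SessionSketch sessionFor(String endpoint) {
        return sessions.computeIfAbsent(endpoint, SessionSketch::new);
    }

    PlanSketch request(String endpoint, String what) {
        sessionFor(endpoint).receiveTasks.add(what);
        return this;
    }

    PlanSketch transfer(String endpoint, String what) {
        sessionFor(endpoint).transferTasks.add(what);
        return this;
    }

    int sessionCount() {
        return sessions.size();
    }

    SessionSketch session(String endpoint) {
        return sessions.get(endpoint);
    }
}
```

Requesting from and transferring to the same endpoint lands in the same session, which is the point of grouping both directions under one plan.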
File transfer and messages
Streaming messages and file transfers are pipelined over the same persistent TCP connection.
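To make the idea of sharing one connection concrete, here is a minimal sketch that writes control messages and file content back to back onto a single byte stream and reads them back in order. The framing (a type byte, a length, then the payload) and every name in it are invented for illustration; this is not Cassandra's wire format, and a ByteArrayOutputStream stands in for the TCP socket.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical framing invented for this sketch; it is not Cassandra's wire format.
class PipelineSketch {
    static final byte CONTROL = 0; // stands in for messages such as Prepare or Complete
    static final byte FILE = 1;    // stands in for a File message carrying content

    // Writes one frame: type byte, payload length, payload bytes.
    static void writeFrame(DataOutputStream out, byte type, String payload) throws IOException {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.writeByte(type);
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    // Reads frames back in the order they were written.
    static List<String> readFrames(byte[] wire) throws IOException {
        List<String> frames = new ArrayList<>();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        while (in.available() > 0) {
            byte type = in.readByte();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            String label = (type == FILE) ? "FILE" : "CONTROL";
            frames.add(label + ":" + new String(payload, StandardCharsets.UTF_8));
        }
        return frames;
    }

    // Sends control messages and file content back to back over one stream.
    static List<String> demo() {
        try {
            ByteArrayOutputStream connection = new ByteArrayOutputStream(); // stands in for the socket
            DataOutputStream out = new DataOutputStream(connection);
            writeFrame(out, CONTROL, "prepare");
            writeFrame(out, FILE, "contents of file 1");
            writeFrame(out, FILE, "contents of file 2");
            writeFrame(out, CONTROL, "complete");
            out.flush();
            return readFrames(connection.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every frame announces its own type and length, the sender never has to open a new connection or wait between a message and the next file.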
Stream event support
Event notification is finer grained. With JMX notification support, even external clients can listen for events.
API
Public APIs
StreamPlan
- Builder for a streaming plan (what to transfer, what to request). Internally builds StreamSessions to interact with the other nodes and associates them with a StreamResultFuture, which asynchronously returns the final StreamState.
StreamResultFuture
- Represents the future result of StreamPlan execution. You can attach a StreamEventHandler to track the progress of the streaming plan.
StreamState
- State of streaming execution. You can get a snapshot of in-progress streaming from StreamResultFuture#getCurrentState, or the final state as the return value of StreamResultFuture#get.
StreamManager
- Manages all streaming progress
- Provides various metrics through JMX, including notifications
StreamEventHandler
- Listens on various stream events.
Basic API usage is as follows:
    // Start building your streaming plan
    // (remoteTargets, ranges and sstables are assumed to be defined by the caller)
    StreamPlan bulkloadPlan = new StreamPlan("Bulkload");
    // Add a transfer-files task for each destination
    for (InetAddress remote : remoteTargets)
        bulkloadPlan.transferFiles(remote, ranges, sstables);
    // Execute your plan
    StreamResultFuture result = bulkloadPlan.execute();
    try {
        // ... and wait for streaming to complete
        result.get();
        // all streams succeeded
    } catch (Exception e) {
        // some stream failed
    }
Alternatively, StreamResultFuture implements Guava's ListenableFuture<StreamState>, so you can use a FutureCallback<StreamState> to capture stream success and failure.
    Futures.addCallback(result, new FutureCallback<StreamState>() {
        public void onSuccess(StreamState result) {
            // Yes, we did it!
        }

        public void onFailure(Throwable t) {
            // O_o something went wrong
        }
    });
You can add an event listener to StreamResultFuture for stream events:
    StreamResultFuture result = bulkloadPlan.execute();
    result.addEventListener(new StreamEventHandler() {
        public void handleEvent(StreamEvent event) {
            // called for each stream event (session prepared, file progress, session complete)
        }
    });
Internal APIs
StreamSession
- Group of stream tasks (INs and/or OUTs) per *destination*
StreamTask
- Represents each IN/OUT stream task
- Each task MUST belong to one Stream session
StreamReceiveTask
- The execute method sends a stream request to the destination and waits for the reply.
StreamTransferTask
ConnectionHandler
- Receives/sends streaming messages.
Stream session
A stream session handles the streaming of one or more SSTables to and from a specific remote node. Both this node and the remote one create a similar, symmetrical StreamSession. A streaming session has the following life cycle:
1. Connections Initialization
(a) A node (the initiator in the following) creates a new StreamSession, initializes it, and then starts it. Starting creates a ConnectionHandler that opens two connections to the remote node (the follower in the following) with which to stream, and sends a StreamInit message. For the initiator, the first connection is the incoming connection and the second is the outgoing one.
(b) Upon reception of the StreamInit message, the follower creates its own StreamSession, initializes it if it does not exist yet, and attaches the connecting socket to its ConnectionHandler.
(c) When both the incoming and outgoing connections are established, StreamSession starts the streaming preparation phase.
2. Streaming preparation phase
(a) The initiator sends a Prepare message that includes what files/sections it will stream to the follower and what the follower needs to stream back. If the initiator has nothing to receive from the follower, it goes directly to the streaming phase. Otherwise, it waits for the follower's Prepare message.
(b) Upon reception of the Prepare message, the follower records which files/sections it will receive and sends back its own Prepare message with a summary of the files/sections that will be sent to the initiator. After sending that message, the follower goes to the streaming phase.
(c) When the initiator receives the follower's Prepare message, it records which files/sections it will receive and then goes to the streaming phase.
3. Streaming phase
(a) The sender sequentially sends a File message for each file. Each File message consists of a File message header that indicates which file is coming, followed by the streamed content of that file. When all files are sent, the transfer task is marked as complete.
(b) On the receiving side, an SSTable is written for the incoming file. Once the File message is fully received, the file is marked as complete and the receiver sends back a Received message. Once all files are received, they are added to the ColumnFamilyStore, secondary indexes are built, and the receive task is marked as complete.
(c) If an I/O error occurs during streaming, the receiving node sends a Retry message for the file (up to max_streaming_retries, default 3). On receiving a Retry message, the sender simply queues a new File message for that file.
(d) When all transfer and receive tasks for the session are complete, the session moves to the completion phase.
4. Completion phase
(a) When a node has finished all of its transfer and receive tasks, it sends a Complete message. The stream session is considered complete once the node has both sent its Complete message and received a Complete message from the other side.
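The life cycle above can be read as a small state machine for one side of a session. The sketch below is a simplified model under stated assumptions, not Cassandra's StreamSession: the class, the phase names, and the callback methods are all hypothetical, tasks are reduced to a counter, and retries and failures are left out. What it does capture is that preparation starts only once both connections exist, and that the session completes only after a Complete message has been both sent and received, in either order.

```java
// Hypothetical model of one side of a stream session; not Cassandra's StreamSession.
class SessionLifecycleSketch {
    enum Phase { INITIALIZING, PREPARING, STREAMING, COMPLETE }

    private Phase phase = Phase.INITIALIZING;
    private boolean incomingConnected;
    private boolean outgoingConnected;
    private int pendingTasks;          // transfer and receive tasks not yet finished
    private boolean completeSent;
    private boolean completeReceived;

    Phase phase() {
        return phase;
    }

    // Phase 1 (c): preparation starts once both connections are established.
    void onConnectionEstablished(boolean incoming) {
        if (incoming) {
            incomingConnected = true;
        } else {
            outgoingConnected = true;
        }
        if (phase == Phase.INITIALIZING && incomingConnected && outgoingConnected) {
            phase = Phase.PREPARING;
        }
    }

    // Phase 2: after the Prepare exchange, the task counts are known and streaming starts.
    void onPrepared(int transferTasks, int receiveTasks) {
        requirePhase(Phase.PREPARING);
        pendingTasks = transferTasks + receiveTasks;
        phase = Phase.STREAMING;
        maybeSendComplete();
    }

    // Phase 3 (d): when the last task finishes, this side sends its Complete message.
    void onTaskFinished() {
        requirePhase(Phase.STREAMING);
        if (pendingTasks == 0) {
            throw new IllegalStateException("no task is pending");
        }
        pendingTasks--;
        maybeSendComplete();
    }

    // Phase 4: the peer's Complete message may arrive before or after ours is sent.
    void onCompleteReceived() {
        completeReceived = true;
        maybeFinish();
    }

    private void maybeSendComplete() {
        if (pendingTasks == 0 && !completeSent) {
            completeSent = true; // a real session would write a Complete message here
            maybeFinish();
        }
    }

    private void maybeFinish() {
        if (completeSent && completeReceived) {
            phase = Phase.COMPLETE;
        }
    }

    private void requirePhase(Phase expected) {
        if (phase != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + phase);
        }
    }
}
```

Tracking "sent" and "received" as two separate flags keeps the completion rule symmetric, so the same logic serves both the initiator and the follower.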
Events
StreamResultFuture emits a StreamEvent in the following cases:
- Stream session prepared (SESSION_PREPARED)
Fired when a stream session has finished preparing to receive/send files, to tell the event handler the number of files and total bytes to be received/sent.
- Stream session complete (SESSION_COMPLETE)
Fired when a session completes.
- Stream progress (FILE_PROGRESS)
Fired to report progress while a file is being received/sent.
To listen to StreamEvent, implement StreamEventHandler and register the handler with StreamResultFuture.
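As an illustration of what a handler might do with the three event types listed above, the sketch below tracks overall progress. It uses simplified stand-ins rather than the real StreamEvent and StreamEventHandler: EventSketch, EventHandlerSketch, ProgressTracker and the fileName/bytes fields are assumptions made so the example is self-contained. Only the three event type names come from this page.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for StreamEvent; the fields are assumptions made for this sketch.
class EventSketch {
    enum Type { SESSION_PREPARED, SESSION_COMPLETE, FILE_PROGRESS }

    final Type type;
    final String fileName; // only meaningful for FILE_PROGRESS
    final long bytes;      // total bytes for SESSION_PREPARED, bytes so far for FILE_PROGRESS

    EventSketch(Type type, String fileName, long bytes) {
        this.type = type;
        this.fileName = fileName;
        this.bytes = bytes;
    }
}

// Simplified stand-in for StreamEventHandler.
interface EventHandlerSketch {
    void handleEvent(EventSketch event);
}

// Aggregates events into an overall progress figure.
class ProgressTracker implements EventHandlerSketch {
    private long expectedBytes;
    private final Map<String, Long> bytesPerFile = new HashMap<>();
    private boolean complete;

    @Override
    public void handleEvent(EventSketch event) {
        switch (event.type) {
            case SESSION_PREPARED:
                expectedBytes += event.bytes; // sessions announce their totals up front
                break;
            case FILE_PROGRESS:
                bytesPerFile.put(event.fileName, event.bytes); // keep the latest value per file
                break;
            case SESSION_COMPLETE:
                complete = true;
                break;
        }
    }

    long transferredBytes() {
        long total = 0;
        for (long b : bytesPerFile.values()) {
            total += b;
        }
        return total;
    }

    int percentDone() {
        return expectedBytes == 0 ? 0 : (int) (100 * transferredBytes() / expectedBytes);
    }

    boolean isComplete() {
        return complete;
    }
}
```

Storing the latest byte count per file, rather than summing every progress event, avoids double counting when several events arrive for the same file.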
JMX support
JMX support is provided through the StreamingManager
MBean. You can get the list of streaming states of all currently running stream plans. It also provides JMX notification support, so you can subscribe to the stream events above through the JMX interface.
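Subscribing to notifications uses the standard javax.management API. The sketch below shows the general pattern in a single JVM. It registers a stand-in MBean and listens to it, because the real MBean's object name and notification types are not given on this page. The object name "example.streaming:type=DemoStream", the DemoStream class, and the notification type strings are all hypothetical. Against a running node you would instead look up the actual object name and attach the listener through a remote MBeanServerConnection.

```java
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class JmxListenerSketch {
    // Stand-in MBean so the example runs in one JVM; standard MBean interfaces must be public.
    public interface DemoStreamMBean {
        int getActivePlanCount();
    }

    public static class DemoStream extends NotificationBroadcasterSupport implements DemoStreamMBean {
        private long sequence;

        @Override
        public int getActivePlanCount() {
            return 1;
        }

        void fire(String type, String message) {
            sendNotification(new Notification(type, this, ++sequence, message));
        }
    }

    // Registers the stand-in MBean, subscribes a listener, and returns what the listener saw.
    static List<String> run() {
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("example.streaming:type=DemoStream"); // hypothetical name
            DemoStream bean = new DemoStream();
            server.registerMBean(bean, name);
            try {
                NotificationListener listener = (notification, handback) ->
                    seen.add(notification.getType() + ": " + notification.getMessage());
                server.addNotificationListener(name, listener, null, null);
                bean.fire("SESSION_PREPARED", "2 files"); // hypothetical notification types
                bean.fire("SESSION_COMPLETE", "done");
            } finally {
                server.unregisterMBean(name);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

The listener only depends on the Notification type and message, so the same callback can be attached to a remote connection without changes.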