This is a high-level overview of how Flume v0.9.4's end-to-end mode works.

End-to-end ack: acks are sent on sink close (HDFS <= 0.20's sync semantics are not sane)

  • AckChecksumInjector records a mapping from the current rollfile tag to potentially many ack tags.
  • On the close of a rollfile, all rolltags are transmitted to the master so the agent can check for acks.
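The ack framing idea can be sketched as follows. This is a minimal Python sketch: the `frame_batch` name, the string event layout, and the CRC32 checksum are illustrative assumptions, not Flume's actual wire format.

```python
import zlib

ACK_BEGIN = "ACK_BEGIN"
ACK_END = "ACK_END"

def frame_batch(ack_tag, body_events):
    """Wrap a batch with ack begin/end events; the end event carries a
    checksum over the body so the collector can verify integrity."""
    crc = 0
    for e in body_events:
        crc = zlib.crc32(e.encode(), crc)  # stand-in for Flume's checksum
    return (
        [f"{ACK_BEGIN}:{ack_tag}"]
        + list(body_events)
        + [f"{ACK_END}:{ack_tag}:{crc}"]
    )
```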

The write-ahead-logging end-to-end acking agent sink: (each line a thread)

-> roller -> ackInject -> write ahead log (disk) [writeQ->loggedQ]
                       -> write ahead log (cache/disk) [->sendingQ] -> ackRegisterer -> thrift [->sentQ]
                                                                        \-> pendingAckQ -> ackCheckQ [->deleted]
                                                                             \-> Master's Ack Manager

New events are sent to the roller. Each period/batch has a rolltag. The ack injector computes a checksum and wraps the batch with an ack begin and an ack end event. This is written to the write-ahead log. Before the batch is closed, it is in the WRITING state (in the writeQ). When the roll period expires, files are rotated and the batch moves into the LOGGED state (loggedQ).

A separate thread is notified when a new batch enters the LOGGED state. It grabs the batch and moves it into the SENDING state (sendingQ). The batch passes through an ackRegisterer and is sent downstream (currently via thrift). When a batch is completely sent, it is moved to the sentQ and put into the SENT state. Because the batch went through the ackRegisterer, its tag is sent to the ack pending queue; ideally, the batch ends with the proper ack-end checksum.

Yet another thread monitors the ack pending queue. When the acks have reached a certain age, this thread contacts the master to see if the ack has been registered there. If it has, the agent can safely delete the WAL file (transitioning the batch from the SENT state to the DONE state). If not, the ack check is rescheduled for a retry. If the batch isn't acked after a specified period, it is moved back to the LOGGED state and eventually retried and re-registered.
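The decision made by this ack-checking thread can be sketched like so (hypothetical names; a sketch of the logic described above, not Flume's implementation):

```python
def check_pending(pending_tags, master_acks, retry_queue):
    """Tags confirmed by the master are returned for deletion (SENT -> DONE);
    the rest are requeued so the master is asked again later."""
    deletable = []
    for tag in pending_tags:
        if tag in master_acks:
            deletable.append(tag)    # safe to delete the WAL file
        else:
            retry_queue.append(tag)  # recheck on the next pass
    return deletable
```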

If there is a failure, batches can be in any of the queues. Recovery (or imports) takes all the batches, in any of the states, and puts them into the loggedQ in the LOGGED state. This allows the recovered batches to re-register their interest in their ack tags when they are sent downstream.
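Recovery's funnel-everything-back-to-LOGGED behavior can be sketched as follows (illustrative names; the real WAL manager works on files on disk, not in-memory lists):

```python
def recover(queues_by_state):
    """On restart, every batch found in any state is funneled back into
    loggedQ, so it will be resent and will re-register its ack tags."""
    logged_q = []
    for state, batches in queues_by_state.items():
        logged_q.extend(batches)  # WRITING/SENDING/SENT/... all demoted to LOGGED
    return logged_q
```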

Interaction with Retry.

As groups of events are written, they have an associated tag and timestamp. If flume.agent.logdir.retransmit milliseconds is exceeded, a retry call is made on the event group's tag. This triggers a state transition of the data.
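The timeout check amounts to a simple age comparison; here is a sketch (the `needs_retry` name is hypothetical, and only the property name flume.agent.logdir.retransmit comes from the text):

```python
import time

def needs_retry(group_timestamp_ms, retransmit_ms, now_ms=None):
    """True once an event group is older than the retransmit timeout
    (flume.agent.logdir.retransmit)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms - group_timestamp_ms > retransmit_ms
```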

This is updated as of FLUME-746 – should be in 0.9.5

State when attempting to retry | State after retry
------------------------------ | ------------------------------------------------------------
IMPORT                         | Unexpected condition
WRITING                        | Unexpected condition
LOGGED                         | Do nothing; already queued for (re-)sending
SENDING                        | Do nothing; sending already in progress
SENT                           | Demote sent data to LOGGED so it will eventually be resent
DONE                           | Do nothing; data is acked, no need to resend
ERROR                          | Do nothing; already in a bad state, keep it there
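The retry handling described above can be expressed as a transition function. This is a sketch: the state names follow the surrounding text, and the treatment of states that should never see a retry (a batch still being written, or freshly imported data) as errors is an assumption.

```python
UNEXPECTED = ("IMPORT", "WRITING")

def on_retry(state):
    """Apply the retry transition for one event-group state."""
    if state in UNEXPECTED:
        raise ValueError(f"unexpected state for retry: {state}")
    if state == "SENT":
        return "LOGGED"  # demote so the batch is eventually resent
    return state         # LOGGED, SENDING, DONE, ERROR stay put
```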

The collector side.

ackChecker -> roller -> dfsClosedNotifier /-> dfswriter -> hdfs.
  \-> rolltag acktag map --------------\-> ackDone -> Master's Ack Manager

Events are received by a collectorSource (thriftSource). These events pass through an ackChecker, which consumes ack begin and ack end messages and calculates the checksum of the messages between them. If the checksum is good, the acktag is registered in a multimap that associates a rolltag with a set of acktags. There can be many acktags associated with a rolltag (but only one rolltag associated with each acktag). An ack batch may start in one roll group, but it is associated with the roll group in which the ack end event arrives. Rolltags and acktags are independent of each other.
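The checksum verification step can be sketched as follows, assuming an illustrative framing where the begin event is "ACK_BEGIN:&lt;tag&gt;" and the end event is "ACK_END:&lt;tag&gt;:&lt;crc&gt;" (not Flume's real wire format):

```python
import zlib

def check_batch(events):
    """Verify a framed batch; return the acktag if the checksum over the
    body events matches the one carried in the ack end event, else None."""
    begin, body, end = events[0], events[1:-1], events[-1]
    tag = begin.split(":", 1)[1]
    crc = 0
    for e in body:
        crc = zlib.crc32(e.encode(), crc)
    _, end_tag, end_crc = end.split(":", 2)
    if end_tag == tag and int(end_crc) == crc:
        return tag
    return None
```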

Payload events go to a roller that is responsible for rolling events into different files on HDFS. When a particular HDFS file is closed, the multimap is checked and any acktags associated with that roll file are sent to the master. Since the HDFS close has happened, we know the data is safe. Thus, when the ack eventually arrives at the agent, it can delete its log.
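The collector-side bookkeeping can be sketched like this (the class and method names are illustrative, not Flume's API): acktags accumulate under the rolltag of the roll file they complete in, and closing the file releases them to the master.

```python
from collections import defaultdict

class RollAckMap:
    """Multimap from a rolltag to the set of acktags completed in that roll."""

    def __init__(self):
        self.acks_by_rolltag = defaultdict(set)

    def register_ack(self, rolltag, acktag):
        self.acks_by_rolltag[rolltag].add(acktag)

    def on_file_closed(self, rolltag):
        # The HDFS file is safely closed: these acks may go to the master.
        return self.acks_by_rolltag.pop(rolltag, set())
```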

Interaction with logical nodes

In the initial version there was one directory for the logs associated with the node. With the addition of logical nodes, we isolate the wal/dfo of the different nodes by having them write to different subdirectories within the logging dir.

So if the logDir base is /tmp/foo, and there are two logical nodes bar and baz, the WAL logs should live in

And the DFO stuff should be in

For the default logical node that corresponds to a physical node, it should be

Caveats: we cannot have more than one WAL/DFO in a single logical node; this will cause a conflict. There is currently no mechanism that prevents this from happening.
