NiFi employs a Write-Ahead Log to keep track of changes to FlowFiles (i.e., data records) as they flow through the system. The log records changes to the FlowFiles themselves, such as their attributes (the key/value pairs that make up a FlowFile's metadata), as well as changes to their state, such as which Connection/Queue a FlowFile belongs in.

Here, we describe the implementation details and the algorithm used to provide this capability. Currently, these are simply notes that I created to help explain the code to someone else, and they serve well to augment the code. On their own, they may not provide a complete, holistic explanation of the class. This should eventually be turned into an architectural paper or at least a blog post. For now, though, I am posting the notes that I have so that they can help others understand the logic as well.

Definition of Terms

  • SerDe: interface to Serialize/Deserialize records and updates to records
  • TransactionID Generator: an AtomicLong that supplies the Transaction ID written to the edit log for each Transaction and to the snapshot
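
To make the two terms concrete, here is a minimal sketch of how they might fit together. The `SerDe` interface below is a simplified, hypothetical stand-in (the real interface has more methods), and `StringSerDe`, `writeTransaction`, and the byte layout are assumptions made only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

class SerDeSketch {
    /** Hypothetical, simplified SerDe: serializes and deserializes one record type. */
    interface SerDe<T> {
        void serializeRecord(T record, DataOutputStream out) throws IOException;
        T deserializeRecord(DataInputStream in, int version) throws IOException;
        int getVersion();
    }

    /** Illustrative implementation that treats a record as a plain String. */
    static class StringSerDe implements SerDe<String> {
        @Override
        public void serializeRecord(String record, DataOutputStream out) throws IOException {
            out.writeUTF(record);
        }

        @Override
        public String deserializeRecord(DataInputStream in, int version) throws IOException {
            // The version is passed in so that older layouts could still be read.
            return in.readUTF();
        }

        @Override
        public int getVersion() {
            return 1;
        }
    }

    /** TransactionID Generator: every transaction takes the next value. */
    static final AtomicLong transactionIdGenerator = new AtomicLong(0L);

    /** Writes a Transaction ID followed by one serialized record (assumed layout). */
    static byte[] writeTransaction(SerDe<String> serde, String record) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(transactionIdGenerator.getAndIncrement());
            serde.serializeRecord(record, out);
        }
        return bytes.toByteArray();
    }
}
```

Passing the version to `deserializeRecord` is what allows data written by an older SerDe implementation to be read back later.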

Writing to the Write-Ahead Log

  1. Verify records have been restored ('restored flag' set to true). If not, throw IllegalStateException
  2. Obtain repo shared lock (read lock)
  3. Claim a partition that is not currently in use
    1. Increment an AtomicLong and mod by the number of partitions -> partitionIndex
    2. Try to claim (obtain a write lock on) partition[partitionIndex]. If not successful, go back to 3.1.
  4. If no output stream exists for edit log, create output stream and write SerDe class name and version
  5. Obtain Transaction ID (increment AtomicLong) and write to edit log
  6. Write update to partition
    1. Serialize update to record
    2. If more records remain, write TransactionContinue marker and go to 6.1; otherwise go to 6.3
    3. Write TransactionCommit marker
  7. Update global record map to hold most current version of record
  8. Release claim on partition
  9. Release shared lock
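
The write path above can be sketched roughly as follows. This is not the actual NiFi class: `WalWriteSketch`, the marker values, the String-to-String record map, and the in-memory byte buffers standing in for edit log files are all assumptions, and the SerDe header from step 4 is left out to keep the example short.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WalWriteSketch {
    static final int TRANSACTION_CONTINUE = 1; // hypothetical marker values
    static final int TRANSACTION_COMMIT = 2;

    static class Partition {
        final ReentrantLock lock = new ReentrantLock();
        // Stands in for the partition's edit log file.
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        final DataOutputStream out = new DataOutputStream(bytes);
    }

    final Partition[] partitions;
    final ReentrantReadWriteLock repoLock = new ReentrantReadWriteLock();
    final AtomicLong partitionCounter = new AtomicLong(0L);
    final AtomicLong transactionIdGenerator = new AtomicLong(0L);
    final Map<String, String> recordMap = new ConcurrentHashMap<>();
    volatile boolean restored = false;

    WalWriteSketch(int partitionCount) {
        partitions = new Partition[partitionCount];
        for (int i = 0; i < partitionCount; i++) {
            partitions[i] = new Partition();
        }
    }

    /** Writes one transaction (record id -> new value) and returns its Transaction ID. */
    long update(Map<String, String> records) throws IOException {
        if (!restored) {
            throw new IllegalStateException("Records have not been restored");
        }
        if (records.isEmpty()) {
            throw new IllegalArgumentException("A transaction needs at least one record");
        }
        repoLock.readLock().lock(); // shared lock: many writers, but no checkpoint
        try {
            Partition partition = claimPartition();
            try {
                long transactionId = transactionIdGenerator.getAndIncrement();
                partition.out.writeLong(transactionId);
                int remaining = records.size();
                for (Map.Entry<String, String> entry : records.entrySet()) {
                    partition.out.writeUTF(entry.getKey());
                    partition.out.writeUTF(entry.getValue());
                    remaining--;
                    partition.out.write(remaining > 0 ? TRANSACTION_CONTINUE : TRANSACTION_COMMIT);
                }
                partition.out.flush();
                recordMap.putAll(records); // global map holds the most current version
                return transactionId;
            } finally {
                partition.lock.unlock(); // release claim on partition
            }
        } finally {
            repoLock.readLock().unlock();
        }
    }

    /** Round-robin over the partitions until one can be locked without waiting. */
    private Partition claimPartition() {
        while (true) {
            int index = (int) Math.floorMod(partitionCounter.getAndIncrement(), (long) partitions.length);
            if (partitions[index].lock.tryLock()) {
                return partitions[index];
            }
        }
    }
}
```

Using `tryLock` rather than `lock` means a writer never waits behind a busy partition; it simply moves on to the next index.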

Checkpointing the Write-Ahead Log

  1. Obtain mutually exclusive lock (write lock) so that no partition can be updated
  2. Create .partial file
  3. Write SerDe class name and version
  4. Write current max Transaction ID
  5. Write number of records in global record map
  6. For each record, serialize record
  7. Close output stream for .partial file
  8. Delete current 'snapshot' file
  9. Rename .partial file to 'snapshot'
  10. Clear all Partitions/Edit Logs: For each partition:
    1. Close output stream to file
    2. Create new output stream to file, indicating Truncate, rather than append.
    3. Write SerDe class name and version
  11. Release write lock
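
As a rough illustration of the checkpoint steps, the sketch below writes the snapshot through a .partial file and then truncates the edit logs. The file names `snapshot` and `snapshot.partial`, the SerDe class name, and the record layout are assumptions for the example. The caller is assumed to already hold the write lock, and the edit log streams are closed after writing the header, whereas a real implementation would likely keep them open for later writes.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;

class CheckpointSketch {
    static final String SERDE_CLASS_NAME = "example.StringSerDe"; // hypothetical name
    static final int SERDE_VERSION = 1;

    /** Assumes the caller holds the repository write lock. */
    static void checkpoint(Path dir, long maxTransactionId, Map<String, String> recordMap,
                           List<Path> editLogs) throws IOException {
        Path partial = dir.resolve("snapshot.partial");
        Path snapshot = dir.resolve("snapshot");

        // Steps 2-7: write the complete state to the .partial file, then close it.
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(partial))) {
            out.writeUTF(SERDE_CLASS_NAME);
            out.writeInt(SERDE_VERSION);
            out.writeLong(maxTransactionId);
            out.writeInt(recordMap.size());
            for (Map.Entry<String, String> entry : recordMap.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        }

        // Steps 8-9: swap the new snapshot in for the old one.
        Files.deleteIfExists(snapshot);
        Files.move(partial, snapshot);

        // Step 10: truncate each edit log and write a fresh header.
        for (Path editLog : editLogs) {
            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(
                    editLog, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))) {
                out.writeUTF(SERDE_CLASS_NAME);
                out.writeInt(SERDE_VERSION);
            }
        }
    }
}
```

Because the old snapshot is deleted only after the .partial file is fully written and closed, a crash at any point leaves at least one complete file on disk.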

Restoring from the Write-Ahead Log

  1. Obtain mutually exclusive lock (write lock) so that no partition can be updated
  2. Restore from snapshot
    1. Examine snapshot and .partial file
      1. If neither file exists, no snapshot to restore from. Move to 4.
      2. If only snapshot file exists, then we did not go down while creating a snapshot. Restore from the existing snapshot.
      3. If .partial file exists and snapshot exists, assume crash while creating snapshot. Delete .partial file.
      4. If only .partial file exists, we went down after creating .partial file and deleting snapshot but before renaming .partial to snapshot. Rename .partial to snapshot.
    2. Open InputStream to snapshot file
    3. Read SerDe class name and version
    4. Read max Transaction ID
    5. Read number of records in snapshot
    6. For each record in snapshot, deserialize record and update global record map
    7. Update TransactionID Generator (atomic long) by setting to the max Transaction ID read from snapshot + 1
  3. For each Partition:
    1. read Transaction ID.
    2. If EOF, done restoring partition.
    3. If Transaction ID is less than value of TransactionID Generator, read the data for this transaction and discard. Go to 3.1.
  4. Determine which Partition read the smallest Transaction ID that is greater than or equal to TransactionID Generator.
  5. Restore Transaction from Partition (call SerDe#deserializeRecord, including the Version of the SerDe that was used to write the file. This way, if implementation changes, we can still restore the data).
  6. Check if restore was successful
    1. If successful, update global record map to reflect new state of records that were restored.
      Update TransactionID Generator to 1 + TransactionID of Transaction that was restored in step 5. Read next Transaction ID from edit log.
    2. If not successful (Unexpected EOF), discard transaction and note that EOF was encountered.
  7. Repeat 4-6 until all Partitions have been restored.
  8. If any Partition indicated an Unexpected EOF, we cannot write to that partition until this has been corrected, so perform Checkpoint before
    allowing any updates. This will cause the Edit Logs to be deleted. If unable to checkpoint, throw IOException, indicating that Restoration failed,
    and ensure that the write lock is released.
  9. For each partition, open output stream for append.
  10. Set 'restored' flag to true
  11. Release write lock
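
Two parts of the restore procedure are easier to follow in code: choosing which snapshot file to trust (step 2.1) and replaying transactions from all partitions in Transaction ID order (steps 3-7). The sketch below is only an illustration under some assumptions: the file names `snapshot` and `snapshot.partial`, the `Transaction` class, and the use of in-memory queues in place of edit log files are all hypothetical, and Unexpected EOF handling is omitted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Deque;
import java.util.List;
import java.util.Map;

class RestoreSketch {
    /** One committed transaction read from an edit log (hypothetical shape). */
    static class Transaction {
        final long id;
        final Map<String, String> updates;

        Transaction(long id, Map<String, String> updates) {
            this.id = id;
            this.updates = updates;
        }
    }

    /** Step 2.1: returns the snapshot to restore from, or null if there is none. */
    static Path resolveSnapshot(Path dir) throws IOException {
        Path snapshot = dir.resolve("snapshot");
        Path partial = dir.resolve("snapshot.partial");
        boolean hasSnapshot = Files.exists(snapshot);
        boolean hasPartial = Files.exists(partial);
        if (!hasSnapshot && !hasPartial) {
            return null; // nothing to restore from
        }
        if (hasSnapshot && hasPartial) {
            Files.delete(partial); // crashed while writing .partial; it may be incomplete
        } else if (hasPartial) {
            Files.move(partial, snapshot); // crashed between delete and rename
        }
        return snapshot;
    }

    /**
     * Steps 3-7: applies transactions from all partitions in Transaction ID order.
     * Each Deque holds one partition's transactions in the order they were written.
     * Returns the value the TransactionID Generator should hold afterwards.
     */
    static long replay(List<Deque<Transaction>> partitions, Map<String, String> recordMap,
                       long nextTransactionId) {
        // Step 3: discard transactions that the snapshot already covers.
        for (Deque<Transaction> partition : partitions) {
            while (!partition.isEmpty() && partition.peekFirst().id < nextTransactionId) {
                partition.removeFirst();
            }
        }
        while (true) {
            // Step 4: find the partition whose next transaction has the smallest ID.
            Deque<Transaction> next = null;
            for (Deque<Transaction> partition : partitions) {
                if (!partition.isEmpty()
                        && (next == null || partition.peekFirst().id < next.peekFirst().id)) {
                    next = partition;
                }
            }
            if (next == null) {
                return nextTransactionId; // all partitions are exhausted
            }
            // Steps 5-6: apply the transaction and advance the generator.
            Transaction transaction = next.removeFirst();
            recordMap.putAll(transaction.updates);
            nextTransactionId = transaction.id + 1;
        }
    }
}
```

Replaying in Transaction ID order matters because updates to the same record may be spread across partitions; applying them out of order could leave an older version in the global record map.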

 
