One of the novel features of ApacheTM NiFiTM is its notion of Data Provenance, also known as Data Lineage or Chain of Custody. The term Data Provenance is used most often, because it can be thought of as a superset of Data Lineage. While lineage encapsulates the idea of how data was derived, Data Provenance also encapsulates the state of that data at each stage along the way.
The default implementation of the Data Provenance Repository is the PersistentProvenanceRepository class. At this time, there is no well-written documentation on how it all works, but we do have some notes that I have jotted down while explaining it to someone else. These should be formalized into some sort of paper or blog post at least. For now, though, here are the notes on how it works.
Design of Persistent Provenance Repository
- Very high throughput for updates
- Compressible to store as many events as possible
- Ability to query in real-time
- Embedded (NiFi is designed to work in many different environments; should depend on external service)
- Ability to retrieve records quickly in insertion order
- Ability to destroy or "age off" data as it reaches a certain age or as repository reaches storage limits
- To support high throughput, allow multiple "containers" to be specified. Allows writing to multiple disk partitions simultaneously for higher throughput.
- Each container supports multiple 'journals'.
- Allow multiple threads to update repository at the same time. When we update the repository, we round-robin between partitions.
- We have multiple journals per container because we are serializing the data inline. If we write to only a single journal per disk partition, then we do not utilize the disk well, as the serialization from object to bytes is fairly expensive.
- We encode the data ourselves. This allows us to change the schema as we need but also allows us to avoid the expense of converting the Provenance Event into an intermediate data structure, such as an Avro Record, just so that it can then be serialized to disk, and then doing the same thing when we deserialize.
- After some configurable period of time (30 seconds, by default), we take all journals and merge them into a single Provenance Event Log File. As this is occurring, we 'roll over' the journals so that other threads can update the repository at the same time.
- When we roll over the journals, we compress and index the data.
- We do not compress data as it is written, as doing so would slow down throughput.
- If we shutdown or lose power while writing, if we are writing to a compressed file, data may not be recoverable.
- We do not index data as it is written, as doing so would slow down throughput.
- As we compress the data, we keep track of the "compression block index." We write 1 MB of data to a GZIP Stream, and then we increment the compression block index. At the same time, we keep a mapping in a ".toc" (Table-of-Contents) file of compression block index to "compression block offset". This offset is the offset into the file where this block of events starts. This way, when we index the events, we are able to index the relevant fields as well as a pointer to the data. The pointer to the data is the Provenance Event Log File that the data is stored in, the ID of the event, and the compression block offset. As a result, if we have a Provenance Event Log File that is, say 1 GB when compressed, and we want a specific record from it, we can simply seek to the block offset (say 980,028,872) and then wrap the FileInputStream with a GZIPInputStream and start reading from there. We will have to read at most 1 MB of (decompressed) data. This allows us to access these records extremely quickly.
- After each record is written, it is then placed on a queue along with a pointer to the data. A separate thread will then pull this information from a queue and index the data in Lucene.
We do this so that we can allow multiple threads to index the data at once, as the indexing is very computationally expensive, and is actually the bottleneck of NiFi when processing
extremely large volumes of data records.
- When all data has been written to the merged Provenance Event Log File, compressed, and indexed, we delete the original journal files.
- As we index the data in Lucene, we "shard" the Lucene indices so that they do not grow beyond some configurable amount of space (default of 500 MB).
- Lucene stores the document ID is a 32-bit integer, rather than a 64-bit integer. As a result, it can contain a max of ~2 billion records.
- We are able to stripe the indexes themselves across the multiple disk partitions.
- When several threads are updating a particular index, access to that index is quite slow. This allows us to avoid touching that index if we don't need to.
- Filename of the directory where the Lucene index is stored is the timestamp of when the index is created. This allows us to know exactly which indices have to be searched when
querying for data over some specified time range.
Recovering After Restart
- We look for any journal files. If there is a matching Provenance Event Log File (correlation is based on filenames), then we know we were in the processof indexing and merging the journal files when we restarted, so we need to finish that job. We can't easily know where we left off, so we simply delete the Provenance Event Log File and delete any records in the indices for that event file. We then begin the process over for those journals.
- We determine the largest Event ID for any event in any journal, or the largest event ID for any Provenance Event Log File, if there are no journals.
- We then set our ID Generator to one greater than this value. This ensure that all of the events will always have a unique one-up number associated with them. This is important, so that when we have the Block Offset and Event ID we know which event we are looking for. Also allows us to easily access events sequentially.
Retrieving Events Sequentially
- The original implementation of the Provenance Event Repository was designed to simply store events and allow them to be retrieved by (sequential) ID later, so that the events could be published elsewhere.
- Because we merge the journals into a single Provenance Event Log File when we 'rollover', we are able to write the events sequentially.
- Provenance Event Log Files are named such that the filename mirrors the Event ID of the first event in the file.
- This means that we can request a specific Event ID and know exactly which file it is in, as we find the file whose name is largest without going over that Event ID.
- We then determine the Compression Block Offset needed for that Event ID. This is determined by looking at the Table-of-Contents File mentioned above.
- At this point, we know exactly which file contains the event and where in the file to seek to. We seek to this position, open a GZIPInputStream, and start reading.
- The API allows a developer to request a specific Event ID to start at and the number of events to return. This design allows us to read sequentially and return those events to the caller.
- In order to avoid running out of storage space, we have to eventually age this data off.
- User is able to specify size limits for storage capacity, as well as time limits.
- A background thread runs periodically, checking of the storage capacity has been reached. If so, it determines which data should be destroyed and mark it for destruction.
- Determination is based on destroying the oldest data first. We can easily determine which data is oldest based on the filename, since that represents an ever-increasing one-up number.
- When a file is marked for destruction, the size of the file is retained, so we mark as many files for destruction as needed in order to fall below a threshold of 90% of the max capacity.
- If the storage capacity has not been reached, we check if any Provenance Event Log File is older than the configured max time limit. If so, we mark it for destruction.
- We then delete any file that has been marked for destruction.
- After deleting the file, we update the indices to delete any event that points to that Provenance Event Log File.
- This can be improved in the future, as deleting from the Lucene Index is quite expensive. We can avoid ever deleting from a Lucene Index. Instead, we wait until we have destroyed all Provenance Event Files that are referenced by that Index (remember the indices are sharded so that an index only reaches 500 MB (by default) and then a new one is created) Once all event files referenced by an index have been destroyed, we simply delete the entire index from disk. If an index references a Provenance Event Log File that doesn't exist, we simply ignore the event when querying. This "wastes" up to 500 MB per partition but will result in better performance.