Data storage in Ignite 2.x is tightly coupled to the off-heap implementations called Page Memory, which are used in both in-memory and persistent modes. These implementations work well for some workloads but are not optimal for others: in persistent mode Page Memory suffers from write amplification, and both implementations use a B+Tree as the primary key index, which has limitations compared to a hash index.

In Ignite 3 we want to provide more flexibility for end users to choose the best storage implementation for their use cases. To achieve this we need to define a minimal API and hide all implementation details about particular storage implementations behind it.


Data Storage API

As a starting point I suggest defining the following interfaces (of course they will evolve as other components start integrating with them):

Storage API
/** Interface providing methods to read, remove and update keys in storage. */
public interface Storage {
	/** Reads a DataRow for a given Key. */
	public DataRow read(Key key);

	/** Removes DataRow associated with a given Key. */
	public void remove(Key key);

	/** Executes an update with custom logic implemented by UpdateClosure interface. */
	public void update(Key key, UpdateClosure clo);

	/** Obtains Iterator over some DataRows in storage. */
	public Iterator<DataRow> iterator(/* parameters */);
}

The proposed API is based on the IgniteCacheOffheapManager interface from Ignite 2.x, significantly simplified and stripped of Ignite 2.x concepts such as partitions, caches and data stores.
However, it keeps useful ideas such as KeyCacheObject (in the form of the Key class, whose shape and responsibilities are still subject to discussion and clarification), CacheDataRow (in the form of DataRow) and InvokeClosure, which is needed to implement the read-modify-write pattern efficiently (in the form of UpdateClosure).

For now, this interface looks sufficient for the needs of existing components.
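To make the read-modify-write idea more concrete, here is a minimal heap-based sketch of the Storage contract. The proposal leaves the shapes of Key, DataRow and UpdateClosure open, so SketchKey, SketchRow and SketchUpdateClosure below are hypothetical stand-ins, and the rule "returning null from the closure removes the row" is an assumption of this sketch rather than part of the proposed API.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical key: a wrapper with value-based equality. */
final class SketchKey {
    final String value;

    SketchKey(String value) { this.value = value; }

    @Override public boolean equals(Object o) {
        return o instanceof SketchKey && ((SketchKey) o).value.equals(value);
    }

    @Override public int hashCode() { return value.hashCode(); }
}

/** Hypothetical row: the key plus a single counter column. */
final class SketchRow {
    final SketchKey key;
    final long counter;

    SketchRow(SketchKey key, long counter) { this.key = key; this.counter = counter; }
}

/** Hypothetical closure shape: gets the current row (or null), returns the new row (or null to remove). */
interface SketchUpdateClosure {
    SketchRow apply(SketchRow current);
}

/** Heap-based sketch of the proposed Storage contract. */
class HeapStorageSketch {
    private final ConcurrentMap<SketchKey, SketchRow> rows = new ConcurrentHashMap<>();

    SketchRow read(SketchKey key) { return rows.get(key); }

    void remove(SketchKey key) { rows.remove(key); }

    /** Runs the closure atomically for the key, so callers need no separate read and write. */
    void update(SketchKey key, SketchUpdateClosure clo) {
        rows.compute(key, (k, current) -> clo.apply(current));
    }

    Iterator<SketchRow> iterator() { return rows.values().iterator(); }
}
```

With this shape, an increment is a single call such as `storage.update(key, cur -> new SketchRow(key, cur == null ? 1 : cur.counter + 1))`, and the storage implementation decides how to make it atomic.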

The implementation of transactions needs a LockManager that can lock and unlock particular keys in Storage. It may look as simple as the following, but it will be refined once the transaction protocol is defined:

LockManager API
/** Interface for obtaining locks on Keys in Storage. */
public interface LockManager {
	/** Locks given Key. */
	public void lock(Key key);

	/** Unlocks given Key. */
	public void unlock(Key key);
}
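One possible shape of such a LockManager is sketched below, assuming thread-owned exclusive locks and using String as a stand-in for Key. Both choices are assumptions of this sketch: the transaction protocol may well require locks owned by transactions rather than threads, and the helper isLocked is not part of the proposed interface.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a per-key lock manager; String stands in for the Key class. */
class KeyLockManagerSketch {
    /** One lock per key; entries are never removed here, a real implementation would need cleanup. */
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Blocks until the calling thread owns the lock for the key. */
    void lock(String key) {
        locks.computeIfAbsent(key, k -> new ReentrantLock()).lock();
    }

    /** Releases the lock; fails if the calling thread does not hold it. */
    void unlock(String key) {
        ReentrantLock lock = locks.get(key);

        if (lock == null || !lock.isHeldByCurrentThread())
            throw new IllegalStateException("Key is not locked by the current thread: " + key);

        lock.unlock();
    }

    /** Hypothetical helper, used only to observe the state in examples. */
    boolean isLocked(String key) {
        ReentrantLock lock = locks.get(key);

        return lock != null && lock.isLocked();
    }
}
```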

Index Storage API

public interface SortedInternalStore {
    /** Exclude lower bound. */
    byte GREATER = 0;

    /** Include lower bound. */
    byte GREATER_OR_EQUAL = 1;

    /** Exclude upper bound. */
    byte LESS = 0;

    /** Include upper bound. */
    byte LESS_OR_EQUAL = 1 << 1;

    /**
     * Updates a row in the index if needed:
     * <ul>
     *     <li>put operation if the {@code oldR} is {@code null},</li>
     *     <li>remove operation if the {@code newR} is {@code null},</li>
     *     <li>update operation otherwise.</li>
     * </ul>
     */
    void update(Row oldR, Row newR);

    /**
     * Returns rows between the lower and upper bounds.
     * Fills the result rows with the fields specified in the projection set.
     *
     * @param low Lower bound of the scan.
     * @param up Upper bound of the scan.
     * @param scanBoundMask Scan bound mask (specifies how to treat rows equal to the bounds: include or exclude).
     * @param proj Set of column IDs used to fill the result rows.
     */
    Cursor<Row> scan(Row low, Row up, byte scanBoundMask, BitSet proj);
}
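The bound constants are meant to be combined into scanBoundMask: bit 0 controls the lower bound and bit 1 the upper bound. The sketch below shows one way to decode the mask. It uses a TreeSet of integers as a stand-in for indexed rows and omits the projection; the class name and the integer keys are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Sketch: decoding scanBoundMask over a sorted set of integer keys. */
class ScanBoundMaskSketch {
    static final byte GREATER = 0;
    static final byte GREATER_OR_EQUAL = 1;
    static final byte LESS = 0;
    static final byte LESS_OR_EQUAL = 1 << 1;

    private final TreeSet<Integer> index = new TreeSet<>();

    void put(int key) { index.add(key); }

    /** Returns keys between low and up; the mask decides whether each bound is included. */
    List<Integer> scan(int low, int up, byte scanBoundMask) {
        boolean lowInclusive = (scanBoundMask & GREATER_OR_EQUAL) != 0;
        boolean upInclusive = (scanBoundMask & LESS_OR_EQUAL) != 0;

        return new ArrayList<>(index.subSet(low, lowInclusive, up, upInclusive));
    }
}
```

A caller would pass, for example, `(byte) (GREATER_OR_EQUAL | LESS)` to scan the half-open range [low, up).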

Modules structure

As we aim to support more than one storage option, we need to keep the storage API separate from its implementations: one module will contain the API, and each specific implementation will live in its own module.

Possible implementations

PageMemory code base migration (persistent)

Page Memory from Ignite 2.x is a good candidate as a persistent storage implementation in 3.0 although some refactoring and modifications are needed during migration.

Getting rid of Write-Ahead Logging (WAL)

Ignite 2.x uses write-ahead logging to maintain data consistency on recovery. Since Ignite 3.0 uses the Raft protocol, it is proposed to get rid of the WAL (both logical and physical records) using the following ideas.

Replacing logical records with the replication log

Ignite 3.x uses a log-based replication protocol (IEP-61) to ensure data consistency across the nodes of the cluster. The very same log is used as a logical WAL for local state machine recovery, which removes the need to write logical records to a separate WAL.

The Raft log represents a key-value storage in which the key is always a contiguously growing unbounded integer. In practice, it is sufficient to limit the key to a 64-bit unsigned integer. The key range consists of two segments: committed entries (the larger part of the log), which are immutable, and not-yet-committed entries, which can be truncated and overwritten. The non-committed entries are rarely overwritten in practice. Additionally, the Raft log can be compacted: entries with a key smaller than the lower bound are thrown away, as they were made durable in the state machine and are no longer needed.
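The three properties described above (immutable committed prefix, truncatable uncommitted tail, compactable head) can be illustrated with a small in-memory sketch. The class and method names are hypothetical, commands are plain strings, and compaction only checks that the dropped entries are committed; deciding that they are already durable in the state machine is left to the caller.

```java
import java.util.TreeMap;

/** Sketch of the log key space: immutable committed prefix, truncatable tail, compactable head. */
class RaftLogKeySpaceSketch {
    private final TreeMap<Long, String> entries = new TreeMap<>();

    /** Highest committed index; 0 means nothing is committed yet. */
    private long commitIndex = 0;

    private long lastIndex() { return entries.isEmpty() ? commitIndex : entries.lastKey(); }

    /** Appends at the given index, truncating any uncommitted suffix that starts there. */
    void append(long index, String command) {
        if (index <= commitIndex)
            throw new IllegalStateException("Committed entries are immutable: " + index);

        if (index > lastIndex() + 1)
            throw new IllegalArgumentException("Gap in the log at index " + index);

        entries.tailMap(index, true).clear(); // drop the overwritten uncommitted suffix
        entries.put(index, command);
    }

    /** Marks all entries up to the given index as committed. */
    void commit(long index) {
        if (index > lastIndex())
            throw new IllegalArgumentException("Cannot commit a missing entry: " + index);

        commitIndex = Math.max(commitIndex, index);
    }

    /** Drops entries with a key smaller than lowerBound; only committed entries may be dropped. */
    void compact(long lowerBound) {
        if (lowerBound > commitIndex + 1)
            throw new IllegalArgumentException("Cannot compact uncommitted entries");

        entries.headMap(lowerBound, false).clear();
    }

    String get(long index) { return entries.get(index); }

    int size() { return entries.size(); }
}
```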

The replication log storage is based on the WiscKey approach to log-structured storage. Multiple Raft group logs can be stored in a single storage; however, it is not necessary to have a single storage for all Raft groups. The storage and the WAL of the Raft log are combined and use immutable append-only segments containing log indices and state machine commands. The segment header may contain a dictionary to reduce the size of the Raft group IDs written in the segment. Each log entry contains the log index, the Raft group ID and the command payload (the payload itself also contains the log entry term, which is essential to the Raft protocol). Schematically, the segment structure can be represented as follows:

|                |             Entry 1            |             Entry 2            | ... |
| Segment header | Log index | Group ID | Payload | Log index | Group ID | Payload | ... |
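The segment layout above can be sketched with a ByteBuffer standing in for the segment file. The field widths (8-byte log index, 4-byte group ID), the 4-byte payload length prefix and the one-integer header are assumptions of this sketch; the proposal does not fix the binary format, and the group ID dictionary is omitted.

```java
import java.nio.ByteBuffer;

/** Sketch of an append-only segment held in a ByteBuffer; the exact layout is an assumption. */
class LogSegmentSketch {
    /** Hypothetical header: a single magic number. */
    static final int HEADER_SIZE = Integer.BYTES;
    static final int MAGIC = 0x5E6D;

    private final ByteBuffer buf;

    LogSegmentSketch(int capacity) {
        buf = ByteBuffer.allocate(capacity);
        buf.putInt(MAGIC);
    }

    /**
     * Appends an entry at the end of the segment and returns its offset from the segment start.
     * Throws BufferOverflowException when the segment is full.
     */
    int append(long logIndex, int groupId, byte[] payload) {
        int offset = buf.position();

        buf.putLong(logIndex).putInt(groupId).putInt(payload.length).put(payload);

        return offset;
    }

    /** Reads the log index of the entry stored at the given offset. */
    long readLogIndex(int offset) { return buf.getLong(offset); }

    /** Reads the payload of the entry stored at the given offset. */
    byte[] readPayload(int offset) {
        int len = buf.getInt(offset + Long.BYTES + Integer.BYTES);
        byte[] payload = new byte[len];

        ByteBuffer view = buf.duplicate(); // independent position, shared content
        view.position(offset + Long.BYTES + 2 * Integer.BYTES);
        view.get(payload);

        return payload;
    }
}
```

Because entries are only ever appended, the offset returned by append stays valid for the lifetime of the segment, which is what allows a simple array of offsets to serve as the index.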

New entries are always written to the end of the segment, even if the entry overwrites a log index that was already written for the same Raft group. The Raft log maintains an index of log entries for each Raft group, represented as a simple array of entry offsets from the beginning of the file. The array is complemented with the base index of the first entry in the segment. The in-memory index is flushed to disk (checkpointed) only when all entries in the segment are committed. Schematically, the index structure may be represented as follows:

|                |                Group 1 Index                 |                Group 2 Index                 | ... |
|  Index header  | Len | Index Base | Offset 1 | Offset 2 | ... | Len | Index Base | Offset 1 | Offset 2 | ... | ... | 

Upon recovery, the Raft log component reads all log entries from non-checkpointed segments and repopulates the in-memory index which will be later checkpointed.
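A minimal sketch of the in-memory index and of its reconstruction on recovery follows. It assumes that scanning a segment yields entry descriptors in write order, modelled here as `{groupId, logIndex, offset}` arrays, and that overwriting a log index invalidates all later entries of the same group, as in Raft. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the in-memory index: per Raft group, a base log index plus entry offsets. */
class SegmentIndexSketch {
    /** Offsets of one group's entries; offsets.get(i) locates log index base + i. */
    static final class GroupIndex {
        final long base;
        final List<Integer> offsets = new ArrayList<>();

        GroupIndex(long base) { this.base = base; }
    }

    private final Map<Integer, GroupIndex> groups = new HashMap<>();

    /** Records an appended entry; an overwrite drops the offsets of the superseded suffix. */
    void onAppend(int groupId, long logIndex, int offset) {
        GroupIndex idx = groups.computeIfAbsent(groupId, g -> new GroupIndex(logIndex));
        long pos = logIndex - idx.base;

        if (pos < 0 || pos > idx.offsets.size())
            throw new IllegalArgumentException("Non-contiguous log index " + logIndex);

        idx.offsets.subList((int) pos, idx.offsets.size()).clear();
        idx.offsets.add(offset);
    }

    /** Returns the segment offset of the entry, or -1 if this segment does not hold it. */
    int offsetOf(int groupId, long logIndex) {
        GroupIndex idx = groups.get(groupId);

        if (idx == null)
            return -1;

        long pos = logIndex - idx.base;

        return pos < 0 || pos >= idx.offsets.size() ? -1 : idx.offsets.get((int) pos);
    }

    /** Recovery: replays entry descriptors {groupId, logIndex, offset} in segment order. */
    static SegmentIndexSketch rebuild(List<long[]> scannedEntries) {
        SegmentIndexSketch index = new SegmentIndexSketch();

        for (long[] e : scannedEntries)
            index.onAppend((int) e[0], e[1], (int) e[2]);

        return index;
    }
}
```

Since the rebuild goes through the same onAppend path as normal operation, an overwrite found later in the segment naturally wins over the earlier entry with the same log index.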

Replacing physical records with the incremental checkpoints

Similar to Ignite 2.x native persistence, all data structures are implemented over logical pages. This includes data pages, B+Trees, reuse and free lists, etc. Each Raft log command represents a single atomic change to the state machine. When a command is applied, it changes some pages in the state machine data structures, marking them dirty. Unlike Ignite 2.x, no physical or logical records are written to the WAL when these changes are applied. As a result, after applying a set of changes, certain pages will be marked dirty and need to be checkpointed. To allow for efficient checkpointing, Ignite 3.x splits storage files into a main file, containing only a contiguous range of pages, and auxiliary files, containing dirty pages from consecutive checkpoints. Pages in auxiliary files are not necessarily contiguous and may have gaps. The files are ordered by the order in which they were written, forming a structure similar to an LSM-tree: when a page is read from disk, it is first looked up in the most recent auxiliary file, then in the second most recent auxiliary file, and so on, until the main storage file is reached. To allow for efficient search in auxiliary files, a small reverse index that maps page IDs to offsets in the file is stored with each file. The number of auxiliary files should be kept small enough for the full reverse index to be kept in memory; however, if the merge process does not keep up with the load, the indexes can always be evicted from memory and read on demand.

Schematically, the structure of the main and auxiliary checkpoint files may be represented as follows:

Main storage file:

| Page 1 | Page 2 | Page 3 | ... |

Auxiliary storage file:

|                       Offsets Index                       |        Pages Data       |
| Index len | Page P1 ID, Offset | Page P2 ID, Offset | ... | Page P1 | Page P2 | ... | 
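The lookup order described above can be sketched as follows. Files are modelled as in-memory maps from page ID to page content, so each auxiliary map plays the role of both the offsets index and the pages data; class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Sketch of the read path: newest auxiliary file first, main file last. */
class LayeredPageStoreSketch {
    /** Main file: pages addressed by page ID. */
    private final Map<Long, String> mainFile = new HashMap<>();

    /** Auxiliary files, newest first. */
    private final Deque<Map<Long, String>> auxFiles = new ArrayDeque<>();

    void writeMain(long pageId, String page) { mainFile.put(pageId, page); }

    /** A checkpoint writes all dirty pages into a new auxiliary file. */
    void checkpoint(Map<Long, String> dirtyPages) {
        auxFiles.addFirst(new HashMap<>(dirtyPages));
    }

    /** Returns the most recent version of the page, or null if the page does not exist. */
    String readPage(long pageId) {
        for (Map<Long, String> aux : auxFiles) { // iterates from newest to oldest
            String page = aux.get(pageId);

            if (page != null)
                return page;
        }

        return mainFile.get(pageId);
    }
}
```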

If a process crashes when an auxiliary file is not completely written, the file is discarded upon recovery and Raft log commands are replayed again, forming another set of dirty pages that should be checkpointed.

As the number of auxiliary files grows, they can be merged back into the main storage file in the same order in which they were originally written. When an auxiliary file is fully merged into the main storage file, it can be safely removed, provided that there are no threads intending to read from that file. If a process crashes in the middle of the merge, the file can be safely merged again (the original pages will not be used anyway).
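The reason a repeated merge is safe is that copying a page into the main file is idempotent, and the auxiliary file is removed only after every page has been copied. A small sketch with files modelled as maps illustrates this; the stopAfter parameter is a hypothetical device that simulates a crash in the middle of a merge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of merging auxiliary files into the main file in the order they were written. */
class AuxMergeSketch {
    final Map<Long, String> mainFile = new TreeMap<>();

    /** Auxiliary files, oldest first. */
    final List<Map<Long, String>> auxFiles = new ArrayList<>();

    /**
     * Copies pages of the oldest auxiliary file into the main file. If stopAfter is smaller than
     * the number of pages, the merge is interrupted and the auxiliary file stays in place.
     */
    void mergeOldest(int stopAfter) {
        if (auxFiles.isEmpty())
            return;

        Map<Long, String> oldest = auxFiles.get(0);
        int copied = 0;

        for (Map.Entry<Long, String> page : oldest.entrySet()) {
            if (copied++ == stopAfter)
                return; // simulated crash in the middle of the merge

            mainFile.put(page.getKey(), page.getValue());
        }

        auxFiles.remove(0); // safe only once no reader still uses the file
    }
}
```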

NOTE: We still need to discuss whether WAL logging of physical records can be enabled for debugging in case of a CorruptedDataStructureException.


RocksDB [1] is an LSM-based key-value database that should provide better performance on write-intensive workloads and should be easy to integrate for a quick start.

In-memory storages

For in-memory storages we can consider both heap and off-heap solutions, including porting Page Memory without persistence support.

Risks and Assumptions


Discussion Links


Reference Links

