Ignite Persistent Store - under the hood

The following notation is used: words written in italics and without spaces denote a class name (without the package name) or a method name, for example, GridCacheMapEntry.

Ignite Persistent Store

File types

The following file types are used for persisting data: cache pages (the page store), checkpoint markers, and WAL segments.

  • Write Ahead Log (WAL) segments - constant-size files (WAL work directory: 0...9.wal; WAL archive: 0.wal…)
  • CP markers - small files recording the start and finish of checkpoints (UUID-Begin.bin, UUID-End.bin)
  • Page store - contains cache parameters and the page store for partitions and SQL index data (one file per partition: cache-(cache_name)\part1,2,3.bin, and index.bin)

Folders Structure

Ignite with persistence enabled uses the following folder structure (shown separately for 2.3+ and for older versions, 2.1 & 2.2).

The pst-subfolder name is the same for all storage folders.

The name is selected on start and may be based on the node consistentId.


The consistent ID may be configured using IgniteConfiguration; by default it is generated from the set of local IPs.

Subfolders Generation

The subfolder name is generated on start. By default the new-style naming is used, for example node00-e819f611-3fb9-4dbe-a3aa-1f6de4af5d02

  • where 'node' is a constant prefix,
  • '00' is the node index, an incrementing counter of local nodes under the same PST root folder,
  • the remainder is the string representation of a UUID; this UUID becomes the node consistent ID.
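The naming scheme above can be illustrated with a small parser. This is only a sketch: the function names and the `PstFolder` type are hypothetical, and accepting indexes wider than two digits is an assumption based on the single example shown.

```python
import re
import uuid
from typing import NamedTuple, Optional

# 'node' + index (two digits in the example) + '-' + canonical UUID text.
_NEW_STYLE = re.compile(
    r"^node(\d{2,})-"
    r"([0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12})$"
)


class PstFolder(NamedTuple):
    node_index: int            # counter of local nodes under the same root
    consistent_id: uuid.UUID   # UUID that becomes the node consistent ID


def parse_pst_folder_name(name: str) -> Optional[PstFolder]:
    """Return the parts of a new-style folder name, or None if it does not match."""
    m = _NEW_STYLE.match(name)
    if m is None:
        return None
    return PstFolder(int(m.group(1)), uuid.UUID(m.group(2)))


def make_pst_folder_name(node_index: int, consistent_id: uuid.UUID) -> str:
    """Build a new-style folder name from its parts."""
    return f"node{node_index:02d}-{consistent_id}"
```

An old-style name such as 127_0_0_1_49999 does not match the pattern, so the parser returns None for it.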

 


Option 1. New-style naming is used when Ignite is started with a clean persistence storage root folder.

Option 2. An existing pst-subfolder is used when its name exactly matches the compatible consistent ID (list of local host IPs and ports). If there is such a folder, Ignite starts using it and the consistent ID is not changed.

Option 3. A preconfigured value from IgniteConfiguration is used when it is set.

If there is an old-style folder but its name does not match the compatible consistent ID, the following warning is generated.

There is other non-empty storage folder under storage base directory [work\db\127_0_0_1_49999, 299718 bytes, modified 10/04/2017 04:33 PM ]

Two file locks are used in folder selection.
The first one is used to check that no running node is using the same directory.
This lock is placed in work/db/{pst-subfolder}/lock (work/db may still be customized by the storage folder property).
The second lock is placed in the storage root folder: work/db/lock. It is held for a short time while a new pst-subfolder is being created. This protects against concurrent folder initialisation by nodes that are starting simultaneously.

Exact generation algorithm and code references:


1) A starting node binds to a port and generates old-style compatible consistent ID (e.g. 127.0.0.1:47500) using DiscoverySpi.consistentId(). This method still returns ip:port-based identifier.

2) The node scans the work directory and checks if there is a folder matching the consistent ID (e.g. work\db\127_0_0_1_49999). If such a folder exists, we start up with this ID (compatibility mode) and acquire the file lock on this folder. See PdsConsistentIdProcessor.prepareNewSettings

3) If there are no matching folders, but the directory is not empty, scan it for old-style consistent IDs. If there are old-style db folders, print out a warning (see warning text above), then switch to new style folder generation (step 4)

4) If there are existing new style folders, pick up the one with the smallest sequence number and try to lock the directory. Repeat until we succeed or until the list of new-style consistent IDs is empty. (e.g. work\db\node00-uuid, node01-uuid, etc).

5) If there are no more available new-style folders, generate a new one with next sequence number and random UUID as consistent ID. (e.g. work\db\node00-uuid, uuid overrides uuid in GridDiscoveryManager)

6) Use this consistent ID for the node startup (using value from GridKernalContext.pdsFolderResolver() and from PdsFolderSettings.consistentId())

There is a system property that disables new-style generation and forces the old-style consistent ID (IgniteSystemProperties.IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID).
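Steps 1-6 can be sketched as a pure function over folder names. This is not the Ignite implementation: the function names are hypothetical, file locks are modelled as a set of already locked names, the rule for turning ip:port into a folder name is inferred from the example, every name that is not new-style is treated as an old-style folder, and "next sequence number" is assumed to mean the largest existing index plus one.

```python
import re
import uuid
from typing import Iterable, List, Set, Tuple

NEW_STYLE = re.compile(
    r"^node(\d+)-[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$"
)


def old_style_name(compatible_id: str) -> str:
    # "127.0.0.1:47500" -> "127_0_0_1_47500" (rule inferred from the example)
    return re.sub(r"[^0-9A-Za-z]", "_", compatible_id)


def choose_pst_folder(existing: Iterable[str], locked: Set[str],
                      compatible_id: str) -> Tuple[str, List[str]]:
    """Return (chosen folder name, warnings) for a starting node."""
    existing = list(existing)
    warnings: List[str] = []

    # Step 2: a folder matching the old-style ID wins (compatibility mode).
    # Failure to lock that folder is not modelled here.
    old_name = old_style_name(compatible_id)
    if old_name in existing:
        return old_name, warnings

    new_style = []
    for name in existing:
        m = NEW_STYLE.match(name)
        if m:
            new_style.append((int(m.group(1)), name))
        else:
            # Step 3: other old-style folders only produce a warning.
            warnings.append(f"other storage folder found: {name}")
    new_style.sort()

    # Step 4: try new-style folders in order of sequence number.
    for _, name in new_style:
        if name not in locked:
            return name, warnings

    # Step 5: nothing available, generate a new folder with a random UUID.
    next_index = new_style[-1][0] + 1 if new_style else 0
    return f"node{next_index:02d}-{uuid.uuid4()}", warnings
```

In the sketch, a second node started on the same host finds node00-... locked and ends up creating node01-... with a fresh UUID.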

 

Page store

Ignite Durable Memory is the basis for all data structures. No cache state is stored on heap anymore.

To save cache state to disk we can dump all its pages to disk. The first prototypes used this simple approach: stop all updates and save all pages.

The page store is the storage for all pages related to a particular cache (the cache's partitions and SQL indexes).

Using the page identifier, it is possible to map a page ID to a file and to a position in that file:

pageId = ... || partition ID || page index (idx)
//pageId can be easily converted to file + offset in this file
offset = idx * pageSize

Partitions of each cache have a corresponding file in the page store directory (a particular node may not own all partitions).

Each cache has a corresponding folder in the page store (cache-(cache-name)), and each owned (primary or backup) partition of this cache has its own file.

The cache page storage contains the following files:

  • part-0.bin, part-1.bin, ..., part-1023.bin (shown as P1, P2, PN in the picture) - cache partition pages.
  • index.bin - index partition data; a special partition with number 65535 is used for SQL indexes and is saved to index.bin.
  • cache_data.dat - a special file with the stored cache data (configuration). StoredCacheData includes more data than CacheConfiguration, e.g. query entities.
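The mapping from a page ID to a file and an offset can be sketched as follows. The bit widths are assumptions made only for illustration (16 bits for the partition ID, because 65535 is the largest partition number mentioned, and 32 bits for the page index); the higher bits, shown as "..." in the notation above, are simply ignored. The offset follows the formula offset = idx * pageSize literally, so any file header the real store may have is not modelled.

```python
from typing import Tuple

PART_ID_BITS = 16        # assumption: partition numbers go up to 65535
PAGE_IDX_BITS = 32       # assumption, for illustration only
INDEX_PARTITION = 65535  # special partition used for SQL indexes


def make_page_id(part_id: int, page_idx: int) -> int:
    """Pack partition ID and page index into one integer (upper bits left as 0)."""
    return (part_id << PAGE_IDX_BITS) | page_idx


def partition_id(page_id: int) -> int:
    return (page_id >> PAGE_IDX_BITS) & ((1 << PART_ID_BITS) - 1)


def page_index(page_id: int) -> int:
    return page_id & ((1 << PAGE_IDX_BITS) - 1)


def locate(page_id: int, page_size: int, cache_name: str) -> Tuple[str, int]:
    """Return (relative file path, byte offset) for a page."""
    part = partition_id(page_id)
    file_name = "index.bin" if part == INDEX_PARTITION else f"part-{part}.bin"
    return f"cache-{cache_name}/{file_name}", page_index(page_id) * page_size
```

For example, with a 4096-byte page, page index 10 of partition 3 in a cache named myCache maps to cache-myCache/part-3.bin at offset 40960.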

Persistence

Checkpointing

Checkpointing is the process of storing dirty pages from RAM to disk so that a consistent memory state is saved. When the process ends, the saved page state is the state as of the moment the process began.

There are two approaches to implementing checkpointing:

  • Sharp Checkpointing - when a checkpoint is completed, all data structures on disk are consistent; data is consistent in terms of references and transactions.
  • Fuzzy Checkpointing - the state on disk may itself require recovery.

The approach implemented in Ignite is Sharp Checkpointing; Fuzzy Checkpointing is to be done in future releases.

To achieve consistency, a checkpoint read-write lock is used (see GridCacheDatabaseSharedManager#checkpointLock):

  • Cache updates hold the read lock.
  • The checkpointer holds the write lock for a short time. Holding the write lock means that all state is consistent and updates are not possible. Using the checkpoint lock makes a sharp checkpoint possible.

While the checkpoint write lock is held, we do the following:

      1. A WAL marker is added: the checkpoint (begin) record - CheckpointRecord - marks a consistent state in the WAL.
      2. The pages that were changed since the last checkpoint are collected.

Then the checkpoint write lock is released, and updates and transactions can run again.

Dirty pages are tracked in a set: when a page changes from non-dirty to dirty, it is added to this set.

The collection of pages (GridCacheDatabaseSharedManager.Checkpoint#cpPages) allows us to collect and then write the pages that were changed since the last checkpoint.
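The begin-checkpoint step can be sketched like this. The class and method names are hypothetical, and a plain mutex stands in for the read-write lock because the Python standard library has none; in the design described above, many updaters share the read lock and only the checkpointer takes the write lock.

```python
import threading
from typing import List, Set


class CheckpointTracker:
    """Toy model of dirty-page tracking and the checkpoint begin step."""

    def __init__(self) -> None:
        self._checkpoint_lock = threading.Lock()  # stand-in for the RW lock
        self._dirty: Set[int] = set()
        self.wal: List[str] = []

    def update_page(self, page_id: int) -> None:
        # Real design: taken as a read lock, so updates run concurrently.
        with self._checkpoint_lock:
            self._dirty.add(page_id)  # non-dirty -> dirty adds the page

    def begin_checkpoint(self) -> Set[int]:
        # Real design: write lock, held only for these two short steps.
        with self._checkpoint_lock:
            self.wal.append("CheckpointRecord(begin)")  # 1. WAL marker
            cp_pages, self._dirty = self._dirty, set()  # 2. collect pages
        # Lock released: updates continue while cp_pages are written out.
        return cp_pages
```

Swapping the dirty set for a fresh one keeps the time under the lock short: pages dirtied after the swap belong to the next checkpoint.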

Checkpoint Pool

In parallel with the process of writing pages to disk, some thread may want to update data in a page that is being written (or is scheduled to be written).

For such cases the checkpoint pool is used: it holds pages that are updated in parallel with the write. The size of this pool is limited.

The copy-on-write technique is used. If a modification is required in a page that is under checkpoint, a temporary copy of this page is created in the checkpoint pool.


To perform a write to a dirty page scheduled to be checkpointed, the following is done:

  • the write lock on the page is acquired to protect it from inconsistent changes,
  • then a full copy of the page is created in the checkpoint pool (the CP pool is the last chunk of the durable memory segment),
  • the actual data is written to the regular chunk after the copy is created,
  • and later the copy of the page from the CP pool is written to disk.

If a page was not initially involved in the checkpoint and is updated concurrently with the checkpointing process:

  • it is updated directly in memory, bypassing the checkpoint pool,
  • the actual data will be stored to disk during the next checkpoint.

When a dirty page is flushed to disk, its dirty flag is cleared. Every subsequent write to such a page (which was initially involved in the checkpoint but has already been flushed) does not require the checkpoint pool; it is written directly to the segment.
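The copy-on-write behaviour can be modelled in a few lines. This is a simplified sketch with hypothetical names: page locks are omitted, pages are plain byte strings in dictionaries, and a full pool raises an error where the real system would make the update wait.

```python
from typing import Dict, Iterable, Set


class Segment:
    """Toy memory segment with a bounded checkpoint (CP) pool."""

    def __init__(self, pages: Dict[int, bytes], cp_pool_capacity: int) -> None:
        self.pages = dict(pages)             # regular chunk: live page content
        self.cp_pool: Dict[int, bytes] = {}  # copies of pages under checkpoint
        self.cp_pool_capacity = cp_pool_capacity
        self.cp_pages: Set[int] = set()      # scheduled but not yet flushed
        self.disk: Dict[int, bytes] = {}

    def begin_checkpoint(self, dirty_pages: Iterable[int]) -> None:
        self.cp_pages = set(dirty_pages)

    def write(self, page_id: int, data: bytes) -> None:
        if page_id in self.cp_pages and page_id not in self.cp_pool:
            if len(self.cp_pool) >= self.cp_pool_capacity:
                raise RuntimeError("CP pool is full, update has to wait")
            # Keep the checkpoint-time content before it is overwritten.
            self.cp_pool[page_id] = self.pages[page_id]
        self.pages[page_id] = data           # update the regular chunk

    def flush(self, page_id: int) -> None:
        # Write the copy if one exists, otherwise the untouched page.
        if page_id in self.cp_pool:
            self.disk[page_id] = self.cp_pool.pop(page_id)
        else:
            self.disk[page_id] = self.pages[page_id]
        self.cp_pages.discard(page_id)       # later writes bypass the pool
```

A page that is not part of the current checkpoint never touches the pool, and a page that has already been flushed is written directly again.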

Triggers

  • The percentage of dirty pages is a trigger for checkpointing (e.g. 75%).
  • A timeout is also a trigger: do a checkpoint every N seconds.
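The two triggers combine into a simple predicate. The function name and parameters are hypothetical; the 75% threshold is the example value from the list above, and the timeout N has to be supplied by the caller.

```python
def should_checkpoint(dirty_pages: int, total_pages: int,
                      seconds_since_last: float, timeout_sec: float,
                      dirty_threshold: float = 0.75) -> bool:
    """Return True if either checkpoint trigger fires."""
    too_dirty = total_pages > 0 and dirty_pages / total_pages >= dirty_threshold
    timed_out = seconds_since_last >= timeout_sec
    return too_dirty or timed_out
```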

WAL

We can’t control the moment when a node crashes. Suppose we have saved tree leaves but didn’t save the tree root (pages may be reordered during allocation because allocation is multithreaded). In this case all updates will be lost.

At the same time, we can’t write each memory page update to disk every time - it is too slow.

The technique that solves this is called write-ahead logging. Before doing the actual update, we append information about the planned change to a cyclic file named the WAL (operation name - WAL append/WAL log).

After a crash we can read and replay the WAL using the already saved page set. We can restore the last committed state of the crashed process. Restore is based on the page store + WAL.

In practice we can’t replay the WAL from the beginning of time, because Volume(HDD) < Volume(full WAL), so we need a procedure to discard the oldest part of the changes in the WAL. This is done during checkpointing.

A consistent state comes only from the pair of WAL and page store.

An operation is acknowledged after the operation and the page update(s) were logged. A checkpoint will be started later by its triggers.
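The idea of logging first, updating memory second, and recovering from page store + WAL fits in a toy key-value model. Everything here is hypothetical and greatly simplified: the WAL is a list rather than a cyclic file, pages are replaced by a dictionary, and a checkpoint copies the whole state.

```python
from typing import Any, Dict, List, Tuple


class TinyStore:
    """Toy store that recovers its state from 'disk' plus the WAL tail."""

    def __init__(self) -> None:
        self.wal: List[Tuple[str, Any]] = []  # stands in for the WAL file
        self.memory: Dict[str, Any] = {}      # lost on crash
        self.disk: Dict[str, Any] = {}        # stands in for the page store
        self.cp_position = 0                  # WAL position of the last checkpoint

    def put(self, key: str, value: Any) -> None:
        self.wal.append((key, value))  # 1. append the planned change to the WAL
        self.memory[key] = value       # 2. only then update memory
        # The operation can be acknowledged here; no page was written to disk.

    def checkpoint(self) -> None:
        self.disk = dict(self.memory)
        # WAL entries before this position are no longer needed for recovery.
        self.cp_position = len(self.wal)

    def recover(self) -> Dict[str, Any]:
        """Rebuild the last acknowledged state after memory was lost."""
        state = dict(self.disk)
        for key, value in self.wal[self.cp_position:]:
            state[key] = value
        return state
```

Neither part is enough alone: the disk copy misses the updates made after the checkpoint, and the WAL tail misses everything before it.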

 

Crash Recovery

Local Crash Recovery

Crash Recovery can be

  • Local (most DB are able to do this)
  • and distributed (whole cluster state is restored).

WAL records for recovery

Crash recovery involves the following records written to the WAL. They are of 2 main types: logical and physical.

 Logical records

    1. Operation description - which operation we want to do. Contains the operation type (create, update, delete) and (Key, Value, Version) - DataRecord
    2. Transactional record - marker of begin, prepare, commit, and rollback of transactions (TxRecord)
    3. Checkpoint record - marker of checkpoint begin (CheckpointRecord)

Structure of data record:

A data record includes a list of entries (entry operations). Each operation has a cache ID, operation type, key and value. The operation type can be

  • CREATE - first put into the cache, contains key and value.
  • UPDATE - put for an existing key, contains key and value.
  • DELETE - remove key, contains the key only, the value is absent.

Update and create always contain the full value. At the same time, several updates of the same key within a transaction are merged into one latest update.

Physical records

    1. Full page snapshot - issued for the first update of a page after a successful checkpoint. The record is logged when the page state changes from 'clean' to 'dirty' (PageSnapshot)
    2. Delta record - describes a change of a memory region (page change). Subclass of PageDeltaRecord. Contains the bytes changed in the page, e.g. bytes 5-10 were changed to [...,]. These are relatively small records for B+tree records

Page snapshots and related deltas are combined during WAL replay.

For a particular cache entry update we log records in the following order:

  1. logical record with the planned change - DataRecord with several DataEntry(ies)
  2. page record:
    1. option: the page changed by this update was initially clean, so the full page is logged - PageSnapshot,
    2. option: the page was already modified, so a delta record is issued - PageDeltaRecord
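The logging order above can be expressed as a small function. The tuples only borrow the record names from the text; the function, its parameters and the in-memory WAL list are hypothetical.

```python
from typing import Any, List, Set, Tuple


def log_entry_update(wal: List[Tuple[Any, ...]], dirty_pages: Set[int],
                     page_id: int, key: str, value: Any,
                     page_image: bytes, delta: bytes) -> None:
    """Append WAL records for one cache entry update in the described order."""
    # 1. Logical record with the planned change.
    wal.append(("DataRecord", key, value))
    # 2. Page record: snapshot for a clean page, delta otherwise.
    if page_id not in dirty_pages:
        wal.append(("PageSnapshot", page_id, page_image))
        dirty_pages.add(page_id)  # the page is dirty until the next checkpoint
    else:
        wal.append(("PageDeltaRecord", page_id, delta))
```

Two updates that hit the same page after a checkpoint therefore produce DataRecord, PageSnapshot, DataRecord, PageDeltaRecord.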

Planned future optimisation - refer from PageDeltaRecord to the logical record for the modified data. This will make it possible not to store the updated bytes twice. There is a file WAL pointer, a pointer to a record counted from the beginning of time. This reference may be used.

 

WAL structure

 

WAL file segments and rotation structure

 

 

See also WAL history size section below


Local Recovery Process

Let’s assume the node start process is running with existing files.

  1. We need to check if the page store is consistent.
  2. To do so, we need to find out if the crash happened while a checkpoint (CP) was running.

Ignite manages 2 types of CP markers on disk (standalone files that include a timestamp and a WAL pointer):

  • CP begin
  • CP end

If we observe only a CP begin marker and there is no CP end marker, the CP was not finished and the page store is not consistent.

No checkpoint process 

If the crash happened when no checkpoint process was running, restore is trivial: only logical records are applied.

Physical records (page snapshots and delta records) are ignored because the page store is consistent.

Middle of checkpoint

Let’s suppose the crash occurred in the middle of a checkpoint. In that case the restore process will discover markers for CP1 begin, CP1 end, and CP2 begin, but no CP2 end marker.

Restore is split into 2 stages:

1st stage: Starting from the record of the previous completed checkpoint (CP1) up to the record of the CP2 start (incomplete), we apply all physical records and ignore logical ones.

  • In the example CP2 was started but not finished. At the time of the crash the CP2 process was writing pages changed before CP2 was started; in this example: the page(s) corresponding to the PS1 & Delta 1 physical records.
  • The write of page content to the page store may be incomplete (e.g. broken in the middle of a page). To restore a potentially corrupted page store we apply the page change records. As a result of updating the same pages at the same place in the page store, we get a consistent page store.
  • Page snapshot records are required to avoid applying data from delta records twice.

2nd stage: Starting from the marker of the incomplete CP2, we apply only logical records until the end of the WAL is reached.


When replay is finished, the CP2 end marker is added.

If a transaction begin record has no corresponding end, the tx change is not applied.
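Both recovery cases can be sketched as a function that selects which WAL records to replay. The record tuples and the function are hypothetical: the WAL is a list of ("CP_BEGIN", id), ("PHYSICAL", payload) and ("LOGICAL", payload) tuples, the CP end marker files are modelled as a set of completed checkpoint IDs, and filtering of unfinished transactions is left out.

```python
from typing import Any, List, Set, Tuple

Record = Tuple[str, Any]


def plan_replay(wal: List[Record], completed_cps: Set[str]) -> List[Record]:
    """Return the WAL records to apply on start, in replay order."""
    cp_begins = [(pos, rec[1]) for pos, rec in enumerate(wal)
                 if rec[0] == "CP_BEGIN"]
    if not cp_begins:
        # No checkpoint at all: only logical records exist to be applied.
        return [r for r in wal if r[0] == "LOGICAL"]

    last_pos, last_id = cp_begins[-1]
    logical_tail = [r for r in wal[last_pos + 1:] if r[0] == "LOGICAL"]

    if last_id in completed_cps:
        # Page store is consistent: physical records are ignored.
        return logical_tail

    # Crash in the middle of a checkpoint.
    completed = [pos for pos, cp_id in cp_begins[:-1] if cp_id in completed_cps]
    start = completed[-1] + 1 if completed else 0
    # Stage 1: physical records between the last completed CP and the broken one.
    stage1 = [r for r in wal[start:last_pos] if r[0] == "PHYSICAL"]
    # Stage 2: logical records after the marker of the incomplete CP.
    return stage1 + logical_tail
```

With the example from the text (CP1 completed, CP2 started but not finished), the plan is PS1 and Delta 1 first, then the logical records written after the CP2 marker.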

Summary, limitations and performance 

Limitations

Because CPs are consistent, we can’t start the next CP until the previous one is completed.

The following situation is possible:

  • updates are coming fast from worker threads
  • the CP pool (for copy-on-write) may become full with new changes

In that case we block new updates and wait for the running CP to finish.

To avoid such a scenario:

  • increase the frequency of checkpoints (to minimize the amount of data to be saved in each CP)
  • increase the CP buffer size

The WAL and the page store may be placed on different devices to avoid their mutual influence.

If the same records are updated many times, this may generate load on the WAL but no significant load on the page store.

To provide recovery guarantees, each write (log()) to the WAL should:

  • call write() itself,
  • and also require fsync (forcing the OS to flush its buffers to the real device).

fsync is an expensive operation. There is an optimisation for the case when updates arrive faster than the disk can write: the fsyncDelayNanos delay (1ns-1ms, 1ns by default) is used to park threads so that more than one fsync request is accumulated.
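The difference between write() and write() followed by fsync can be shown with the standard os module. This is a generic illustration, not Ignite code: wal_append and demo are hypothetical names, and the parking of threads described above is not modelled.

```python
import os
import tempfile


def wal_append(fd: int, record: bytes, sync: bool = True) -> None:
    """Append a record to an open file descriptor, optionally forcing it to the device."""
    view = memoryview(record)
    while view:                   # os.write may write fewer bytes than requested
        written = os.write(fd, view)
        view = view[written:]
    if sync:
        os.fsync(fd)              # ask the OS to flush its buffers to the device


def demo() -> bytes:
    """Write two records to a temporary file and read them back."""
    fd, path = tempfile.mkstemp(suffix=".wal")
    try:
        wal_append(fd, b"rec1", sync=True)   # survives an OS crash once fsync returns
        wal_append(fd, b"rec2", sync=False)  # handed to the OS, not forced to disk
    finally:
        os.close(fd)
    try:
        with open(path, "rb") as f:
            return f.read()
    finally:
        os.remove(path)
```

Both records are visible to readers right after write(); the fsync call only matters for what survives a crash of the OS or the machine.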

Future optimisation: a standalone thread will be responsible for writing data to disk. Worker threads will do all the preparation and hand over the buffer to be written.

See also WAL history size section below.

WAL mode

There are several levels of guarantees (WALMode):

| WAL mode | Implementation | Warranties |
|---|---|---|
| DEFAULT | fsync() on each commit | Any crashes (OS and process crash) |
| LOG_ONLY | write() on commit; synchronisation is the responsibility of the OS | Process kill, but not an OS failure |
| BACKGROUND | do nothing on commit (records are accumulated in memory); write() on timeout | kill -9 may cause the loss of several latest updates |

 

But there are several nodes containing the same data, and it is possible to restore data from other nodes.

Distributed Recovery

Distributed recovery is based on the partition update counter. This mechanism was already used in continuous queries.

  • A partition update counter is associated with each partition.
  • Each update increments the partition update counter.

Each update (with its counter) is replicated to the backup. If the counter is equal on primary and backup, replication is finished.

The partition update counter is saved with the update records in the WAL.

Node Join (with data from persistence)

Consider a partition on the joining node that was in the owning state with update counter = 50. Existing nodes have update counter = 150.

Node join causes a partition map exchange; the update counter is sent along with other partition data. (The joining node will have a new ID, and from the point of view of discovery this node is a new node.)

The coordinator observes the older partition state and forces the partition into the moving state. Forcing the moving state is required to set up uploading of newer data.

Rebalancing of fresh data to the joined node may now run in 2 modes:

  • There is WAL on the primary node, and it includes a checkpoint marker with partition update counter = 45.
    • We can send only the WAL logical update records to the backup.
  • If the counter in the WAL is too big, e.g. 200, we don’t have the delta (can't send WAL records).
    • The joined node will have to clear its partition data.
    • The partition state is set to the renting state.
    • When the clean-up is finished, the partition goes to the moving state.
    • We can’t use delta updates because there is a possible problem with keys deleted earlier: a stale key may remain if we send only the delta of changes.
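The choice between the two rebalance modes comes down to comparing counters. The sketch below is a hypothetical helper, not Ignite API: it takes the update counter of the joining node and the counter stored with the earliest checkpoint marker still available in the WAL of the primary, and returns the mode together with the partition states the joining node goes through.

```python
from typing import List, Optional, Tuple


def choose_rebalance(joining_counter: int,
                     earliest_wal_counter: Optional[int]) -> Tuple[str, List[str]]:
    """Return (mode, partition state transitions on the joining node)."""
    if earliest_wal_counter is not None and earliest_wal_counter <= joining_counter:
        # The WAL still covers everything the joining node has missed:
        # only logical update records need to be sent.
        return "WAL_DELTA", ["MOVING"]
    # The history starts too late (or there is no WAL): clear and reload.
    return "FULL", ["RENTING", "MOVING"]
```

With the numbers from the text, a joining counter of 50 and a WAL marker at 45 give the delta mode, while a WAL marker at 200 forces the full reload.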

Possible future optimisation: for a full update we may send the page store file over the network.

The order in which nodes join is not relevant. It is possible that the oldest node has an older partition state while the joining node has a higher partition counter. In this case rebalancing will be triggered by the coordinator and performed from the newly joined node to the existing one (note that this behaviour may be changed under IEP-4 Baseline topology for caches).

WAL history size

In the corner case we need to store the WAL for only 1 checkpoint in the past for successful recovery (PersistentStoreConfiguration#walHistSize).

We can’t delete WAL segments considering only the history size in bytes or segments, because it is possible to replay the WAL only starting from a checkpoint marker.

Therefore the WAL history size is measured in number of checkpoints.

Assuming that checkpoints are triggered mostly by timeout, we can estimate the possible downtime after which a node may still be rebalanced using delta logical WAL records.

By default the WAL history size is 20, to increase the probability that rebalancing can be done using logical deltas from the WAL.

Estimating disk space

 

WAL work directory maximum used size: walSegmentSize * walSegments = 640 MB (default).

In the DEFAULT WAL mode this size is always used;

in other modes the best case is 1 segment * walSegmentSize.

The maximum size of WAL work + WAL archive may be estimated

  1. by average load, or
  2. by maximum size.

The 1st way is applicable if checkpoints are triggered mostly by the timer trigger.
WAL size = 2 * average load (bytes/sec) * trigger interval (sec) * walHistSize (number of checkpoints)
The multiplier 2 comes from having both physical & logical WAL records.

The 2nd way: checkpoints are triggered by the maximum dirty pages percentage of segments. Use the maximum sizes of the persisted data regions:
sum(max configured DataRegionConfiguration.maxSize) * 75% - estimated maximum data volume to be written in 1 checkpoint.
Overall WAL size (before archiving) = 2 * est. data volume * walHistSize = 1.5 * sum(DataRegionConfiguration.maxSize) * walHistSize
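The estimates above are plain arithmetic and can be wrapped in helper functions. The function names are hypothetical. The split of the 640 MB default into 10 segments of 64 MB is an assumption derived from the 0...9.wal file names mentioned earlier, and the load, interval and region size in the checks are made-up example inputs.

```python
from typing import Iterable

MB = 1024 * 1024
GB = 1024 * MB


def wal_work_max_size(wal_segment_size: int, wal_segments: int) -> int:
    """Maximum size of the WAL work directory in bytes."""
    return wal_segment_size * wal_segments


def wal_size_by_average_load(avg_load_bytes_per_sec: float,
                             trigger_interval_sec: float,
                             wal_hist_size: int) -> float:
    """1st way: checkpoints triggered mostly by the timer."""
    # Factor 2: every change is logged as a physical and a logical record.
    return 2 * avg_load_bytes_per_sec * trigger_interval_sec * wal_hist_size


def wal_size_by_region_max(region_max_sizes: Iterable[int],
                           wal_hist_size: int,
                           dirty_fraction: float = 0.75) -> float:
    """2nd way: checkpoints triggered by the dirty pages percentage."""
    data_per_checkpoint = sum(region_max_sizes) * dirty_fraction
    return 2 * data_per_checkpoint * wal_hist_size
```

For example, one persisted data region of 4 GB with walHistSize = 20 gives 1.5 * 4 GB * 20 = 120 GB by the second formula.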

Note that applying the WAL compressor may significantly reduce the archive size.


