Wave store TODOs
July 2011 - WORK IN PROGRESS

This page describes improvements that can be made to the Wave in a Box (WIAB) wave store to scale it to large numbers of waves and simultaneous users and to build out the WIAB wave server to support scalable and reliable indexing, search, federation, and APIs.

1. BACKGROUND AND PROBLEMS

Presently (July 2011), the WIAB wave server stores wavelet deltas in (file based) persistent storage immediately after it applies them. All wavelets are kept in memory. On startup, the wave server reads all wavelet deltas from disk, keeps them all in memory, and constructs the state of every wavelet from its deltas. The wave server maintains no real index, only an in-memory cache of lists of wave views for local users and groups. Searches are performed by traversing these lists for all wavelets accessible to the given user.

The design of the current file based wave store is described here:

http://www.waveprotocol.org/protocol/design-proposals/wave-store-design-for-wave-in-a-box

The current implementation is very simplistic and inefficient in most ways, preventing WIAB from scaling beyond thousands of waves and dozens of users. The biggest problems are:
P1. Wavelets are reconstructed from scratch from their entire delta history when loaded from disk. This is expensive and causes excessive latency when loading a wave with a long edit history from disk.
P2. The wave server keeps the entire wavelet delta history in memory for all the wavelets it has loaded into memory. This makes the memory footprint for waves with long edit histories excessive.
P3. All wavelets are kept in memory in the wave server, and the wave server loads all wavelets from disk on startup. The memory footprint and startup latency of this approach do not scale to large numbers of waves.
P4. The search implementation is tightly coupled with the in-memory wave store; in particular, user searches contend for the wavelet lock in the wave store. This makes searches slow and expensive and doesn't scale beyond dozens of simultaneous users.

2. GOALS

The goal is to build out WIAB's wave store and wave server to satisfy the following properties:
G1. The cost (in terms of storage operations and memory footprint) and latency to access a wave is proportional to its current size.
G2. The total memory footprint on the wave server is a modest constant factor larger than the "working set", measured as the aggregate current size of the waves which were recently modified.
G3. A scalable and modular wave server architecture, interconnected by a wave bus API that supports the construction of reliable indexing, search, federation, and API subsystems, suitably decoupled from the wave store.
The remainder of this page describes solutions to problems P1-P4 with an aim towards goals G1-G3.

3. WAVELETS WITH LONG EDIT HISTORIES

Problems P1 and P2 make wavelets with long edit histories unnecessarily expensive: they are slow and expensive to load into memory and their memory footprint is excessive. These can be solved in isolation in the wave store without repercussions for the wave bus and other subsystems.

3.1 DELTA HISTORY EVICTION
The excessive footprint can be avoided if the wave server evicts the oldest deltas from memory, that is, it only keeps the most recent deltas in memory and loads older deltas from disk only upon request. Since we don't want to perform any disk operations synchronously under the wavelet lock in the wave store, we need to make all the methods that access delta history asynchronous. Specifically, the following methods need to be made asynchronous:
box.server.waveserver.WaveletContainer#requestHistory()
box.server.waveserver.WaveletContainer#requestTransformedHistory()
box.server.waveserver.WaveletProvider#getHistory()
This requires changes at every place where these methods are called.
Status: partially implemented. Deltas are evicted, but the methods have not been made asynchronous.
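
The eviction scheme above can be sketched as follows. This is a minimal illustration in Python, not the WIAB implementation: the class DeltaHistory, the load_from_disk callback, and the assumption that each delta advances the version by exactly one are all hypothetical. The point it shows is that a history request returns a future, so the disk read happens on an I/O thread and never under the wavelet lock.

```python
import threading
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor


class DeltaHistory:
    """Keeps only the newest deltas in memory; older ones are read from disk.

    Hypothetical sketch: deltas are (version, payload) pairs, and
    load_from_disk(start, end) returns the stored deltas with
    start <= version < end.
    """

    def __init__(self, load_from_disk, max_in_memory=2):
        self._load_from_disk = load_from_disk
        self._recent = deque()
        self._max_in_memory = max_in_memory
        self._lock = threading.Lock()  # stands in for the wavelet lock
        self._io_pool = ThreadPoolExecutor(max_workers=1)

    def append(self, version, payload):
        with self._lock:
            self._recent.append((version, payload))
            while len(self._recent) > self._max_in_memory:
                self._recent.popleft()  # evict the oldest; it is already on disk

    def request_history(self, start, end):
        """Returns a Future; the disk is never touched while holding the lock."""
        with self._lock:
            cached = [d for d in self._recent if start <= d[0] < end]
            oldest_cached = self._recent[0][0] if self._recent else end
        if start >= oldest_cached:
            done = Future()
            done.set_result(cached)  # served entirely from memory
            return done
        # The older part must come from disk: read it on the I/O thread.
        disk_end = min(end, oldest_cached)
        return self._io_pool.submit(
            lambda: self._load_from_disk(start, disk_end) + cached)
```

Callers that used to receive the history directly would instead wait on, or attach a callback to, the returned future.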

3.2 PERSISTENT WAVELET SNAPSHOTS

The cost and latency of loading wavelets into memory can be reduced by storing wavelet snapshots in persistent storage. Appropriate care is needed to ensure consistency with stored deltas. The frequency of writing the current snapshot to disk will need to be calibrated for best performance and resource economy.

Here is a simple design for adding persistent consistent snapshots to the file based wave store implementation:
Give the snapshot file a fixed name, "snapshot", so the file can be opened immediately without a prior directory lookup to determine the file name.
In the snapshot file include the snapshot version number and the index into the deltas file for the delta that ends at that version. This makes it easy to load only the deltas after the snapshot when a wavelet is loaded from disk.
A wavelet is loaded from disk with the following steps:
Try to read the wavelet snapshot from the "snapshot" file.
If the file doesn't exist (or the read fails for any other reason), construct the wavelet in the old fashion from its entire delta history.
Otherwise read only the deltas from the snapshot version onwards and construct the wavelet from the snapshot and any later deltas.
The wave server only writes out a snapshot for a version of the wavelet after all the deltas up to that version have been written successfully to disk.
Some heuristic is used to determine when to write a new snapshot. It should strike a good balance between the cost of writing snapshots frequently and the cost of applying many deltas to an old snapshot when snapshots are written infrequently. For example, a new snapshot could be written whenever the aggregate size of the deltas since the last snapshot exceeds the size of a new snapshot.
The snapshot is first written into a temporary file. Then, when it is successfully written in its entirety, the temporary file is renamed to "snapshot", atomically deleting any pre-existing snapshot of an earlier version of the wavelet.
On startup, the run-server.sh script can delete any dangling temporary files (not fully written and renamed when the last run of the wave server terminated).
The snapshot file is always optional. It will eventually be created if the delta history grows long enough to make a snapshot file worthwhile. Because it is optional, the wave server will automatically start creating snapshot files when this functionality is implemented, so there's no need to run special migration logic to populate the wave store with snapshot files. Moreover, if a snapshot file is corrupted, the operator of the wave server can overcome the corruption by deleting the corrupt snapshot and restarting the wave server.
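
The design above can be sketched as follows. This is a minimal Python illustration under stated assumptions, not the WIAB code: the JSON encoding of the snapshot, the field names, and the read_deltas and apply_delta callbacks are hypothetical stand-ins for the delta store and the operation logic. It shows the size heuristic, the write to a temporary file followed by an atomic rename, and the fallback to a full replay when the snapshot is missing or unreadable.

```python
import json
import os
import tempfile

SNAPSHOT_NAME = "snapshot"  # fixed file name, as proposed above


def should_write_snapshot(delta_bytes_since_snapshot, new_snapshot_bytes):
    """Heuristic from the text: the deltas since the last snapshot outweigh a new one."""
    return delta_bytes_since_snapshot > new_snapshot_bytes


def write_snapshot(wavelet_dir, version, delta_index, state):
    """Writes to a temporary file, then atomically renames it to 'snapshot'.

    Must only be called after all deltas up to `version` are on disk.
    `delta_index` is the index of the delta that ends at `version`.
    """
    record = {"version": version, "delta_index": delta_index, "state": state}
    fd, tmp_path = tempfile.mkstemp(dir=wavelet_dir, prefix="snapshot.tmp.")
    with os.fdopen(fd, "w") as tmp:
        json.dump(record, tmp)
        tmp.flush()
        os.fsync(tmp.fileno())
    # os.replace overwrites any earlier snapshot in a single atomic step.
    os.replace(tmp_path, os.path.join(wavelet_dir, SNAPSHOT_NAME))


def load_wavelet(wavelet_dir, read_deltas, apply_delta, empty_state):
    """Loads from the snapshot if it is readable, else from the full history.

    read_deltas(from_index) and apply_delta(state, delta) are hypothetical
    callbacks supplied by the caller.
    """
    try:
        with open(os.path.join(wavelet_dir, SNAPSHOT_NAME)) as f:
            record = json.load(f)
        state, next_index = record["state"], record["delta_index"] + 1
    except (OSError, ValueError, KeyError, TypeError):
        state, next_index = empty_state, 0  # old fashion: replay everything
    for delta in read_deltas(next_index):
        state = apply_delta(state, delta)
    return state
```

Because a failed read simply falls back to the delta history, deleting a corrupt snapshot file is always safe in this sketch.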

4. PERSISTENT INDEX

Similarly to the way the delta history of a wavelet can be paged out and paged in, it is useful to load wavelets into memory and unload them from memory according to how and when they are accessed. This solves problem P3, namely the lack of scalability of keeping all wavelets resident in memory and loading them all at wave server startup.

The logic to page in a wavelet on demand is already implemented.

Wavelets can be paged out in different ways. A crude solution would be to restart the wave server when too many wavelets are paged in. More sophisticated solutions, which selectively page out wavelets at runtime to make room for others, need to take care to ensure that no part of the wave store and no users of the wave store maintain references to obsolete WaveletContainer and WaveletState objects for paged out wavelets.

But there's an important reason why, currently, all wavelets need to be kept in memory: The search implementation needs to traverse all wavelets to perform a query. To page out wavelets, we need a proper index which can decide if a query matches a wavelet. Then we won't need to consult the wavelet contents directly and, thus, won't need to have the wavelet in memory. Furthermore, in order to scale to large numbers of wavelets, we cannot build the index at wave server startup, as we cannot afford to consult every wavelet, and we can't hold the index in its entirety in memory, as the size of the index will be proportional to the size of the wave store, more or less. Therefore the index needs to be stored on disk (in persistent storage) and it needs to be possible to evaluate queries by piecemeal disk access to the index.

When it comes to indexing stored wavelets, it's a real drawback that we only have a file based implementation of the wave store. If the persistent wave store were backed by a good database, we could make use of the database's indexing facilities. Alternatively, we could integrate an off-the-shelf indexing system such as Apache Lucene, independent of the underlying wave store implementation. Regardless, in keeping with the existing file based wave store, in section 4.4 we consider how a primitive file based index can be implemented, as an illustration.

4.1 GOOGLE WAVE'S SEARCH AND INDEXING

First, let us consider Google Wave's search and indexing solution:
Users can only search waves that they share, either waves that they participate in or follow or waves with a group that they are members of.
Google Wave exploits this structure to organize its persistent index as an index per user and per group. Within each user/group index, there's a posting list per token, sorted by last-modified time (LMT).
This solution makes searches involving long posting lists relatively efficient: long posting lists are already intersected with the relevant user/group and only a prefix of the posting list needs to be accessed to deliver the first page of results.
All search results are ordered by LMT, a useful and conventional sort order in communication tools.
The indexer maintains a cache of index changes since it last modified a user/group persistent index and occasionally rewrites the persistent index, updating the sort order according to LMT changes.
The indexer is a separate process (running on a separate machine) from the wave server. It is connected to the wave server by a "remote wave bus", that is, the indexer maintains a network connection to the wave server and receives a message for every update to a wave in the wave store.
To recover any messages lost due to network or server outages, the wave server maintains a commit log that logs all persisted wave updates and broadcasts the commit log on the wave bus along with the actual wave updates. The indexer maintains a pointer into the commit log in persistent storage tracking its progress processing the logged updates and recording them all in the persistent user/group indexes. If the indexer restarts, it contacts the wave server to read the commit log contents from the pointer onwards and it catches up on any missed, persisted wave updates while indexing new wave updates on the wave bus.
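
The recovery scheme in the last two points can be sketched as follows. This Python sketch is purely illustrative and is not Google Wave's code: the Indexer class, the numbering of commit log entries by position, and the dict that stands in for the persistently stored pointer are all assumptions. It shows an indexer that advances its pointer as it indexes updates and falls back to the commit log when it detects a gap or restarts.

```python
class Indexer:
    """Hypothetical indexer that tracks its progress through a commit log."""

    def __init__(self, read_commit_log, pointer_store):
        # read_commit_log(start) yields (position, update) from `start` onwards.
        self._read_commit_log = read_commit_log
        # pointer_store is a dict standing in for persistent storage.
        self._pointer_store = pointer_store
        self.indexed = []

    def _pointer(self):
        return self._pointer_store.get("pointer", 0)

    def _index(self, position, update):
        self.indexed.append(update)  # would update the user/group indexes
        self._pointer_store["pointer"] = position + 1

    def catch_up(self):
        """Replays every logged update from the stored pointer onwards."""
        for position, update in self._read_commit_log(self._pointer()):
            self._index(position, update)

    def on_wave_bus_message(self, position, update):
        if position > self._pointer():
            self.catch_up()  # a message was lost: recover from the commit log
        if position == self._pointer():
            self._index(position, update)
        # Otherwise the update was already indexed and is ignored.
```

On restart, a new indexer created over the same pointer store calls catch_up() before it resumes listening on the wave bus.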

4.2 PERSISTENT INDEX FOR WIAB

The following is a simple persistent user/group index design for WIAB.

The index covers a set of waves, each with a set of tokens. If the index is for the user Bob, then the waves are those that Bob participates in or follows, and the set of tokens for a given wave consists of the query terms that appear in Bob's wave view, e.g., "with:bob@example.com" and "creator:alice@example.com".

The index design here doesn't support prefix matching of tokens, so if it should be possible to find a wave with Bob as a participant by searching for either "with:bob@example.com" or "with:bob" or any prefix thereof, then the set of tokens for the wave must include all these tokens. The index will support full text search if there's a token for every word in the contents of the wave view.

The index maintains a record for each wave in the index with its wave id and LMT and possibly some metadata about the wave. (If the record includes blip count, unread count, first three participants, and a snippet, then a search result digest can be generated from this record without having to load the wave when this wave is found during search.)
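
Since the index does not support prefix matching, the token set has to be expanded when a wave is indexed. The following Python sketch shows one possible expansion. The rule used here (full address, local part, and the dot-separated pieces of the local part) is an assumption chosen to reproduce the tokens in the MongoDB example in section 4.3, and the function names are hypothetical.

```python
import re


def address_tokens(prefix, address):
    """Tokens for one address, e.g. prefix 'with' and 'john.smith@example.com'."""
    local_part = address.split("@", 1)[0]
    variants = [address, local_part]
    pieces = local_part.split(".")
    if len(pieces) > 1:
        variants.extend(pieces)  # 'john.smith' also yields 'john' and 'smith'
    return ["%s:%s" % (prefix, v) for v in variants]


def wave_tokens(wave_id, creator, participants, text, is_read):
    """Builds the token set for one wave in one user's index."""
    tokens = ["id:" + wave_id]
    tokens += address_tokens("creator", creator)
    tokens.append("is:read" if is_read else "is:unread")
    for participant in participants:
        tokens += address_tokens("with", participant)
    tokens += re.findall(r"\w+", text.lower())  # one token per word of content
    # Drop duplicates but keep the first-seen order.
    return list(dict.fromkeys(tokens))
```

Supporting arbitrary prefixes such as "with:bo" would require emitting every prefix of each variant, which this sketch does not do.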

4.3 MONGODB IMPLEMENTATION

The index for a user or group is a MongoDB collection. The documents in the index are wave records with wave id, LMT, and any other metadata we want, plus an array of tokens. Each collection is indexed by tokens, using MongoDB's multikeys feature.

$ mongo
MongoDB shell version: 1.8.2
connecting to: test
> db.INDEXsoren.find()
> db.INDEXsoren.ensureIndex({tokens:1})
> db.INDEXsoren.save({
    waveid:"example.com/wave1",
    lastmodifiedtime:1311170776,
    participants:["soren@example.com","john.smith@example.com"],
    snippet:"This is wave one",
    tokens:[
      "id:example.com/wave1",
      "creator:soren@example.com",
      "creator:soren",
      "is:read",
      "with:soren@example.com",
      "with:soren",
      "with:john.smith@example.com",
      "with:john.smith",
      "with:john",
      "with:smith",
      "this",
      "is",
      "wave",
      "one",
    ]})
> db.INDEXsoren.save({
    waveid:"example.com/wave2",
    lastmodifiedtime:1311170777,
    participants:["bob@example.com","soren@example.com"],
    snippet:"Wave two",
    tokens:[
      "id:example.com/wave2",
      "creator:bob@example.com",
      "creator:bob",
      "is:unread",
      "with:bob@example.com",
      "with:bob",
      "with:soren@example.com",
      "with:soren",
      "wave",
      "two",
    ]})
> db.INDEXsoren.find()
{ "_id" : ObjectId("4e26e2005dcfccaa5f94b81b"),
  "waveid" : "example.com/wave1",
  "lastmodifiedtime" : 1311170776,
  "participants" : [ "soren@example.com", "john.smith@example.com" ],
  "snippet" : "This is wave one",
  "tokens" : [ "id:example.com/wave1", ... ]
}
{ "_id" : ObjectId("4e26e4d15dcfccaa5f94b81c"),
  "waveid" : "example.com/wave2",
  "lastmodifiedtime" : 1311170777,
  "participants" : [ "bob@example.com", "soren@example.com" ],
  "snippet" : "Wave two",
  "tokens" : [ "id:example.com/wave2", ... ]
}
> db.INDEXsoren.find({tokens:"wave"})
{ "_id" : ObjectId("4e26e2005dcfccaa5f94b81b"), ... }
{ "_id" : ObjectId("4e26e4d15dcfccaa5f94b81c"), ... }
> db.INDEXsoren.find({tokens:"is:unread"})
{ "_id" : ObjectId("4e26e4d15dcfccaa5f94b81c"), ... }
> // a conjunctive query is constructed with {$all:[...]}
> db.INDEXsoren.find({tokens:{$all:["with:soren","wave"]}})
{ "_id" : ObjectId("4e26e2005dcfccaa5f94b81b"), ... }
{ "_id" : ObjectId("4e26e4d15dcfccaa5f94b81c"), ... }
> db.INDEXsoren.find({tokens:{$all:["with:soren","wave","one"]}})
{ "_id" : ObjectId("4e26e2005dcfccaa5f94b81b"), ... }

4.4 FILE BASED IMPLEMENTATION

The index for a user or group is a file. It consists of a short header with pointers to the sections of the index followed by the three sections:
Wave list section: Lists the waves in the index, sorted by LMT. Contains a record for each wave with its wave id and possibly some metadata about the wave. (If the record includes LMT, unread count, and a snippet, then a search result digest can be generated from this record without having to load the wave when this wave is found during search.)
Posting lists section: For each token there is a posting list with pointers to all the waves in the index with this token, sorted by LMT. The posting list begins with the token string itself. The wave pointers point into the wave list section. (Since that is also ordered by LMT, the pointers in the posting list are in increasing numerical order.) The posting lists themselves are sorted by a hash of the token.
Posting lists index section: A list of (token hash, posting list pointer) pairs, sorted by token hash. The posting list pointers point into the posting lists section.
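
The relationship between the three sections can be sketched as follows. This Python sketch models the sections as in-memory lists and uses list positions in place of byte offsets, so it illustrates the layout rather than the on-disk encoding. The hash function, the newest-first sort direction, and the function names are assumptions.

```python
import hashlib


def token_hash(token):
    """Stable 32-bit hash of a token (the choice of hash is an assumption)."""
    digest = hashlib.md5(token.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def build_index(waves):
    """Builds the three sections from {wave_id: (lmt, tokens)}.

    Pointers are modelled as list positions rather than byte offsets.
    """
    # Wave list section: sorted by LMT, newest first (direction is an assumption).
    ordered = sorted(waves.items(), key=lambda item: item[1][0], reverse=True)
    wave_list = [(wave_id, lmt) for wave_id, (lmt, _) in ordered]

    # Per token, collect pointers into the wave list in increasing order.
    by_token = {}
    for pointer, (wave_id, (_, tokens)) in enumerate(ordered):
        for token in set(tokens):
            by_token.setdefault(token, []).append(pointer)

    # Posting lists section: sorted by token hash; each entry starts with its token.
    posting_lists = sorted(
        by_token.items(),
        key=lambda entry: (token_hash(entry[0]), entry[0]))

    # Posting lists index section: (token hash, posting list pointer) pairs.
    posting_index = [(token_hash(token), position)
                     for position, (token, _) in enumerate(posting_lists)]
    return wave_list, posting_lists, posting_index
```

A real file would add the header with the start offsets of the three sections and encode each record in a fixed binary format.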

The index can be used to search for waves as follows.
Suppose you have a query with some number of tokens, i.e., the query is for waves with all these tokens (an AND query).
You read the header of the index file to find where the three sections are and you read the posting lists index into memory.
You look up the posting lists for all the tokens in the query by the hash of the tokens. This can be done by binary search, because the posting lists index is sorted. For each posting list you get a pointer to the actual posting list in the posting lists section as well as its endpoint, namely the pointer to the next posting list, which can also be read from the posting lists index.
Given their pointers and endpoints, you read the posting lists into memory (in parallel, in case that's efficient). Once the posting lists are in memory, you traverse the lists in parallel and perform an efficient intersection, exploiting that they are all ordered the same way by LMT, until you find however many search results you're looking for (a few dozen, typically, enough to display a screenful in the client search panel).
Lastly, you look up the results in the wave list section to get the wave ids and any metadata. If the metadata contains all the information needed to generate digests to display in the search results, you're done, otherwise you open the waves (loading them from disk if they're not already resident in memory) and generate the digests from the wave contents.
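
The search steps above can be sketched as follows. This Python sketch operates on in-memory lists that model the three sections, with list positions in place of byte offsets, and assumes the wave list is ordered newest first. The hash function and all function names are hypothetical. It shows the binary search in the posting lists index and the intersection of posting lists that share the same order.

```python
import bisect
import hashlib


def token_hash(token):
    """Stable 32-bit hash of a token (the choice of hash is an assumption)."""
    digest = hashlib.md5(token.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def find_posting_list(token, posting_lists, posting_index):
    """Binary search of the sorted (token hash, pointer) pairs."""
    wanted = token_hash(token)
    i = bisect.bisect_left(posting_index, (wanted, -1))
    # Step over hash collisions; each posting list begins with its token.
    while i < len(posting_index) and posting_index[i][0] == wanted:
        stored_token, pointers = posting_lists[posting_index[i][1]]
        if stored_token == token:
            return pointers
        i += 1
    return []


def search(query_tokens, wave_list, posting_lists, posting_index, limit=20):
    """AND query: intersects the posting lists and stops after `limit` hits."""
    lists = [find_posting_list(t, posting_lists, posting_index)
             for t in query_tokens]
    if not lists or any(not pointers for pointers in lists):
        return []
    positions = [0] * len(lists)
    results = []
    while len(results) < limit:
        if any(pos >= len(lst) for pos, lst in zip(positions, lists)):
            break  # one list is exhausted, so no further matches exist
        heads = [lst[pos] for pos, lst in zip(positions, lists)]
        target = max(heads)
        if all(head == target for head in heads):
            results.append(wave_list[target])  # wave id and metadata
            positions = [pos + 1 for pos in positions]
        else:
            # Advance every list whose head is behind the largest head.
            positions = [pos + (1 if head < target else 0)
                         for pos, head in zip(positions, heads)]
    return results
```

Because every posting list is ordered the same way as the wave list, the loop can stop as soon as it has collected enough results for the first page.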

Index update
TODO: describe how the wave server can combine the on-disk index with a cache of more recent changes to waves to answer search queries with up-to-date results ...

TODO: describe how to rewrite the on-disk index periodically and use the commit log to deal with data loss when the server is terminated abruptly and doesn't write recent changes to the on-disk indices before it terminates ...

5. WAVE STORE COMMIT LOG

TODO: ...