Lens relies on Hive metastore to store it's model. That includes model definitions(cube,dimension,dimtable,facttable etc), and partitions for each storage table. Now at the time of processing a query which often has a time range clause, lens needs to find candidate tables that have time partitions for given time range. One storage table can have multiple update periods and a given time range can be satisfied by any weird combination of partitions of different ranges. Let's take an example:
Given Time range of query: [2015-01-29T06:00:00Z, 2015-03-01T06:00:00Z). Here Notation [a,b) means query range is inclusive of `a` and exclusive of `b`. We'll use this notation throughout.
This time range can be satisfied in a lot of ways. We're assuming only hourly,daily and monthly granularities of data are present:
|[2015-02, 2015-03)||[2015-01-30,2015-02-01)||[2015-01-29-06, 2015-01-30-00) ∪ [2015-03-01-00, 2015-03-01-06)|
All rollups are present
queried range's granularity is the finest present granularity for data
|[2015-02, 2015-03)||None||[2015-01-29-06, 2015-02-01-00) υ [2015-03-01-00, 2015-03-01-06)|
Would happen when let's say, daily rollups are not present
only hourly and monthly data is present
|None||[2015-01-30,2015-03-01)||[2015-01-29-06, 2015-01-30-00) υ [2015-03-01-00, 2015-03-01-06)||No monthly rollups present. Only daily and hourly data present|
|None||[2015-01-30,2015-02-10)υ[2015-02-11,2015-03-01)||[2015-01-29-06, 2015-01-30-00) υ [2015-02-10-00, 2015-02-11-00) υ [2015-03-01-00, 2015-03-01-06)||One day's daily data missing. Falling back to hourly|
So for every query that comes in, we need to find the facts that have partitions that can satisfy the query. We'll call such facts "candidate facts".
Let's further reduce the problem to determining whether a given fact is candidate for given time range and if yes, what are all the partitions of the fact that cover the time range. Implicit assumption here is that partitions that cover big time range are preferred.
We're given a fact and timerange. We have to look at fact's storage tables, storage tables' update periods and determine whether there's a set of <storage table, partitions> so that the whole time range is covered. And if there's one, we have to return the best such set. At first it seems like a harder variant of Set cover problem. Without going into the NP-hardness, we'll solve it recursively in the following way:
Find all update periods of the fact. All update periods of fact = Union of all update periods of all storage tables of the fact.
Now start from the biggest update period, see the maximum sub-range it can cover, cover that sub range using that update period and (if required) any of the update periods smaller than it. Now cover The remaining range(total range - the subrange recently covered) using update periods strictly smaller than the biggest one.
The crux of the whole procedure is the method `partition_exists`. We'd want to optimise that function as much as possible.
Easiest implementation that comes to mind is:
This approach queries hive meta store for each partition. This can blow up very fast. Let's say no partitions are there and update periods are YEARLY,MONTHLY,DAILY,HOURLY and MINUTELY. Now a particular query asks for time range of a whole year. Let's look at number of hive metastore calls made for processing this one query.
|Update Period||Number of calls to metastore||why?|
|YEARLY||1||YEARLY can cover the range so it will be asked to cover it|
|MONTHLY||12||previous update period couldn't cover. this is the next smaller update period and hence will be asked to cover|
|DAILY||365 (let's assume not a leap year)||same as above|
|HOURLY||365*24||same as above|
same as above
|TOTAL||534738||sum of all the above|
The problem becomes even worse because lens needs to accept a lot of queries and the bigger the query, the slower it will be processed. This way, crashing lens server or hive metastore becomes easy.
That's why we introduce the concept of Partition Timeline. Idea is that we don't need to query hive metastore for partition existence. We'll keep partitions stored in memory so that partition existence check can be answered from memory itself. Timeline would need a method for adding, deleting partition(s) to its internal data structure. We need some another capabilities: constructing this data structure from list of all partitions of the storage table. Storing this data structure in our model and loading this data structure from model. All our current model is stored as key value pair in properties of the hive table. So we require the capability of converting the data structure to and from key value pairs.
Based on that, the interface is like this:
Implementations of PartitionTimeline:
Now that we have the specifications finalized, We can easily go about implementing it in any way we choose. We can focus on minimizing access time, storage space etc.
This timeline stores all the partitions in a TreeSet. This way they are ordered. add just adds to the treeset, drop deletes from treeset, existence checks in the treeset. Latest partition is the last element of the treeset. Conversion to/from hashmap is just a comma separated list against some pre-decided key.
In this we store the first partition, last partition and all the missing in-between partitions. So if 1,2,3,4,9,10 are the registered partitions, We'll store first = 1, latest = 10, holes = 5,6,7,8
Here add either expands one of the boundaries: brings first down or takes latest up. Or it removes an element from holes.
Similarly delete, it either shrinks boundaries of adds hole.
existence first checks whether queried partition belongs in [first, latest]. If not, directly say partition doesn't exist. If yes, it exists if and only if it doesn't belong in the set of holes
latest is directly stored.
In Conversion you need three key value pairs for first,latest and holes each. holes is stored as comma separated string
In this we store ranges of existence. Taking an example, if 1,2,3,5,6,9,11,15,16 are the partitions registered we'll register [1,4) ∪ [5, 7) ∪ [9,10) ∪ [11,12) ∪ [15,17)
Equivalently, we'll store an Ordered List of tuples.
Here in each operation, we can utilize binary search.
add x is implemented as adding range [x,x+1). So we first binary search for where this needs to be inserted, Then insert the range. Now The incoming range might be such that it can be merged inside another tuple. So after inserting singleton range, we check with it's neighbours whether it can be merged in either or both. We accordingly merge the range then.
Delete Either shrinks one of the ranges or splits one range into two sub ranges.
Existence check first finds the range it should look at in the list of ranges. Then inside that range, checking for existence is O(1)
latest = ending partition of the last range
For conversion, you need one key value pair that stores comma separated ranges. Each range is again comma separated.
So far, we've given an idea of what Timeline is and why it's required. Now we'll explain how this plugs in to our system.
Integration With Timeline
Timeline instances are stored as a map for each storage table for each update period for each partition column.
Given above is the (not-so) high level code for loading a timeline. Given a fact name and storage name, lookup in the hashmap with a key like storage_fact, if present, return it, else load it from somewhere. For loading, first look in table properties for an indicator whether timeline is present in properties, if yes, load from properties, if no, get all partitions of the storage, compute timelines from there, store the resulting timeline to table properties and set the indicator to true.
This indicator design ensures that we can load the timelines again from all partitions whenever we want. We just have to set cube.storagetable.partition.timeline.cache.present=false in the table's properties.
Going deeper into the two types of loading:
By introducing the factory we ensure that every combination of <storageTableName,updatePeriod,partCol> can choose to have its own timeline class. So for example one such combination is expected to have a very small number of large spaced partitions, it can choose to have StoreAllPartitionTimeline.class as its timeline implementation. We need to set the property "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partcol+".storage.class" to a valid class extending PartitionTimeline to use that implementation.
Note that the default timeline is EndsAndHoles. This is in line with the assumption that facts will usually have a large range of data present and there will only be a small number of time instances where data is absent.
The other type of loading is by all partitions:
Note that I have conveniently omitted corner cases in the interest of keeping the pseudo code simple.
Updation of timeline happens when new partitions are registered, or some partitions are dropped. For updating timeline, we have to update the in memory data structure and the table properties. In add partition(s), we first update in memory, then register the actual partition(s) and then update table properties. For deleting, we do the opposite, first delete the partition, then update the in memory data structure and then update table properties.
Server is the one that needs to use in memory timelines to do query translation. So it's very important to have timeline updated in server's in-memory data structure for lens server to behave correctly. So all partition registrations/deletions have to be done through REST API now.
Choosing a timeline implementation
So as already mentioned, Timeline implementation can be chosen for any <storagetable,updateperiod,partCol> combination. All you have to do is set "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partcol+".storage.class" to a valid Timeline Implementation class in storage table's properties while timelines are still not created.
Migrating between timeline implementations
Currently, the only way of migrating between timeline implementations is to set "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partcol+".storage.class" = new Timeline Implementation class and cube.storagetable.partition.timeline.cache.present=false in storage table properties. This would force lens server to load all timelines for that storage table from all partitions. Since hive table properties can only be altered, not deleted, properties of old timeline class will still be there but won't matter much. There's an open JIRA for supporting migration of a single timeline without forcing lens server to query for all partitions: LENS-489 - Support Timeline Migration Open .