Problem Statement: 

Lens relies on the Hive metastore to store its model. That includes model definitions (cube, dimension, dimtable, facttable, etc.) and the partitions of each storage table. When processing a query, which often has a time range clause, Lens needs to find candidate tables that have time partitions for the given time range. One storage table can have multiple update periods, and a given time range can be satisfied by many different combinations of partitions of different granularities. Let's take an example:


Query time range: [2015-01-29T06:00:00Z, 2015-03-01T06:00:00Z). Here the notation [a,b) means the range is inclusive of `a` and exclusive of `b`. We use this notation throughout.

This time range can be satisfied in many ways. Assuming only hourly, daily and monthly granularities of data are present:


| Monthly | Daily | Hourly | Remarks |
|---|---|---|---|
| [2015-02, 2015-03) | [2015-01-30, 2015-02-01) | [2015-01-29-06, 2015-01-30-00) ∪ [2015-03-01-00, 2015-03-01-06) | Best case: all rollups are present |
| None | None | [2015-01-29-06, 2015-03-01-06) | Worst case: the whole range is served at the query range's granularity (hourly), the finest granularity present |
| [2015-02, 2015-03) | None | [2015-01-29-06, 2015-02-01-00) ∪ [2015-03-01-00, 2015-03-01-06) | Would happen when, say, daily rollups are not present: only hourly and monthly data is present |
| None | [2015-01-30, 2015-03-01) | [2015-01-29-06, 2015-01-30-00) ∪ [2015-03-01-00, 2015-03-01-06) | No monthly rollups present; only daily and hourly data present |
| None | [2015-01-30, 2015-02-10) ∪ [2015-02-11, 2015-03-01) | [2015-01-29-06, 2015-01-30-00) ∪ [2015-02-10-00, 2015-02-11-00) ∪ [2015-03-01-00, 2015-03-01-06) | One day's daily data is missing; falling back to hourly for that day |


So for every query that comes in, we need to find the facts that have partitions that can satisfy the query. We'll call such facts "candidate facts". 

Let's further reduce the problem to determining whether a given fact is a candidate for a given time range and, if so, which partitions of the fact cover the time range. The implicit assumption here is that partitions covering larger time ranges are preferred.


We're given a fact and a time range. We have to look at the fact's storage tables and their update periods, and determine whether there's a set of <storage table, partitions> pairs that covers the whole time range. If there is one, we have to return the best such set. At first glance this looks like a harder variant of the set cover problem. Without going into NP-hardness, we solve it recursively as follows:

First, find all update periods of the fact, i.e. the union of the update periods of all its storage tables.

Then start from the biggest update period: find the maximum sub-range it can cover, and cover that sub-range using that update period, falling back (where partitions are missing) to update periods smaller than it. Then cover the remaining range (the total range minus the sub-range just covered) using update periods strictly smaller than the biggest one.


Pseudocode (Python style for readability; helpers such as `maximum_covering_subrange` and `partition_exists` are assumed):

```python
def cover(fact, time_range):
    # All update periods of the fact = union of the update periods of all its storage tables
    update_periods = sorted(union_of_update_periods(all_storage_tables(fact)))  # ascending
    return cover_with_periods(fact, time_range, update_periods)


def cover_with_periods(fact, time_range, update_periods):
    if time_range.is_empty():
        return []
    biggest_index = len(update_periods) - 1
    # Skip update periods too large to cover any whole partition of the range,
    # e.g. the range spans two days and the update period is monthly
    while biggest_index >= 0 and maximum_covering_subrange(time_range, update_periods[biggest_index]).is_empty():
        biggest_index -= 1
    if biggest_index < 0:
        raise Exception("Uncoverable")
    period = update_periods[biggest_index]
    sub_range = maximum_covering_subrange(time_range, period)
    left_sub_range = Range(time_range.begin, sub_range.begin)    # [range.begin, sub_range.begin)
    right_sub_range = Range(sub_range.end, time_range.end)       # [sub_range.end, range.end)
    remaining_update_periods = update_periods[:biggest_index]    # strictly smaller periods
    # Process sub_range here, then recurse for the left and right sub-ranges
    sub_range_cover = []
    for part in sub_range.partitions(period):
        if partition_exists(fact, part):
            sub_range_cover.append(part)
        else:
            # Missing partition: cover its span with smaller update periods
            # (raises "Uncoverable" if that is impossible)
            sub_range_cover += cover_with_periods(fact, part.range(), remaining_update_periods)
    return (cover_with_periods(fact, left_sub_range, remaining_update_periods)
            + sub_range_cover
            + cover_with_periods(fact, right_sub_range, remaining_update_periods))
```
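To make the recursion concrete, here is a small runnable sketch. It assumes only HOURLY, DAILY and MONTHLY periods, hour-aligned range boundaries, and an `exists(period, start)` callback standing in for `partition_exists`. The names `floor_to`, `next_start`, `max_covering_subrange` and `cover_range` are hypothetical, chosen for illustration only.

```python
from datetime import datetime, timedelta

# Sketch only: update periods sorted smallest first.
PERIODS = ["HOURLY", "DAILY", "MONTHLY"]


def floor_to(t, period):
    """Start of the `period` partition containing t."""
    t = t.replace(minute=0, second=0, microsecond=0)
    if period == "HOURLY":
        return t
    t = t.replace(hour=0)
    if period == "DAILY":
        return t
    return t.replace(day=1)  # MONTHLY


def next_start(t, period):
    """Start of the partition following the one that begins at aligned time t."""
    if period == "HOURLY":
        return t + timedelta(hours=1)
    if period == "DAILY":
        return t + timedelta(days=1)
    return t.replace(year=t.year + t.month // 12, month=t.month % 12 + 1)


def max_covering_subrange(begin, end, period):
    """Largest [b, e) inside [begin, end) made of whole `period` partitions, or None."""
    b = floor_to(begin, period)
    if b < begin:
        b = next_start(b, period)
    e = floor_to(end, period)
    return (b, e) if b < e else None


def cover_range(begin, end, periods, exists):
    """Return (period, start) partitions covering [begin, end); raise if impossible."""
    if begin >= end:
        return []
    idx = len(periods) - 1
    # Skip periods too coarse to fit one whole partition, e.g. MONTHLY for a 2-day range
    while idx >= 0 and max_covering_subrange(begin, end, periods[idx]) is None:
        idx -= 1
    if idx < 0:
        raise ValueError("Uncoverable")
    period, smaller = periods[idx], periods[:idx]
    sub_begin, sub_end = max_covering_subrange(begin, end, period)
    result = cover_range(begin, sub_begin, smaller, exists)
    t = sub_begin
    while t < sub_end:
        nxt = next_start(t, period)
        if exists(period, t):
            result.append((period, t))
        else:  # missing partition: fall back to strictly smaller periods
            result += cover_range(t, nxt, smaller, exists)
        t = nxt
    return result + cover_range(sub_end, end, smaller, exists)
```

On the example query, with every partition present, the result is 18 + 6 hourly partitions, 2 daily partitions and 1 monthly partition (27 in total). If monthly partitions are unavailable, the result has 54 partitions, which matches the "no monthly rollups" row of the table.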


The crux of the whole procedure is the method `partition_exists`. We'd want to optimise that function as much as possible. 

Implementing partition_exists:


The simplest implementation that comes to mind is:

Naive partition_exists:

```python
def partition_exists(fact, partition):
    # One metastore round trip per storage table, for every partition checked
    for storage_table in all_storage_tables(fact):
        if hive_metastore.partition_exists(storage_table, str(partition)):
            return True
    return False
```


This approach queries the Hive metastore for every partition it checks, and the number of calls can blow up very quickly. Suppose no partitions exist, the update periods are YEARLY, MONTHLY, DAILY, HOURLY and MINUTELY, and a query asks for the time range of a whole year. Let's count the Hive metastore calls made while processing this one query:

| Update period | Number of calls to metastore | Why? |
|---|---|---|
| YEARLY | 1 | YEARLY can cover the range, so it is asked to cover it |
| MONTHLY | 12 | The previous update period couldn't cover the range; this is the next smaller update period, so it is asked to cover |
| DAILY | 365 (assuming a non-leap year) | Same as above |
| HOURLY | 365*24 = 8760 | Same as above |
| MINUTELY | 365*24*60 = 525600 | Same as above |
| TOTAL | 534738 | Sum of all the above |


The problem gets worse because Lens has to accept many queries, and the larger a query's time range, the slower it is processed. This makes it easy to overwhelm, or even crash, the Lens server or the Hive metastore.


That's why we introduce the concept of a Partition Timeline. The idea is that we don't need to query the Hive metastore for partition existence: we keep partitions in memory so that an existence check can be answered from memory itself. A timeline needs methods for adding partition(s) to and deleting partition(s) from its internal data structure. It also needs a few more capabilities: constructing the data structure from the list of all partitions of the storage table, storing the data structure in our model, and loading it back from the model. Our entire current model is stored as key-value pairs in the properties of the Hive table, so we need to be able to convert the data structure to and from key-value pairs.


Timeline-backed partition_exists:

```python
def partition_exists(fact, partition):
    # Answered from the in-memory timelines; no metastore round trip
    for storage_table in all_storage_tables(fact):
        if timeline.partition_exists(storage_table, partition):
            return True
    return False
```


Based on these requirements, the interface looks like this:


Partition Timeline Interface

```java
import java.util.Map;

interface PartitionTimeline {
  /** Add a partition to the timeline. */
  boolean add(@NonNull TimePartition partition) throws LensException;

  /** Drop a partition from the timeline. */
  boolean drop(@NonNull TimePartition toDrop) throws LensException;

  /**
   * Latest partition.
   * @return the latest partition, or null if no partitions exist
   */
  TimePartition latest();

  /** Serialize member variables as a map. */
  Map<String, String> toProperties();

  /** Deserialize member variables from the given map. */
  boolean initFromProperties(Map<String, String> properties) throws LensException;

  /** Whether no partitions have been registered. */
  boolean isEmpty();

  /** Whether the timeline is in a consistent state. */
  boolean isConsistent();

  /** Check partition existence. */
  boolean exists(TimePartition partition);
}
```


Implementations of PartitionTimeline:


Now that the specification is finalized, we can implement it in any way we choose, focusing on minimizing access time, storage space, etc.


StoreAllPartitionTimeline stores all the partitions in a TreeSet, so they are kept ordered. `add` just adds to the TreeSet, `drop` deletes from it, and the existence check looks the partition up in it. The latest partition is the last element of the TreeSet. Conversion to and from the properties map is just a comma-separated list stored against a pre-decided key.
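A minimal Python sketch of this store-everything timeline follows. It assumes partitions are plain integers, as in the numeric examples in this section. Python has no TreeSet, so a sorted list maintained with `bisect` stands in for it, which makes inserts O(n) rather than O(log n). The property key `"partitions"` is hypothetical.

```python
import bisect


class StoreAllTimeline:
    """Keeps every registered partition in a sorted list (stand-in for Java's TreeSet)."""

    KEY = "partitions"  # hypothetical property key

    def __init__(self):
        self.parts = []

    def _find(self, p):
        i = bisect.bisect_left(self.parts, p)
        return i, i < len(self.parts) and self.parts[i] == p

    def add(self, p):
        i, found = self._find(p)
        if found:
            return False  # already registered
        self.parts.insert(i, p)
        return True

    def drop(self, p):
        i, found = self._find(p)
        if found:
            del self.parts[i]
        return found

    def exists(self, p):
        return self._find(p)[1]

    def latest(self):
        return self.parts[-1] if self.parts else None

    def to_properties(self):
        # comma-separated list against a single key
        return {self.KEY: ",".join(str(p) for p in self.parts)}

    def init_from_properties(self, props):
        value = props.get(self.KEY, "")
        self.parts = sorted(int(x) for x in value.split(",") if x)
        return True
```

Because it stores every partition, this layout suits sparse timelines, where the partition count is small.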


EndsAndHolesPartitionTimeline stores the first partition, the latest partition, and all the missing in-between partitions ("holes"). So if 1,2,3,4,9,10 are the registered partitions, we store first = 1, latest = 10, holes = 5,6,7,8.

Here add either expands one of the boundaries (brings first down or takes latest up, turning any partitions skipped over into new holes) or removes an element from holes.

Similarly, drop either shrinks a boundary or adds a hole.

The existence check first tests whether the queried partition lies in [first, latest]. If not, the partition doesn't exist. If it does, the partition exists if and only if it is not in the set of holes.

latest is stored directly.

Conversion needs three key-value pairs: one each for first, latest and holes. holes is stored as a comma-separated string.
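The following Python sketch shows the ends-and-holes idea, with integer partitions as in the example above. The property keys `"first"`, `"latest"` and `"holes"` are hypothetical. When a boundary moves, the sketch turns every skipped partition into a hole. On drop, it absorbs any holes that end up next to a boundary, so first and latest always name registered partitions.

```python
class EndsAndHolesTimeline:
    """Stores first, latest and the set of missing partitions ("holes") in between."""

    def __init__(self):
        self.first = None
        self.latest = None
        self.holes = set()

    def is_empty(self):
        return self.first is None

    def add(self, p):
        if self.is_empty():
            self.first = self.latest = p
        elif p < self.first:
            self.holes.update(range(p + 1, self.first))  # skipped partitions become holes
            self.first = p
        elif p > self.latest:
            self.holes.update(range(self.latest + 1, p))
            self.latest = p
        else:
            self.holes.discard(p)  # fills a hole; no-op if p is already present
        return True

    def exists(self, p):
        return (not self.is_empty() and self.first <= p <= self.latest
                and p not in self.holes)

    def drop(self, p):
        if not self.exists(p):
            return False
        if self.first == self.latest:
            self.first = self.latest = None
        elif p == self.first:
            nxt = p + 1
            while nxt in self.holes:  # shrink the lower boundary past adjacent holes
                self.holes.remove(nxt)
                nxt += 1
            self.first = nxt
        elif p == self.latest:
            prv = p - 1
            while prv in self.holes:  # shrink the upper boundary past adjacent holes
                self.holes.remove(prv)
                prv -= 1
            self.latest = prv
        else:
            self.holes.add(p)
        return True

    def to_properties(self):
        if self.is_empty():
            return {}
        return {"first": str(self.first), "latest": str(self.latest),
                "holes": ",".join(str(h) for h in sorted(self.holes))}

    def init_from_properties(self, props):
        if "first" not in props:
            self.first, self.latest, self.holes = None, None, set()
            return True
        self.first = int(props["first"])
        self.latest = int(props["latest"])
        self.holes = {int(h) for h in props.get("holes", "").split(",") if h}
        return True
```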



The third implementation stores ranges of existence. For example, if 1,2,3,5,6,9,11,15,16 are the registered partitions, we store [1,4) ∪ [5,7) ∪ [9,10) ∪ [11,12) ∪ [15,17).

Concretely, we store an ordered list of (begin, end) tuples.

Every operation can use binary search over this list.

add x is implemented as adding the range [x,x+1). We first binary search for where this range needs to be inserted, then insert it. The new range may be mergeable with an adjacent range, so after inserting the singleton range we check whether it can be merged with either or both of its neighbours, and merge accordingly.

drop either shrinks one of the ranges (removing it entirely if it held a single partition) or splits one range into two sub-ranges.

The existence check first binary searches for the one range that could contain the partition; checking membership within that range is then O(1).

latest = the partition just before the (exclusive) end of the last range.

For conversion, you need one key-value pair that stores a comma-separated list of ranges, each range itself written as a comma-separated begin,end pair.
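A Python sketch of the range-based timeline appears below. It assumes integer partitions and keeps a sorted list of disjoint, non-adjacent half-open `(begin, end)` tuples, using `bisect` for the binary searches. The `"ranges"` property key and the flat `b1,e1,b2,e2,...` encoding are assumptions, not the format Lens necessarily uses.

```python
import bisect
import math


class RangesTimeline:
    """Sorted, disjoint, non-adjacent half-open ranges [begin, end) of partitions."""

    def __init__(self):
        self.ranges = []  # list of (begin, end) tuples sorted by begin

    def _index(self, p):
        # binary search: index of the last range whose begin <= p, or -1
        return bisect.bisect_right(self.ranges, (p, math.inf)) - 1

    def exists(self, p):
        i = self._index(p)
        return i >= 0 and p < self.ranges[i][1]  # O(1) check inside the found range

    def add(self, p):
        if self.exists(p):
            return False
        i = self._index(p)  # p is not inside this range, so it lies after it
        begin, end = p, p + 1
        if i >= 0 and self.ranges[i][1] == p:  # merge with the left neighbour
            begin = self.ranges[i][0]
            del self.ranges[i]
            i -= 1
        j = i + 1
        if j < len(self.ranges) and self.ranges[j][0] == p + 1:  # merge with the right neighbour
            end = self.ranges[j][1]
            del self.ranges[j]
        self.ranges.insert(i + 1, (begin, end))
        return True

    def drop(self, p):
        if not self.exists(p):
            return False
        i = self._index(p)
        begin, end = self.ranges[i]
        # shrink, remove, or split the containing range
        self.ranges[i:i + 1] = [(b, e) for b, e in ((begin, p), (p + 1, end)) if b < e]
        return True

    def latest(self):
        return self.ranges[-1][1] - 1 if self.ranges else None

    def to_properties(self):
        return {"ranges": ",".join(f"{b},{e}" for b, e in self.ranges)}

    def init_from_properties(self, props):
        values = [int(x) for x in props.get("ranges", "").split(",") if x]
        self.ranges = list(zip(values[0::2], values[1::2]))
        return True
```

Adding 4 to the example timeline merges [1,4), [4,5) and [5,7) into [1,7). Dropping 3 then splits that range into [1,3) and [4,7).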


So far, we've explained what a timeline is and why it's required. Now we'll explain how it plugs into our system.


Integration With Timeline

Timeline instances are cached in a nested map keyed by storage table, then update period, then partition column.

Timeline cache

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;

class PartitionTimelineCache extends CaseInsensitiveStringHashMap<  // storage table
    Map<UpdatePeriod,                                               // update period
      CaseInsensitiveStringHashMap<                                 // partition column
        PartitionTimeline>>> {
  public Map<UpdatePeriod, CaseInsensitiveStringHashMap<PartitionTimeline>> get(String storage, String fact)
    throws HiveException, LensException {
    String storageTableName = storage + "_" + fact;
    if (get(storageTableName) == null) {
      // Not cached yet: load it
      Table storageTable = Hive.get().getTable(storageTableName);
      if ("true".equals(storageTable.getParameters().get("cube.storagetable.partition.timeline.cache.present"))) {
        loadTimelinesFromTableProperties(fact, storage);
      } else {
        // Also stores the timelines to table properties and sets
        // cube.storagetable.partition.timeline.cache.present=true in table properties.
        loadTimelinesFromAllPartitions(fact, storage);
      }
    }
    return get(storageTableName);
  }
}

public class CaseInsensitiveStringHashMap<T> extends HashMap<String, T> {...}
```

The code above is the (not-so-)high-level logic for loading a timeline. Given a fact name and a storage name, look up the key storage_fact in the map. If it is present, return it; otherwise, load it. For loading, first check the table properties for an indicator of whether the timeline is stored there. If it is, load from the properties. If not, get all partitions of the storage table, compute the timelines from them, store the resulting timelines in the table properties, and set the indicator to true.

This indicator design lets us force the timelines to be loaded again from all partitions whenever we want: we just set cube.storagetable.partition.timeline.cache.present=false in the table's properties.
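The indicator logic can be exercised in isolation. In the Python sketch below, each storage table is a plain dict holding `params` (the table properties) and `partitions`. The timeline is simplified to a sorted list, and `"timeline.partitions"` is a hypothetical property key. Only the `cube.storagetable.partition.timeline.cache.present` key comes from this page.

```python
PRESENT_KEY = "cube.storagetable.partition.timeline.cache.present"
TIMELINE_KEY = "timeline.partitions"  # hypothetical key for the serialized timeline


class TimelineCacheSketch:
    """Lazy loader mirroring the indicator logic; tables are in-memory stand-ins."""

    def __init__(self, tables):
        self.tables = tables      # storage table name -> {"params": {...}, "partitions": [...]}
        self.cache = {}
        self.loaded_from = None   # records which loading path ran, for illustration

    def get(self, storage, fact):
        name = storage + "_" + fact
        if name not in self.cache:
            table = self.tables[name]
            params = table["params"]
            if params.get(PRESENT_KEY) == "true":
                # cheap path: deserialize from table properties
                stored = params.get(TIMELINE_KEY, "")
                self.cache[name] = sorted(int(x) for x in stored.split(",") if x)
                self.loaded_from = "properties"
            else:
                # expensive path: scan all partitions, write back, set the indicator
                timeline = sorted(table["partitions"])
                params[TIMELINE_KEY] = ",".join(map(str, timeline))
                params[PRESENT_KEY] = "true"
                self.cache[name] = timeline
                self.loaded_from = "partitions"
        return self.cache[name]
```

A fresh cache, for example after a server restart, takes the cheap path once the indicator is set. Flipping the indicator back to `"false"` makes the next load rescan all partitions.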

Going deeper into the two types of loading: 


```python
def loadTimelinesFromTableProperties(fact, storage):
    storageTableName = storage + "_" + fact
    storageTable = getHiveTable(storageTableName)
    for updatePeriod in getFactUpdatePeriodMap(fact).get(storage):
        for partCol in allPartCols(storageTableName):
            ensureEntry(storageTableName, updatePeriod, partCol).initFromProperties(storageTable.getParameters())


def ensureEntry(storageTableName, updatePeriod, partCol):
    # Create each missing level of the nested cache on demand
    timelinesForTable = partitionTimelineCache.setdefault(storageTableName, {})
    timelinesForPeriod = timelinesForTable.setdefault(updatePeriod, {})
    if partCol not in timelinesForPeriod:
        timelinesForPeriod[partCol] = PartitionTimelineFactory.createFor(storageTableName, updatePeriod, partCol)
    return timelinesForPeriod[partCol]
```



```python
class PartitionTimelineFactory:
    @staticmethod
    def createFor(storageTableName, updatePeriod, partCol):
        key = "cube.storagetable.partition.timeline.cache." + updatePeriod + "." + partCol + ".storage.class"
        className = getHiveTable(storageTableName).getParameters().get(key)
        if className is None:
            className = "EndsAndHolesPartitionTimeline"  # default implementation
        # Instantiate the configured timeline class for this combination
        return getClass(className).newInstance(storageTableName, updatePeriod, partCol)
```


By introducing the factory, we ensure that every <storageTableName, updatePeriod, partCol> combination can choose its own timeline class. For example, if one such combination is expected to have a very small number of widely spaced partitions, it can use StoreAllPartitionTimeline as its timeline implementation. To use an implementation, set the property "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partCol+".storage.class" to a valid class implementing PartitionTimeline.

Note that the default timeline is EndsAndHoles. This is in line with the assumption that facts will usually have a large range of data present and there will only be a small number of time instances where data is absent. 
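The Python sketch below shows how the factory chooses a class per <storage table, update period, partition column>. A dict registry stands in for Java's class loading by name. The stub classes reuse the implementation names from this page but contain no timeline logic, and simple class names replace the fully qualified names a real property would hold. Rendering the update period as `"DAILY"` in the key is an assumption.

```python
PREFIX = "cube.storagetable.partition.timeline.cache."
DEFAULT_CLASS = "EndsAndHolesPartitionTimeline"


class TimelineStub:
    """Placeholder base; a real timeline would implement the PartitionTimeline operations."""

    def __init__(self, storage_table, update_period, part_col):
        self.storage_table = storage_table
        self.update_period = update_period
        self.part_col = part_col


class EndsAndHolesPartitionTimeline(TimelineStub):
    pass


class StoreAllPartitionTimeline(TimelineStub):
    pass


# Hypothetical registry standing in for loading a class by name
REGISTRY = {cls.__name__: cls for cls in (EndsAndHolesPartitionTimeline, StoreAllPartitionTimeline)}


def storage_class_key(update_period, part_col):
    return PREFIX + update_period + "." + part_col + ".storage.class"


def create_for(table_params, storage_table, update_period, part_col):
    # Fall back to the default implementation when no class is configured
    class_name = table_params.get(storage_class_key(update_period, part_col), DEFAULT_CLASS)
    return REGISTRY[class_name](storage_table, update_period, part_col)
```

Here only the DAILY timeline of `dt` opts into StoreAllPartitionTimeline. Every other combination gets the default.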


The other type of loading is from all partitions:

```python
def loadTimelinesFromAllPartitions(fact, storage):
    storageTableName = storage + "_" + fact
    storageTable = getHiveTable(storageTableName)
    for partition in getAllPartitions(storageTable):
        updatePeriod = partition.getProperties().get("cube.storagetable.partition.update.period")
        for partCol, value in partition.getTimePartSpec().items():
            ensureEntry(storageTableName, updatePeriod, partCol).add(TimePartition.of(updatePeriod, value))
    # Commit to properties
    for updatePeriod, timelinesForEachPartCol in partitionTimelineCache.get(storageTableName).items():
        for partCol, timeline in timelinesForEachPartCol.items():
            storageTable.getParameters().putAll(timeline.toProperties())
    storageTable.getParameters().put("cube.storagetable.partition.timeline.cache.present", "true")
```


Note that I have conveniently omitted corner cases in the interest of keeping the pseudo code simple. 


Updating Timelines

Timelines are updated whenever new partitions are registered or existing partitions are dropped. Updating a timeline means updating both the in-memory data structure and the table properties. When adding partition(s), we first update the in-memory timeline, then register the actual partition(s), and then update the table properties. When dropping, the first two steps are reversed: we first drop the partition, then update the in-memory data structure, and then update the table properties.
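The ordering can be made explicit with in-memory stand-ins. In the Python sketch below, a set plays the server's timeline, another set plays the partitions registered in Hive, and a dict plays the table properties. A log records the order of the steps. All names, including the `"timeline"` property key, are hypothetical.

```python
class StorageTableSketch:
    """Shows the order of steps on add/drop; every component is an in-memory stand-in."""

    def __init__(self):
        self.timeline = set()    # in-memory timeline held by the server
        self.metastore = set()   # partitions actually registered in Hive
        self.properties = {}     # serialized timeline in table properties
        self.log = []

    def _commit_properties(self):
        self.properties["timeline"] = ",".join(map(str, sorted(self.timeline)))
        self.log.append("properties")

    def add_partitions(self, parts):
        self.timeline.update(parts)      # 1. update in memory
        self.log.append("memory")
        self.metastore.update(parts)     # 2. register the actual partitions
        self.log.append("metastore")
        self._commit_properties()        # 3. update table properties

    def drop_partition(self, part):
        self.metastore.discard(part)     # 1. drop the actual partition
        self.log.append("metastore")
        self.timeline.discard(part)      # 2. update in memory
        self.log.append("memory")
        self._commit_properties()        # 3. update table properties
```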

The Lens server uses the in-memory timelines for query translation, so the server's in-memory timelines must be kept up to date for it to behave correctly. Consequently, all partition registrations and deletions now have to go through the REST API.


Choosing a timeline implementation

As mentioned above, the timeline implementation can be chosen for any <storage table, update period, partition column> combination. All you have to do is set "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partCol+".storage.class" to a valid timeline implementation class in the storage table's properties before the timelines are created.


Migrating between timeline implementations

Currently, the only way to migrate between timeline implementations is to set "cube.storagetable.partition.timeline.cache."+updatePeriod+"."+partCol+".storage.class" to the new timeline implementation class and set cube.storagetable.partition.timeline.cache.present=false in the storage table properties. This forces the Lens server to reload all timelines for that storage table from all partitions. Since Hive table properties can only be altered, not deleted, the properties written by the old timeline class will remain, but they won't matter much. There's an open JIRA for supporting migration of a single timeline without forcing the Lens server to query all partitions: LENS-489.
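The two property changes involved in a migration are easy to get wrong by hand, so here is a small Python helper that builds them. The function name and the example class name `com.example.MyTimeline` are hypothetical. The property keys are the ones described above.

```python
def migration_overrides(update_period, part_col, new_class):
    """Table-property changes that force a reload with a new timeline class."""
    prefix = "cube.storagetable.partition.timeline.cache."
    return {
        prefix + update_period + "." + part_col + ".storage.class": new_class,
        # forces the server to rebuild timelines from all partitions
        prefix + "present": "false",
    }
```

Usage: `migration_overrides("DAILY", "dt", "com.example.MyTimeline")` yields the two key-value pairs to apply to the storage table's properties.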
