I.Problem Statement:

Adding inverted indices to frequently used dimensions is very helpful to cut down query latency. Currently, whether or not applying an inverted index is purely the decision of system admin. A lot of manual work can go into this and there are several drawbacks:

  1. The problem of choosing a proper set of dimensions to apply inverted indices on can be very time consuming. A system admin has to pull up the logs and go through queries for most frequently used dimensions. Without established tool and methodology this can also be fault-prone.
  2. Furthermore, the approach in 1 does not guarantee inverted indices found are optimal or targeting the right queries (say, one wants to decrease the latency slow running queries with >90th% latency)
  3. There is an on average 100% storage penalty for adding inverted index to a specific column. And there is a non-zero cost1 of applying inverted index in the filtering phase, meaning there is a "sweet spot" for the number of inverted index.
  4. For a lot of use cases with and-connected predicates, it is most important to pick the one dimension with best selectivity, which has the best effect in cutting down the time spent in lookup phase. Over optimizing beyond this point will not be cost efficient.
  5. It is desirable for the system to swiftly give a recommendation upon changing of query pattern and data pattern.

II.Approach:

1.Latency Modeling:

Latency (execution time) exposed in broker log is comprised of:

  1. Queueing Time
  2. Lookup Time
  3. Aggregation Time
  4. Random Noise

1 and 4 are relatively hard to predict and irrelevant to our ultimate goal of inverted index recommendation, so we want to get rid of them from our query stats. By replaying real queries on test node with low QPS, the queueing time is minimized. Now, we want to correlate the entriesScannedInFilter with lookup time and entriesScannedPostFilter with aggregation time using a linear model, which can be expressed as:

$$Latency=\alpha \times entriesScannedInFilter + \beta \times  entriesScannedPostFilter+noise$$

We use minimum pooling to minimize noise. Which is essentially dividing the space of entriesScannedInFilter×entriesScannedPostFilter into small buckets of a×b, and for all the {(entriesScannedInFilter, entriesScannedPostFilter): ExecutionTime} falling into a given bucket, we select the one with minimum ExecutionTime.

The experiment on 6 use cases show good fitting results. We can establish entriesScannedInFilter is a good indicator of the latency contributed by lookup phase. With this established, we can analyze the potential of improvement we can make by calculate the contribution of entriesScannedInFilter to whole execution time. More importantly, this can function as  a good weight for our recommendation algorithm inn the following next section.

2.Parser-based Scoring:

From the discussion above, we can conclude that to recommend a proper inverted index based on data characteristics and query pattern, the following observations on different aspects of the problem are helpful:

  1.  From the aspect of all the queries, if a dimension is very frequently used across all queries, it is a good candidate for inverted index, because many queries can be optimized by applying only one inverted index.
  2.  From the perspective of data characteristics, it is desirable to add inverted index on columns with high selectivity. Because the evaluation of inverted index is always prioritized, picking a column with high selectivity for inverted index will often significantly reduce the numEntriesScannedInFilter.
  3. From the perspective of a single query, to better leverage the cardinality information, we should also consider the structure of the queries, namely the AND-connected and OR-connected clauses. Because for AND-connected clause, the scan range will be significantly reduced by applying inverted index on only one of the dimensions, whereas in OR-connected clauses it can only cut the number of entries scanned for that particular dimension.

With these observations, we design an algorithm based on the PQL parser generated by Antlr. For each clause, we can apply two functions, \(S(\cdot)\) and \(key(\cdot)\), which are defined recursively as following:

at top level, the set of columns in key will be extracted, whose corresponding vot318pxes will be increase by:

$$numEntriesScannedInFilter \times ( 1-\frac{1}{S}) $$

After processing all the queries, the columns will be ranked by their votes, generating a sequence of inverted index recommendation. 

3.Parser-based Estimation:

To solve the problem described in III.2 This part extends the above algorithm, with a more accurate estimation of work done in scanning. The high level idea of this algorithm is by defining a function \(L(p_0)\), we want to emulate the scanning behavior in the running-time, and try to estimated (1) the total numEntriesScannedInFilter during execution and (2) the numEntriesScannedInFilter saved by adding indices. For a complicated predicate, we want to pick the dimensions that will be most effective in terms of decreasing the numEntriesScannedInFilter.

A. The following assumptions are used in the description of the algorithm:

  1. \(L(p_0)\): For a predicate \(p_0\) on dimension \(D\), it calculates the (estimated) entries scanned during the evaluation of  \(p_0\). Noted that this estimation assumes a column-by-column fashion of scanning; thus, it can be an over-estimation especially for the AND connected predicates using zigzag scanning during evaluation. The recursive definition is:
    1. For a leaf predicate \(p_0\) on a single-valued dimension \(D\), \(L(p_0)=1\ column = 1\times totalRows\).
    2. For a leaf predicate \(p_0\) on a multi-valued dimension \(D\), \(L(p_0)=1\ column = AvgValuesPerDoc \times totalRows\).
    3. For a predicate \(p_0 \in "p_1\ AND\ p_2\ AND\ ...\ AND\ p_n"\), \(L(p_0)=L(p_1)+S(p_1)^{-1}L(p_2)+S(p_1)^{-1}S(p_2)^{-1}L(p_3)+...+\prod_{i=1 ... n-1}{S(p_i)^{-1}}L(p_n)\).
    4. For a predicate \(p_0 \in "p_1\ OR\ p_2\ OR\ ...\ OR\ p_n"\), \(L(p_0)=L(p_1)+L(p_2)+L(p_3)+...+L(p_n)\).
  2. Moreover, per the query execution logic of Pinot, if we add index on a column \(D\), then the \(L(p_0)\) of a leaf predicate \(p_0\) on \(D\) goes to 0.

B. Based on the discussion above, we can conclude for a given query "Select ... WHERE \(p_0\) ORDERED/GROUP by ...":

  1. When the top level predicate is a leaf predicate \(p_0\) on \(D\), adding inverted index on \(p_0\) will save the \(\Delta=L(p_0)-0=1\times totalRows\), so this query will vote \(\Delta\) for \(D\), denoted as \((D, \Delta)\).
  2. When the top level predicate \(p_0 \in "p_1\ OR\ p_2\ OR\ ...\ OR\ p_n"\), we will consider \(p_1 \cdots p_n\) separately, and each \(p_i\) will vote individually, as the \(L(p_0)\) is the simple sum of  \(L(p_1), ..., L(p_n)\). 
  3. When the top level predicate \(p_0 \in "p_1\ AND\ p_2\ AND\ ...\ AND\ p_n"\), we want to pick the most proper \(p_i\) so that after apply inverted index on it, the  \(L(p_0)\) will be minimized to achieve the maximum \(\Delta= L(p_0)- L_{min}(p_0)\)

Denote \(L_i:=L(p_i), S_i:=S(p_i)^{-1}\). Then we have \(L_1, L_2, L_3, ..., L_n\) and \(S_1, S_2, S_3, ..., S_n\). The third problem can be rewritten as: 

$$\underset{t}{\mathrm{argmin}}\{ L_{min} = S_t L_1 + S_t S_1 L_2 + S_t S_1 S_2 L_3 + \cdots + S_t \prod_{i=1 ... t-2}S_{i} L_{t-1} + \prod_{i=1 ... t} S_{i} L_{t+1} +  \prod_{i=1 ... t+1}S_{i} L_{t+2} + \cdots + \prod_{i=1 ... n-1}S_{i} L_{n} \}$$

The above evaluation can be done via: (1) brute force with  \(\prod_{i=1 ... t} S_{i} L_{t+1}\) cached ~ \(O(n^2)\), (2) a numerical estimation by calculating first \(m\) terms so that \(t_m<\epsilon\) ~ \(O(mn)\). The resulting nESI saved is \(\Delta = L(p_0) - L_{min}(p_0)\).

After the evaluation, the term \(p_t\) gets picked out and we want to add inverted index for each dimension appearing in it: \(\{D_{t1}, D_{t2}, ...\}\). So the query will vote: \( (\{D_{t1}, D_{t2}, ...\}, \Delta) \). To make this process more robust, if the \(\Delta_{i} = L(p_0) - L_{ithMin}(p_0)>\Delta\times g_{0}\) then we create a mutually exclusive vote  \( (DSet_{1th}=\{D_{t1}, D_{t2}, ...\}, \Delta) / (DSet_{2th}, \Delta_{2}) / \cdots / (DSet_{ith}, \Delta_{i})    \) meaning in the next step C, any one but only one part of the vote can be counted.

Moreover, if \(L_{min}(p_0) > C_0\) where \(C_0=1\times totalRows\) by default, meaning that adding index on one predicate is not sufficient, the above algorithm incrementally run on on the sequence of \(p_0' = p_1, \cdots, p_{t-1}, p_{t+1}, \cdots, p_n\), to recommend a new \(p_l\), the additional vote will then be  \((\{D_{l1}, D_{l2}, ...\}-\{D_{t1}, D_{t2}, ...\}, \Delta_l)\). This process can iterate until \(L_{min}(p_l)<C_0\). 

C. After the evaluation of all queries, we collect the vote from each query. If there are weights associated with queries, we can re-scale the \(\Delta\) for each vote. Then for all the votes, we want to select a set of dimensions with the highest overall \(\Delta\). This problem can be formulated as:  

Given \(D=\{d_1, d_2, d_3, ..., d_n\}\) is a set of boolean variables, and \(F=\{(I_1\wedge I_2\wedge \cdots I_j, W)| I_j \in D \}\) a set of weighted boolean formula. Assign \(K\) variables in \(D\) as True and others as False, maximize the sum of \(W\) in \(F\) whose corresponding \(I_1\wedge I_2\wedge \cdots I_j\) evaluates to True.

This problem is probably NP-Hard so we solve it by enumerating all possible assignments. Luckily we can use bitwise operation (sum+=a ^ mask = a ? W : 0) to quickly evaluate the boolean formula.

Given the size of query patterns for on-boarding use cases are small, we can determine the K in an adaptive manner: we can start from K=1 and increment 1 at a time. When the marginal gain \((W_k-W_{k+1})\) of K=k+1 is smaller than a portion \(G_{0}\) of total nESI without index, we will stop there.

III.Tool Design:

1.Query Source:

This tool looks for patterns in query text and execution details to give recommendations. This piece of information is exposed at broker level. Users can choose to write log files to local broker disk or publish log events to streaming platforms. Regardless of the data pipeline, following are necessary fields in the execution log for this tool to come up with a recommendation.

{

      "name": "query",

      "type": "string",

      "description": "The query text received, in Pinot Query Language.”

},

{

      "name": "queryProcessingDuration",

      "type": "long",

      "description": "Time taken to process the query, in milli-seconds”

},

{

      "name": "scannedEntriesInFilterCount",

      "type": "long",

      "description": "The number of entries that had to be scanned during the filter phase."

},

{

      "name": "scannedEntriesPostFilterCount",

      "type": "long",

      "description": "Number of entries scanned in the post-filter phase"

}

tuner.query.src.QuerySrc is a generic interface to accommodate for different input pipelines. On top of this, LogQuerySrcImpl is implemented to process lines in a log file, meanwhile calling BrokerLogParserImpl to generate IndexSuggestQueryStatsImpl objects wrapping the above fields. These IndexSuggestQueryStatsImpl objects will be feed to the analysis strategy.

2.Metadata Source:

To leverage the characteristic of data, this tool will access the metadata.properties and index_map file for each segment. Specifically, the following fields are useful:

{

      "name": "cardinality",

      "type": "long",

      "description": "The cardinality of a given segment, for a given table and a given column.”

},

{

      "name": "totalDocs",

      "type": "long",

      "description": "Total number of documents(rows) in a given segment, for a given table and a given column.”

},

{

      "name": "isSorted",

      "type": "long",

      "description": "If the data is sorted in a given segment, for a given table and a given column."

},

{

      "name": "totalNumberOfEntries",

      "type": "long",

      "description": "The total number of entries (values/multi-values) of a given segment, for a given table and a given column."

}

{

      "name": "invertedIndexSize",

      "type": "long",

      "description": "The size of inverted index in a given segment (if applicable), for a given table and a given column."

}

MetaManager is the generic interface for accessing the metadata. For now only the (weighted) average of cardinality, (weighted) average of entries per multi-value, total number of segments sorted, and inverted index size are used. The  implementation, JsonFileMetaManagerImpl is to fetch and provide these information from a file, as we have yet to store all these info in ZK or implement an API to access this information. An additional command CollectSegmentMetadata is provided to generate such packed metadata (in json format) from the tarred segments in controller. 

3.Segment Metadata Collector:

A tool is written to extract the relevant fields in tarred segments and pack them into a json file. The tool will work on the segment directory of controller which has a structure of segmentDirectory/tableNameWithoutType/segment. The metadata.properties and index_map files will be decompressed and parsed. The segment level properties are structured as /tableNameWithoutType/columnName/segmentName/property. And the sum of cardinality×totalDocs, totalDocs, isSortedtotalNumberOfEntries, invertedIndexSize, segmentCount will also be calculated and stored as column level metadata. This file will be used as described in 2.

4.Tool Structure

The tool is based on a driver which can provide high efficient multithreading capability. The tuning applications are build on top of this driver. This structure can be easily translate to Map-Reduce for scalable and realtime processing of query logs.

5.Usage

Get percentile report on numEntriesScannedInFilter:

$ ./pinot-admin.sh EntriesScannedQuantileReport -h
Usage: EntriesScannedQuantileReport
	-log                      <String>                      : Path to broker log file. (required=true)
	-tables                                                 : Comma separated list of table names to work on without type (unset run on all tables) (required=false)
	-help                                                   : Print this message. (required=false) 
$ ./pinot-admin.sh EntriesScannedQuantileReport -log ~/Documents/userTablesBrokerLog.log


Collect and pack metadata.properties in Json:

$ ./pinot-admin.sh CollectMetadataForIndexTuning -h
Usage: CollectMetadataForIndexTuning
	-out                      <String>                      : An empty directory to work on, for tmp files and output metadata.json file,must have r/w access (required=true)
	-segments                 <String>                      : The directory containing /tableNamesWithoutType/{tarred segments} (required=true)
	-tables                                                 : Comma separated list of table names to work on without type (unset to run on all tables) (required=false)
	-help                                                   : Print this message. (required=false)
$ ./pinot-admin.sh CollectMetadataForIndexTuning -out ~/tmpDirectory -segments /.../pinot/pinot_segments/pinot/ -tables userTable1


Get inverted index recommendation:

$ ./pinot-admin.sh IndexTuner -h
Usage: IndexTunerCommand
	-metadata                 <String>                      : Path to packed metadata file (json), CollectMetadataForIndexTuning can be used to create this. (required=true)
	-log                      <String>                      : Path to broker log file. (required=true)
	-index                    <inverted/sorted>             : Select target index. (required=true)
	-strategy                 <freq/parser>                 : Select tuning strategy. (required=true)
	-selectivityThreshold     <long>                        : Selectivity threshold (>1), default to 1,  (required=false)
	-entriesScannedThreshold  <long>                        : Log lines with numEntriesScannedInFilter below this threshold will be excluded. (required=false)
	-numQueriesThreshold      <long>                        : Tables with log lines scanned threshold will be excluded. (required=false)
	-tables                                                 : Comma separated list of table names to work on without type (unset run on all tables) (required=false)
	-untrackedInvertedIndex                                 : "{\"tabelNameWithoutType1\": [\"colName1\",\"colName2\"]}" (required=false)
	-help                                                   : Print this message. (required=false)
$ ./pinot-admin.sh IndexTuner -index inverted -strategy parser -log ~/Documents/userTablesBrokerLog.log -metadata ~/tmpDirectory/metadata.json -untrackedInvertedIndex "{\"userTable1\":[\"untrackedColumn1\"]}" -selectivityThreshold 10 -tables userTable1

III.Future Improvement:

1. Adding desirableNumEntriesScannedInFilter:

To complete the picture of how many inverted index we should add, Sunitha Beeram  suggests that we can use a threshold desirableNumEntriesScannedInFilter, with which we can estimate by adding how many inverted index can we lower the actual numEntriesScannedInFilter to the desirableNumEntriesScannedInFilter. We could also potentially use a feed-back controlled incremental inverted index application with this threshold. Such process will make the model strongly coherent with SLA.

2.Known Issue with numEntriesScannedInFilter cut-down estimation for large multi-value columns.

Right now in the model, we are using:

$$numEntriesScannedInFilter \times ( 1-\frac{1}{S}) $$

to estimate the very part of number of entries scanned in filter we can cut down by adding an inverted index. There is actually a known bias in this estimation. A more accurate form will be:

$$\underbrace{(nESI_{total}-nESI_{col})\times ( 1-\frac{1}{S_{col}})}_{\text{Range Shrink}}+\underbrace{nESI_{col}}_{\text{Direct Hit}}$$

However the number of entries scanned in filter is not exposed in column granularity right now, so this cannot be built. The impact of this for MV columns is already mitigated by some level by not adding the average number of entries per doc into the denominator of S. But it will be ideal if we can expose this and make accurate prediction. And this should be  doable without too much overhead. Thanks to Mayank Shrivastava for bringing up this in my presentation.


IV.Appendix:

1.Compression Factor: Study of the Size of Roaring BitMap

Repo: https://git.io/fjAgZ


  1.  It can potentially hurt the performance if we use inverted index for very long range query, where a lot of bitmaps are or-ed.
  • No labels

4 Comments

  1. This is awesome Jia Guo. I liked the systematic approach taken to deduce some of the conclusions. Few questions/suggestions

    •  what are key and S representing. 
    • Is it possible to come up with how much would the latency improve overall? approximate is fine. That will help in deciding if the additional storage is worth it. 
    • Is it possible to extend this to say when star-tree will be useful? Think about optimizing entriesScannedPostFilter instead of entriedScannedInFilter.


    1. Hello Kishore,

      1. S is representing Selectivity, basically (S-1)/S is the portion of range of docs we can shrink by adding inverted index on a column.

      2. It is possible and it is already in EntriesScannedQuantileReport. But because we are not exposing the pure execution/lookup time, this estimation will mostly not be made because the model cannot fit queueing/noise. After we expose that filtering time metric it will be possible.

      3. I haven't thought much about star-tree right now, but it is a very promising future work.

      Thanks!

  2. Can we use enhance this to suggest inverted indices purely based on the cardinality of each column without looking at the logs?. This can be something user can run on a sample data. We can assume that all columns are queried equally or take the columns frequently queried as inputs.

    1. Without looking at the logs, we can base the method on:

      1. Apply inverted index on high selectivity columns.
      2. Apply inverted index on multi-value columns with large number of entries per doc.

      And the things we can leverage are:

      1. The characteristics of data, which include the current fields in metadata. But looking at the actual data distribution may have privacy related issue.
      2. The semantic meaning of the column name, for ex xxx-id can be a good candidate.

      But again this method can be inaccurate without the knowledge of log.