Apache Kylin : Analytical Data Warehouse for Big Data
Background
Why do we need a Global Dictionary?
In OLAP data analysis, "count distinct" (or "deduplication") is a common requirement. Depending on how exact the result needs to be, it comes in two forms: "approximate count distinct" and "precise count distinct".
On large data sets, it is still very challenging to achieve precise deduplication while keeping query response fast. The most commonly used method for precise deduplication is the bitmap. Integer data can be stored in a bitmap directly, but other data types, such as String, cannot. To deduplicate such data precisely, we first build a dictionary that maps every value to an integer, and then apply the bitmap method to those integers.
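The dictionary-then-bitmap idea can be illustrated with a minimal sketch. It is not Kylin code: the function names are made up for illustration, and a plain Python integer stands in for a real compressed bitmap.

```python
def build_dictionary(values):
    """Map each distinct value to a positive integer code (1, 2, 3, ...)."""
    dictionary = {}
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary) + 1
    return dictionary


def to_bitmap(values, dictionary):
    """Set bit number `code` for every value; a Python int acts as the bitmap."""
    bitmap = 0
    for value in values:
        bitmap |= 1 << dictionary[value]
    return bitmap


def count_distinct(bitmap):
    """The distinct count is the number of set bits."""
    return bin(bitmap).count("1")


users = ["alice", "bob", "alice", "carol", "bob"]
dictionary = build_dictionary(users)
bitmap = to_bitmap(users, dictionary)
print(count_distinct(bitmap))  # 3
```

Because duplicates set the same bit twice, the bitmap never counts a value more than once, which is what makes the result precise.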
Kylin uses pre-computation to accelerate big data analysis. When a Cube is built incrementally, building a separate dictionary for each segment could assign different codes to the same value and produce wrong deduplication results once segments are merged. To avoid this, all segments in a Cube use the same dictionary (for a given column), which is called the global dictionary.
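A small sketch shows why per-segment dictionaries give wrong results. The segment data and the `encode` helper are hypothetical, and Python sets stand in for bitmaps.

```python
def encode(values, dictionary):
    """Give each unseen value the next free code; return the set of codes used."""
    codes = set()
    for value in values:
        if value not in dictionary:
            dictionary[value] = len(dictionary) + 1
        codes.add(dictionary[value])
    return codes


segment_1 = ["alice", "bob"]
segment_2 = ["carol", "alice"]

# Separate dictionaries: "alice" is 1 in the first segment but 2 in the second,
# and "carol" collides with code 1, so the merged result undercounts.
separate_count = len(encode(segment_1, {}) | encode(segment_2, {}))

# One shared (global) dictionary: every value keeps the same code everywhere.
shared = {}
global_count = len(encode(segment_1, shared) | encode(segment_2, shared))

print(separate_count, global_count)  # 2 3
```

The true number of distinct users is 3; only the shared dictionary yields it when the two segments are merged.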
Changes
Kylin has supported the global dictionary since version 1.5.3, but the construction method at that time had obvious limitations:
- The global dictionary was built on a single node, the Job Server, so the construction time became uncontrollable as the data grew.
- As data accumulated, the construction required more and more memory.
- The number of dictionary values was limited by the maximum integer value.
Kylin 3.1 already added a Hive-based distributed global dictionary build, which solved the problems above. However, to fit the new Spark engine, Kylin 4.0 implements another distributed way of building the global dictionary, based on Spark. Today we will describe in detail how Kylin 4.0's global dictionary works.
Global Dictionary based on Spark
Kylin 4.0 builds the global dictionary with Spark, so the encoding is processed in a distributed way. This reduces the pressure on any single node, and the number of dictionary values can exceed the maximum integer value.
Design
Structure
- Every build task will generate a new global dictionary;
- The dictionary of each new build task is saved according to the version number, and the old global dictionary will be gradually deleted;
- A dictionary consists of a metadata file and multiple dictionary files; each dictionary file is called a bucket;
- Each bucket holds two mappings (Map<Object, Long>), which together form the complete mapping;
BucketDictionary
Kylin introduces the concept of buckets: the data is divided into several buckets (i.e., multiple partitions) so that it can be processed in parallel. When the dictionary is built for the first time, the values in each bucket are encoded starting from 1. After all buckets have been encoded, the overall dictionary values are allocated according to the offset of each bucket. In the code, the two encodings are stored in two HashMaps: one holds the relative dictionary values within the bucket, and the other holds the absolute dictionary values across all buckets.
The following figure shows how the dictionary in bucket 1 is passed along over multiple build tasks; each build creates a new version of the bucket (v1, v2, v3, etc.). Curr (current) and Prev (previous) are the two HashMaps in a bucket. Curr stores the relative codes of the dictionary values in the current bucket, and Prev stores the absolute codes of all dictionary values that were built before.
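The two-phase encoding described above can be sketched as follows. This is a simplified single-process illustration, not Kylin's implementation: `bucket_of` and `build_first_version` are hypothetical names, CRC32 is an assumed stand-in for the partitioning hash, and in Spark the per-bucket phase would run in parallel.

```python
import zlib


def bucket_of(value, num_buckets):
    """Stable hash partitioning (Python's built-in hash() is salted per process)."""
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def build_first_version(values, num_buckets):
    # Phase 1: each bucket encodes its own values independently, starting from 1.
    relative = [dict() for _ in range(num_buckets)]
    for value in sorted(set(values)):
        bucket = relative[bucket_of(value, num_buckets)]
        bucket[value] = len(bucket) + 1

    # Phase 2: the offset of a bucket is the total size of the buckets before it.
    offsets, total = [], 0
    for bucket in relative:
        offsets.append(total)
        total += len(bucket)

    # Phase 3: absolute code = bucket offset + relative code.
    absolute = [
        {value: offsets[i] + code for value, code in bucket.items()}
        for i, bucket in enumerate(relative)
    ]
    return relative, offsets, absolute


relative, offsets, absolute = build_first_version(["a", "b", "c", "d", "e", "a"], 3)
all_codes = sorted(code for bucket in absolute for code in bucket.values())
print(all_codes)  # [1, 2, 3, 4, 5]
```

Whatever the hash distribution, the absolute codes are unique and contiguous, because every bucket's range starts where the previous bucket's range ends.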
Steps
- Create a flat table through Spark and obtain the distinct values;
- Decide the number of shards/buckets according to the number of literal values, and determine whether to expand according to demand;
- Repartition data to multiple buckets, encode them separately, and store them in their own dictionary files;
- Assign a version number to the current build task;
- Save the dictionary files and the metadata (number of buckets and offset of each bucket);
- Delete old versions when the deletion conditions are met;
First Build
- Calculate the size of the bucket;
- Take the larger of two values as the number of buckets: the number of dictionary values to be built divided by the single-bucket threshold, and the default number of buckets;
- Create buckets and allocate data for encoding;
- Generate the metadata file that records the offsets of the buckets;
The following are related configuration items and their default values.
kylin.dictionary.globalV2-min-hash-partitions=10
kylin.dictionary.globalV2-threshold-bucket-size=500000
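As a rough sketch of how these two settings could interact, the bucket count can be read as the larger of the minimum partition count and the number of buckets needed to keep each bucket under the threshold. The function name and the round-up rule are assumptions for illustration, not taken from Kylin's source.

```python
MIN_HASH_PARTITIONS = 10          # kylin.dictionary.globalV2-min-hash-partitions
THRESHOLD_BUCKET_SIZE = 500_000   # kylin.dictionary.globalV2-threshold-bucket-size


def bucket_count(distinct_values,
                 min_partitions=MIN_HASH_PARTITIONS,
                 threshold=THRESHOLD_BUCKET_SIZE):
    """Buckets needed so that no bucket exceeds the threshold, but never fewer
    than the configured minimum (rounding up is an assumption)."""
    needed = (distinct_values + threshold - 1) // threshold  # integer ceiling
    return max(needed, min_partitions)


print(bucket_count(1_000_000))   # 10
print(bucket_count(12_000_000))  # 24
```

With the defaults, the minimum of 10 buckets applies until the column has more than 5 million distinct values.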
Non-First Build
- Determine whether the buckets need to be expanded according to the number of dictionary values;
- If so, redistribute the already encoded dictionary values across the expanded buckets;
- Read the latest previous version of the dictionary data and distribute it to each bucket;
- Assign the new values to the buckets;
- The codes of values built in earlier builds do not change;
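The key property of a non-first build, that existing codes never change and new values are appended after them, can be sketched as below. This is a simplified illustration with hypothetical names; bucket expansion is left out, and CRC32 is an assumed stand-in for the partitioning hash.

```python
import zlib


def bucket_of(value, num_buckets):
    """Stable hash partitioning; CRC32 is an assumption for this sketch."""
    return zlib.crc32(value.encode("utf-8")) % num_buckets


def incremental_build(prev_buckets, new_values):
    """prev_buckets: one dict per bucket, value -> absolute code (the Prev maps)."""
    num_buckets = len(prev_buckets)
    known = set().union(*prev_buckets)
    max_code = max((c for b in prev_buckets for c in b.values()), default=0)

    # Curr maps: relative codes, starting from 1, for values not seen before.
    curr = [dict() for _ in range(num_buckets)]
    for value in sorted(set(new_values) - known):
        bucket = curr[bucket_of(value, num_buckets)]
        bucket[value] = len(bucket) + 1

    # New absolute codes start after the highest code of the previous version.
    next_version, offset = [], max_code
    for prev, cur in zip(prev_buckets, curr):
        merged = dict(prev)  # codes from earlier builds are kept unchanged
        for value, rel in cur.items():
            merged[value] = offset + rel
        offset += len(cur)
        next_version.append(merged)
    return next_version


v1 = incremental_build([{} for _ in range(3)], ["a", "b", "c"])
v2 = incremental_build(v1, ["b", "d", "e"])
v2_codes = sorted(code for bucket in v2 for code in bucket.values())
print(v2_codes)  # [1, 2, 3, 4, 5]
```

Starting from empty buckets, the same function also behaves like a first build, which keeps the sketch short.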
Version Management
Each build of the global dictionary is isolated by giving it a timestamp-based version number. Version control is needed because build tasks may run concurrently. With versions, every build can read the complete global dictionary that was built before it, which ensures that the latest version always holds the most complete set of codes, and readers of the global dictionary always use the latest version. The dictionary is finally stored by version on the file storage system (HDFS here), as shown in the figure below.
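A minimal sketch of timestamp-based versioning follows. The `version_<timestamp>` directory naming and the helper functions are hypothetical, and a local temporary directory stands in for HDFS.

```python
import os
import tempfile

PREFIX = "version_"  # hypothetical directory naming, for illustration only


def new_version(base_dir, timestamp_ms):
    """Create an isolated directory for one build, named by its timestamp."""
    os.makedirs(os.path.join(base_dir, f"{PREFIX}{timestamp_ms}"))
    return timestamp_ms


def list_versions(base_dir):
    """Return all version timestamps found under base_dir, oldest first."""
    versions = []
    for name in os.listdir(base_dir):
        suffix = name[len(PREFIX):]
        if name.startswith(PREFIX) and suffix.isdigit():
            versions.append(int(suffix))
    return sorted(versions)


def latest_version(base_dir):
    """Readers always pick the newest version; None if nothing was built yet."""
    versions = list_versions(base_dir)
    return versions[-1] if versions else None


base = tempfile.mkdtemp()
for ts in (1600000000000, 1600000360000, 1600000720000):
    new_version(base, ts)
print(latest_version(base))  # 1600000720000
```

Since every build writes to its own directory, a reader that resolves the latest version never sees a half-written dictionary from a concurrent build.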
FAQ
- Why do I need to use two Maps in a BucketDictionary?
- At the beginning of the build, the values assigned to each bucket are given a relative code starting from 1, and these relative codes are stored in one HashMap. After the relative encoding is complete, each bucket obtains its offset, which is based on the number of dictionary values in the buckets (the buckets are ordered). The absolute code of each value is then calculated from its relative code and the bucket offset, and the absolute codes are stored in another HashMap.
- Will there be data skew issues?
- In the tests we have run so far, the probability of a hotspot causing a build failure is very small. In general, a build only fails when the skew reaches the billion level. Having many "count distinct" columns may indeed cause this problem, but the number of encoding buckets can be enlarged without limit. Unless the hotspot is a single key, adjusting the parameters is a way to overcome it.
- Why can the number of values in a global dictionary exceed the maximum integer (2^31 - 1, about 2.1 billion)?
- Because we use the new bitmap data structure "Roaring64BitMap", which addresses a 64-bit value space (2^64). After the global dictionary encoding is completed, the codes are compressed into binary form and stored in the Roaring64BitMap object. The bitmap stores values as Long instead of Integer.
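To illustrate how a bitmap can address 64-bit values, here is a toy sketch of one common approach: the high 32 bits select a container and the low 32 bits are stored inside it. The `Bitmap64` class is hypothetical, and plain Python sets stand in for compressed 32-bit containers; it is not the Roaring64BitMap implementation.

```python
class Bitmap64:
    """Toy 64-bit bitmap: high 32 bits pick a container, low 32 bits go inside."""

    def __init__(self):
        self.containers = {}  # high 32 bits -> set of low 32 bits

    def add(self, value):
        if not 0 <= value < 2 ** 64:
            raise ValueError("value must fit in 64 bits")
        high, low = value >> 32, value & 0xFFFFFFFF
        self.containers.setdefault(high, set()).add(low)

    def __contains__(self, value):
        return (value & 0xFFFFFFFF) in self.containers.get(value >> 32, ())

    def cardinality(self):
        return sum(len(c) for c in self.containers.values())


bitmap = Bitmap64()
for code in (1, 2 ** 31 - 1, 2 ** 31, 3_000_000_000, 2 ** 40 + 7, 1):
    bitmap.add(code)

print(bitmap.cardinality())   # 5
print(2 ** 40 + 7 in bitmap)  # True
```

Dictionary codes above 2^31 - 1 are handled exactly like smaller ones, which is why the encoding is no longer bounded by the Integer range.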