Apache Kylin : Analytical Data Warehouse for Big Data
Welcome to Kylin Wiki.
Part I What is Hive Global Dictionary
Background
Count distinct(bitmap) measure is very important for many scenarios, such as PageView statistics, and Kylin support count distinct since 1.5.3 .
Apache Kylin implements precisely count distinct measure based on bitmap, and use Global Dictionary to encode string literal into integer.
Currently we have to build Global Dictionary in single process/JVM, which may take a lot of time and memory for UHC. By this feature, we use MR to build and use Hive to store Global Dictionary for Kylin.
This is the technical article for Hive Global Dictionary version2.
Benefit Summary
- Build Global Dictionary in distributed way, thus building job spent less time.
- Job Server will do less job, thus be more stable.
- OneID, since the fact that Hive Global Dictionary is human-readable outside of Kylin, everyone can reuse this dictionary(Hive table) in the other scene across the company.
Release History
Release Date | Release version | JIRA issue | Comment |
---|---|---|---|
2019-10 | v3.0.0 | Introduce Hive global dictionary.(first version) | |
2020-06 | v3.1.0 | Use MapReduce other than HQL in some steps to improve performance.(second version) |
Configuration
Conf key | Explanation | Example |
---|---|---|
kylin.dictionary.mr-hive.database | Which database will the Hive Global Dictionary in. | default |
kylin.dictionary.mr-hive.columns | A list, contain all columns which need a Hive Global Dictionary, in a {TABLE_NAME}_{COLUMN_NAME} pattern. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
kylin.dictionary.mr-hive.table.suffix | Suffix for Segment Dictionary Table and Global Dictionary Table | _global_dict |
kylin.dictionary.mr-hive.intermediate.table.suffix | Suffix for Distinct Value Table | _group_by |
kylin.dictionary.mr-hive.columns.reduce.num | A key/value structure(or a map), which key is {TABLE_NAME}_{COLUMN_NAME}, and value is number for expected reducers in Build Segment Level Dictionary (MR job Parallel Part Build). | KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2 |
kylin.source.hive.databasedir | /user/hive/warehouse/lacus.db | |
kylin.dictionary.mr-hive.ref.columns | To reuse other global dictionary(s), you can specific a list here, to refer to some existent global dictionary(s) built by another cube. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
Hive Table
Table | Name Pattern | Explanation |
---|---|---|
Distinct Value Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.intermediate.table.suffix} | This table is a temporary hive table for storing literal value which be extracted from flat table. It contain one normal column, dict_key, that is all distinct literal value for each kylin.dictionary.mr-hive.columns(duplicated literal value are only remain once). This table also contain a partition column, its name is dict_column, means one partition for one column. Please note, literal value which has been encoded previously will be removed. |
Segment Dictionary Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.table.suffix} | This table is a temporary hive table for storing literal value and its encoded integer which be extracted from flat table. It contain two normal column: dict_key, that is all distinct literal value for each kylin.dictionary.mr-hive.columns(duplicated literal value are only remain once); the second column, dict_value, contains the encoded integer for corresponding literal value. This table also contain a partition column, its name is dict_column, means one partition for one column. |
Global Dictionary Table | ${CUBE_NAME}_${kylin.dictionary.mr-hive.table.suffix} | This table is the Global Dictionary. It has the same schema as Segment Dictionary Table . |
New added steps
Compared to original global dictionary by Meituan.
Serial No | Step Name | Input | Output |
---|---|---|---|
1 | Create hive dictionary table | N/A | Three hive table |
2 | Extract distinct value into Distinct Value Table | Flat table | Distinct Value Table |
3 | Build Segment Level Dictionary (Parallel Part Build) | Distinct Value Table(File path is determined by kylin.source.hive.databasedir) | Intermediate dict file(Literal value encoded in partition-level, so each reducer will encode literal from zero). |
4 | Build Segment Level Dictionary (Parallel Total Build) | Intermediate dict file | Segment Level Dictionary |
5 | Merge Segment Level Dictionary into Global Dictionary Table | Segment Level Dictionary and old Global Dictionary Table | New Global Dictionary Table |
6 | Replace/encode Flat Table | Flat table | New flat table (but literal value will be replaced with encoded integer) |
7 | Cleanup temp table & data | All temporary hive tables | Nothing, they will be removed. |
Screenshots(For new added steps)
MapReduce Job Diagram
HQL Analysis
set mapreduce.job.name=Build Hive Global Dict - extract distinct value; USE NightlyBuild; set hive.exec.compress.output=false; set hive.mapred.mode=unstrict; -- create hive global dictionary table CREATE TABLE IF NOT EXISTS NightlyBuild.ValidationCube_global_dict ( dict_key STRING COMMENT '', dict_val INT COMMENT '' ) COMMENT 'Hive Global Dictionary' PARTITIONED BY (dict_column string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; -- create distinct value table DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by; CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by ( dict_key STRING COMMENT '' ) COMMENT '' PARTITIONED BY (dict_column string) STORED AS TEXTFILE ; -- create segment level dictionary table DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict; CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ( dict_key STRING COMMENT '' , dict_val STRING COMMENT '' ) COMMENT '' PARTITIONED BY (dict_column string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE ; -- insert data into distinct value table INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') SELECT a.DICT_KEY FROM ( SELECT USERACTIONLOGSAMPLE_PV_ID as DICT_KEY FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f GROUP BY USERACTIONLOGSAMPLE_PV_ID) a LEFT JOIN (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PV_ID' ) b ON a.DICT_KEY = b.DICT_KEY WHERE b.DICT_KEY IS NULL ; INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') SELECT a.DICT_KEY FROM ( SELECT USERACTIONLOGSAMPLE_PLAY_ID as DICT_KEY FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f GROUP BY USERACTIONLOGSAMPLE_PLAY_ID) a LEFT JOIN (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PLAY_ID' ) b ON a.DICT_KEY = b.DICT_KEY WHERE b.DICT_KEY IS NULL ; -- calculate max dict id INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by PARTITION (DICT_COLUMN = 'KYLIN_MAX_DISTINCT_COUNT') SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) FROM ( SELECT dict_column, count(1) total_distinct_val FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by WHERE DICT_COLUMN != 'KYLIN_MAX_DISTINCT_COUNT' GROUP BY dict_column) tc LEFT JOIN ( SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM NightlyBuild.ValidationCube_global_dict GROUP BY dict_column) tm ON tc.dict_column = tm.dict_column;
set mapreduce.job.name=Build Hive Global Dict - merge to dict table; USE NightlyBuild; set hive.mapred.mode=unstrict; -- data is prepared in previous two MR jobs, create partition for segment level dictionary base on prepared files ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PV_ID'); ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PLAY_ID'); -- merge segment level dictionary into global dictionary INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' UNION ALL SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' ; INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' UNION ALL SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' ;
set mapreduce.job.name=Build Hive Global Dict - replace intermediate table; USE NightlyBuild; set hive.exec.compress.output=false; set hive.mapred.mode=unstrict; -- encode/replace flat table for specific column INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f SELECT a.USERACTIONLOGSAMPLE_UID ,a.USERACTIONLOGSAMPLE_ACT_TYPE ,a.USERACTIONLOGSAMPLE_PAGE_ID ,a.USERACTIONLOGSAMPLE_DEVICE_BRAND ,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID ,a.USERACTIONLOGSAMPLE_CITY ,a.USERACTIONLOGSAMPLE_PART_DT ,b.dict_val ,a.USERACTIONLOGSAMPLE_PLAY_ID FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a LEFT OUTER JOIN (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID') b ON a.USERACTIONLOGSAMPLE_PV_ID = b.dict_key; INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f SELECT a.USERACTIONLOGSAMPLE_UID ,a.USERACTIONLOGSAMPLE_ACT_TYPE ,a.USERACTIONLOGSAMPLE_PAGE_ID ,a.USERACTIONLOGSAMPLE_DEVICE_BRAND ,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID ,a.USERACTIONLOGSAMPLE_CITY ,a.USERACTIONLOGSAMPLE_PART_DT ,a.USERACTIONLOGSAMPLE_PV_ID ,b.dict_val FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a LEFT OUTER JOIN (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') b ON a.USERACTIONLOGSAMPLE_PLAY_ID = b.dict_key;
Part II How to use
How to configure
Step1. Create cube which contains COUNT_DISTINCT(bitmap) measure.
Step 2. Add properties in configuration overwrite step.
Step 3. Build new segment.
Part III Performance
TODO
Comparison
Step Name | Duration EST | Data size |
---|---|---|
Create Intermediate Flat Hive Table | ||
Build Hive Global Dict - extract distinct value | ||
Redistribute Flat Hive Table | ||
Build Hive Global Dict - parallel part build | ||
Build Hive Global Dict - parallel total build | ||
Build Hive Global Dict - merge to dict table | ||
Build Hive Global Dict - replace intermediate table | ||
Extract Fact Table Distinct Columns | ||
Build Dimension Dictionary | ||
Extract Dictionary from Global Dictionary(When shrunken dictionary enabled) | ||
Build Base Cuboid | ||
Total | ||
Comment |
Part IV Reference
https://issues.apache.org/jira/browse/KYLIN-4342