Apache Kylin : Analytical Data Warehouse for Big Data

Part I   What is Hive Global Dictionary

Background

The count distinct (bitmap) measure is important in many scenarios, such as page view statistics, and Kylin has supported it since 1.5.3.
Apache Kylin implements the precise count distinct measure based on bitmaps, and uses a Global Dictionary to encode string literals into integers.
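
To make the idea concrete, here is a minimal Python sketch of dictionary-based precise count distinct. It is an illustration only, not Kylin's implementation: the GlobalDictionary class and the count_distinct function are hypothetical names, and a plain Python integer stands in for a compressed bitmap.

```python
class GlobalDictionary:
    """Hypothetical in-memory dictionary mapping string literals to integers."""

    def __init__(self):
        self._ids = {}

    def encode(self, literal):
        # Assign the next unused integer the first time a literal is seen.
        if literal not in self._ids:
            self._ids[literal] = len(self._ids) + 1
        return self._ids[literal]


def count_distinct(literals, dictionary):
    """Set one bit per encoded literal, then count the bits that are set."""
    bitmap = 0  # a Python int used as a bit set (stand-in for a real bitmap)
    for literal in literals:
        bitmap |= 1 << dictionary.encode(literal)
    return bin(bitmap).count("1")


gd = GlobalDictionary()
page_views = ["user_a", "user_b", "user_a", "user_c", "user_b"]
distinct_users = count_distinct(page_views, gd)
```

Because the same literal always maps to the same integer, bitmaps built separately (for example, for different segments) can be combined with a bitwise OR and still yield an exact distinct count.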

Without this feature, the Global Dictionary has to be built in a single process/JVM, which may take a lot of time and memory for ultra high cardinality (UHC) columns. With this feature, Kylin uses MapReduce (MR) to build the Global Dictionary and Hive to store it.

This is the technical article for the second version of the Hive Global Dictionary.

Benefit Summary

  1. The Global Dictionary is built in a distributed way, so the build job takes less time.
  2. The Job Server does less work and is therefore more stable.
  3. OneID: because the Hive Global Dictionary is human-readable outside of Kylin, anyone can reuse the dictionary (a Hive table) in other scenarios across the company.

Release History

| Release Date | Release version | JIRA issue | Comment |
| --- | --- | --- | --- |
| 2019-10 | v3.0.0 | KYLIN-3841 | Introduce Hive global dictionary (first version). |
| 2020-06 | v3.1.0 | KYLIN-4342 | Use MapReduce rather than HQL in some steps to improve performance (second version). |

Configuration

| Conf key | Explanation | Example |
| --- | --- | --- |
| kylin.dictionary.mr-hive.database | The database in which the Hive Global Dictionary is stored. | default |
| kylin.dictionary.mr-hive.columns | A list of all columns that need a Hive Global Dictionary, in the {TABLE_NAME}_{COLUMN_NAME} pattern. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
| kylin.dictionary.mr-hive.table.suffix | Suffix for the Segment Dictionary Table and the Global Dictionary Table. | _global_dict |
| kylin.dictionary.mr-hive.intermediate.table.suffix | Suffix for the Distinct Value Table. | _group_by |
| kylin.dictionary.mr-hive.columns.reduce.num | A key/value structure (a map) whose key is {TABLE_NAME}_{COLUMN_NAME} and whose value is the expected number of reducers in Build Segment Level Dictionary (MR job Parallel Part Build). | KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2 |
| kylin.source.hive.databasedir | The location of the Hive table in HDFS. In Kylin 3.1.1, this configuration was removed (see KYLIN-4616). | /user/hive/warehouse/lacus.db |
| kylin.dictionary.mr-hive.ref.columns | To reuse other global dictionaries, specify a list here that refers to existing global dictionaries built by another cube. | KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID |
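
As an illustration of the two value formats used above (a comma-separated list and a comma-separated COLUMN:N map), the following Python sketch parses the example values. The helper names parse_columns and parse_reduce_num are hypothetical and are not part of Kylin; the property keys and example values are taken from the table.

```python
def parse_columns(value):
    """Split a comma-separated column list, ignoring blank entries."""
    return [c.strip() for c in value.split(",") if c.strip()]


def parse_reduce_num(value):
    """Parse 'COLUMN:N,COLUMN:N' into a dict of column name -> reducer count."""
    result = {}
    for entry in parse_columns(value):
        # Split on the last colon so the column name is kept whole.
        column, _, num = entry.rpartition(":")
        result[column] = int(num)
    return result


# Example overrides, using the example values from the configuration table.
overrides = {
    "kylin.dictionary.mr-hive.database": "default",
    "kylin.dictionary.mr-hive.columns": "KYLIN_SALES_SALES_ID,KYLIN_SALES_BUYER_ID",
    "kylin.dictionary.mr-hive.table.suffix": "_global_dict",
    "kylin.dictionary.mr-hive.intermediate.table.suffix": "_group_by",
    "kylin.dictionary.mr-hive.columns.reduce.num": "KYLIN_SALES_SALES_ID:3,KYLIN_SALES_BUYER_ID:2",
}

columns = parse_columns(overrides["kylin.dictionary.mr-hive.columns"])
reducers = parse_reduce_num(overrides["kylin.dictionary.mr-hive.columns.reduce.num"])
```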

Hive Table 

| Table | Name Pattern | Explanation |
| --- | --- | --- |
| Distinct Value Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.intermediate.table.suffix} | A temporary Hive table that stores the literal values extracted from the flat table. It contains one normal column, dict_key, which holds all distinct literal values for each column in kylin.dictionary.mr-hive.columns (each duplicated literal value is kept only once). It also contains a partition column named dict_column, with one partition per column. Note that literal values that were already encoded previously are removed. |
| Segment Dictionary Table | ${FLAT_TABLE}_${kylin.dictionary.mr-hive.table.suffix} | A temporary Hive table that stores the literal values extracted from the flat table together with their encoded integers. It contains two normal columns: dict_key, which holds all distinct literal values for each column in kylin.dictionary.mr-hive.columns (each duplicated literal value is kept only once), and dict_val, which holds the encoded integer for the corresponding literal value. It also contains a partition column named dict_column, with one partition per column. |
| Global Dictionary Table | ${CUBE_NAME}_${kylin.dictionary.mr-hive.table.suffix} | This table is the Global Dictionary. It has the same schema as the Segment Dictionary Table. |

Newly added steps

Compared with the original global dictionary by Meituan, the following build steps are added.

| Serial No | Step Name | Input | Output |
| --- | --- | --- | --- |
| 1 | Create hive dictionary table | N/A | Three Hive tables |
| 2 | Extract distinct value into Distinct Value Table | Flat table | Distinct Value Table |
| 3 | Build Segment Level Dictionary (Parallel Part Build) | Distinct Value Table (the file path is determined by kylin.source.hive.databasedir) | Intermediate dict file (literal values are encoded at partition level, so each reducer encodes its literals starting from zero) |
| 4 | Build Segment Level Dictionary (Parallel Total Build) | Intermediate dict file | Segment Level Dictionary |
| 5 | Merge Segment Level Dictionary into Global Dictionary Table | Segment Level Dictionary and old Global Dictionary Table | New Global Dictionary Table |
| 6 | Replace/encode Flat Table | Flat table | New flat table (literal values are replaced with encoded integers) |
| 7 | Cleanup temp table & data | All temporary Hive tables | Nothing; they are removed. |
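
Steps 3 and 4 can be pictured with a small Python sketch. It is a simplified model under stated assumptions, not the actual MapReduce code: part_build and total_build are hypothetical names, each set stands for the slice of new distinct values handled by one reducer, and the rule that new ids start right after the old maximum dictionary value is an assumption made for illustration.

```python
def part_build(partitions):
    """Step 3 sketch: every reducer encodes its own literals starting from zero."""
    return [
        [(literal, local_id) for local_id, literal in enumerate(sorted(part))]
        for part in partitions
    ]


def total_build(part_results, max_dict_val):
    """Step 4 sketch: shift each reducer's local ids by a running offset."""
    segment_dict = {}
    # Assumption: new ids continue right after the largest id already in use.
    offset = max_dict_val + 1
    for part in part_results:
        for literal, local_id in part:
            segment_dict[literal] = offset + local_id
        # The next reducer's ids start after everything assigned so far.
        offset += len(part)
    return segment_dict


# Two reducers, each holding a disjoint slice of the new distinct values.
partitions = [{"pv_3", "pv_1"}, {"pv_2"}]
segment_dict = total_build(part_build(partitions), max_dict_val=10)
```

Splitting the work this way lets the expensive pass over the literals run in parallel, while the second pass only needs the per-reducer counts and the current maximum to make the ids globally unique.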

Screenshots (for the newly added steps)

MapReduce Job Diagram


HQL Analysis

Create hive dict table & Extract value into distinct value table
set mapreduce.job.name=Build Hive Global Dict - extract distinct value;
USE NightlyBuild;
set hive.exec.compress.output=false;
set hive.mapred.mode=unstrict;

-- create hive global dictionary table
CREATE TABLE IF NOT EXISTS NightlyBuild.ValidationCube_global_dict
( dict_key STRING COMMENT '', 
   dict_val INT COMMENT '' 
) 
COMMENT 'Hive Global Dictionary' 
PARTITIONED BY (dict_column string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE; 

-- create distinct value table
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by; 
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
( 
   dict_key STRING COMMENT '' 
) 
COMMENT '' 
PARTITIONED BY (dict_column string) 
STORED AS TEXTFILE 
;

-- create segment level dictionary table
DROP TABLE IF EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict; 
CREATE TABLE IF NOT EXISTS kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
( 
   dict_key STRING COMMENT '' , 
   dict_val STRING COMMENT '' 
) 
COMMENT '' 
PARTITIONED BY (dict_column string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE 
;

-- insert data into distinct value table
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') 
SELECT a.DICT_KEY FROM (
  SELECT 
   USERACTIONLOGSAMPLE_PV_ID as DICT_KEY 
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PV_ID) a 
    LEFT JOIN 
  (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict    
   WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PV_ID' ) b 
ON a.DICT_KEY = b.DICT_KEY 
WHERE b.DICT_KEY IS NULL 
;
INSERT OVERWRITE TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') 
SELECT a.DICT_KEY FROM (
  SELECT 
   USERACTIONLOGSAMPLE_PLAY_ID as DICT_KEY 
  FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f
  GROUP BY USERACTIONLOGSAMPLE_PLAY_ID) a 
    LEFT JOIN 
  (SELECT DICT_KEY FROM NightlyBuild.ValidationCube_global_dict    
   WHERE DICT_COLUMN = 'USERACTIONLOGSAMPLE_PLAY_ID' ) b 
ON a.DICT_KEY = b.DICT_KEY 
WHERE b.DICT_KEY IS NULL 
;

-- calculate max dict id 
INSERT OVERWRITE TABLE  kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by PARTITION (DICT_COLUMN = 'KYLIN_MAX_DISTINCT_COUNT') 
SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) 
FROM (
    SELECT dict_column, count(1) total_distinct_val
    FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f__group_by
    WHERE DICT_COLUMN != 'KYLIN_MAX_DISTINCT_COUNT'
    GROUP BY dict_column) tc 
LEFT JOIN (
    SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val 
    FROM NightlyBuild.ValidationCube_global_dict
    GROUP BY dict_column) tm 
ON tc.dict_column = tm.dict_column;
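
The two key queries above (the LEFT JOIN ... IS NULL anti-join and the KYLIN_MAX_DISTINCT_COUNT row) can be modelled in a few lines of Python. This is a sketch for reading the HQL, not code from Kylin: the function names are hypothetical, tables are represented as plain lists and dicts, and NULL values are assumed not to occur.

```python
def extract_new_values(flat_rows, column, global_dict):
    """Distinct values of `column` that the global dictionary does not hold yet."""
    known = global_dict.get(column, {})
    # GROUP BY gives the distinct values; the anti-join drops already encoded ones.
    return sorted({row[column] for row in flat_rows} - set(known))


def max_distinct_count_row(column, new_values, global_dict):
    """Model of CONCAT_WS(',', dict_column, total_distinct_val, max_dict_val)."""
    known = global_dict.get(column, {})
    # As in the HQL, fall back to 0 when the column has no dictionary entries yet.
    max_dict_val = max(known.values(), default=0)
    return ",".join([column, str(len(new_values)), str(max_dict_val)])


flat_rows = [{"PV_ID": "a"}, {"PV_ID": "b"}, {"PV_ID": "a"}, {"PV_ID": "c"}]
global_dict = {"PV_ID": {"a": 1}}  # dict_column -> {dict_key: dict_val}

new_values = extract_new_values(flat_rows, "PV_ID", global_dict)
summary_row = max_distinct_count_row("PV_ID", new_values, global_dict)
```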


Merge into global dictionary
set mapreduce.job.name=Build Hive Global Dict - merge to dict table;
USE NightlyBuild;
set hive.mapred.mode=unstrict;

-- data was prepared by the previous two MR jobs; create partitions for the segment level dictionary based on the prepared files
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PV_ID'); 
ALTER TABLE kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict ADD IF NOT EXISTS PARTITION (dict_column='USERACTIONLOGSAMPLE_PLAY_ID'); 


-- merge segment level dictionary into global dictionary
INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PV_ID') 
 SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' 
  UNION ALL 
 SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID' ;

INSERT OVERWRITE TABLE NightlyBuild.ValidationCube_global_dict 
PARTITION (dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') 
 SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' 
  UNION ALL 
 SELECT dict_key, dict_val FROM kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f_global_dict 
 WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID' ;
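
The merge is a plain UNION ALL per dict_column partition. A minimal Python model is shown below; merge_into_global is a hypothetical name, each partition is represented as a dict of dict_key to dict_val, and the overlap check is added only to make the expected invariant visible (the HQL itself performs no such check).

```python
def merge_into_global(global_partition, segment_partition):
    """Model of the UNION ALL of old and new rows for one dict_column partition."""
    # The segment dictionary is expected to hold only literals that were not
    # encoded before, so the two key sets should never overlap.
    overlap = set(global_partition) & set(segment_partition)
    if overlap:
        raise ValueError("literals already encoded: %s" % sorted(overlap))
    merged = dict(global_partition)
    merged.update(segment_partition)
    return merged


old_partition = {"a": 1}
segment_partition = {"b": 2, "c": 3}
new_partition = merge_into_global(old_partition, segment_partition)
```

Because previously encoded literals were filtered out when the Distinct Value Table was filled, the union does not need to deduplicate keys.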


Replace/encode flat table
set mapreduce.job.name=Build Hive Global Dict - replace intermediate table;
USE NightlyBuild;
set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;

-- encode/replace flat table for specific column
INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f 
SELECT 
a.USERACTIONLOGSAMPLE_UID 
,a.USERACTIONLOGSAMPLE_ACT_TYPE 
,a.USERACTIONLOGSAMPLE_PAGE_ID 
,a.USERACTIONLOGSAMPLE_DEVICE_BRAND 
,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID 
,a.USERACTIONLOGSAMPLE_CITY 
,a.USERACTIONLOGSAMPLE_PART_DT 
,b.dict_val 
,a.USERACTIONLOGSAMPLE_PLAY_ID 
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a 
LEFT OUTER JOIN 
 (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PV_ID') b 
ON a.USERACTIONLOGSAMPLE_PV_ID = b.dict_key;


INSERT OVERWRITE TABLE NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f 
SELECT 
a.USERACTIONLOGSAMPLE_UID 
,a.USERACTIONLOGSAMPLE_ACT_TYPE 
,a.USERACTIONLOGSAMPLE_PAGE_ID 
,a.USERACTIONLOGSAMPLE_DEVICE_BRAND 
,a.USERACTIONLOGSAMPLE_ITEM_TYPE_ID 
,a.USERACTIONLOGSAMPLE_CITY 
,a.USERACTIONLOGSAMPLE_PART_DT 
,a.USERACTIONLOGSAMPLE_PV_ID 
,b.dict_val 
FROM NightlyBuild.kylin_intermediate_validationcube_bb2091dc_82ba_f77f_86b9_6f83a1af639f a 
LEFT OUTER JOIN 
 (SELECT dict_key, dict_val FROM NightlyBuild.ValidationCube_global_dict WHERE dict_column = 'USERACTIONLOGSAMPLE_PLAY_ID') b 
ON a.USERACTIONLOGSAMPLE_PLAY_ID = b.dict_key;
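
Each statement above rewrites the flat table with one column swapped for its dict_val. The following Python sketch models that LEFT OUTER JOIN for a single column; encode_column is a hypothetical name and rows are represented as dicts.

```python
def encode_column(flat_rows, column, dictionary):
    """Replace `column` with its dict_val, leaving all other columns unchanged.

    As with a LEFT OUTER JOIN, a literal without a dictionary entry
    becomes None (the counterpart of SQL NULL).
    """
    return [{**row, column: dictionary.get(row[column])} for row in flat_rows]


flat_rows = [
    {"UID": "u1", "PV_ID": "a"},
    {"UID": "u2", "PV_ID": "z"},
]
pv_dictionary = {"a": 1}  # dict_key -> dict_val for one dict_column

encoded_rows = encode_column(flat_rows, "PV_ID", pv_dictionary)
```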



Part II  How to use

How to configure

Step 1. Create a cube that contains a COUNT_DISTINCT (bitmap) measure.

Step 2. Add the properties in the configuration overwrite step.

Step 3. Build new segment. 



Part III  Performance

TODO

Comparison

| Step Name | Duration EST | Data size |
| --- | --- | --- |
| Create Intermediate Flat Hive Table | | |
| Build Hive Global Dict - extract distinct value | | |
| Redistribute Flat Hive Table | | |
| Build Hive Global Dict - parallel part build | | |
| Build Hive Global Dict - parallel total build | | |
| Build Hive Global Dict - merge to dict table | | |
| Build Hive Global Dict - replace intermediate table | | |
| Extract Fact Table Distinct Columns | | |
| Build Dimension Dictionary | | |
| Extract Dictionary from Global Dictionary (when shrunken dictionary is enabled) | | |
| Build Base Cuboid | | |
| Total | | |
| Comment | | |


Part IV Reference 

https://issues.apache.org/jira/browse/KYLIN-4342


