This page explains how to use Hive to bulk load data into a new (empty) HBase table per HIVE-1295. (If you're not using a build which contains this functionality yet, you'll need to build from source and make sure this patch and HIVE-1321 are both applied.)
Ideally, bulk load from Hive into HBase would be part of HBaseIntegration, making it as simple as this:
CREATE TABLE new_hbase_table(rowkey string, x int, y int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:x,cf:y");
SET hive.hbase.bulk=true;
INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM ...any_hive_query...;
However, things aren't quite as straightforward as that yet. Instead, a procedure involving a series of SQL commands is required. It should still be a lot easier and more flexible than writing your own map/reduce program, and over time we hope to enhance Hive to move closer to the ideal.
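The procedure is based on underlying HBase recommendations, and involves the following steps:
1. Decide how to assign an HBase row key to each row coming from Hive.
2. Make the HBase jars available to Hive.
3. Run Hive sampling queries to produce a file of range-partitioning keys.
4. Prepare a staging location in HDFS for the sort output.
5. Run the Hive sort that writes the HFiles.
6. Run completebulkload (or, on older HBase versions, loadtable.rb) to move the files into a new HBase table.
7. Optionally, map the new HBase table into Hive.
The rest of this page explains each step in greater detail.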
Currently there are a number of constraints here:
Besides dealing with these constraints, probably the most important work here is deciding on how you want to assign an HBase row key to each row coming from Hive. To avoid inconsistencies between lexical and binary comparators, it is simplest to design a string row key and use it consistently all the way through. If you want to combine multiple columns into the key, use Hive's string concat expression for this purpose. You can use CREATE VIEW to tack on your rowkey logically without having to update any existing data in Hive.
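To make the lexical-versus-numeric concern concrete, here is a small Python sketch that compares the two orderings and builds a composite string key. The column names, the `_` separator, and the zero-padding width are illustrative assumptions rather than part of the procedure; in Hive the concatenation itself would be done with the string concat expression mentioned above.

```python
def make_rowkey(user_id: int, txn_id: int, width: int = 10) -> str:
    """Build a composite string row key from two numeric columns.

    The column names and the fixed-width zero padding are assumptions made
    for this sketch; padding keeps the lexical order of the string in step
    with the numeric order of its parts.
    """
    return f"{user_id:0{width}d}_{txn_id:0{width}d}"


ids = [9, 10, 100]

# Plain decimal strings sort lexically, which is not numeric order.
unpadded = sorted(str(i) for i in ids)

# Fixed-width strings sort the same way lexically and numerically.
padded = sorted(f"{i:03d}" for i in ids)

print(unpadded)
print(padded)
print(make_rowkey(7, 42, width=4))
```

Whatever key format you choose, the same expression should feed the sampling query and the sort so that both see identical keys.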
TBD: provide some example numbers based on Facebook experiments; also reference Hadoop Terasort
You will need to add a couple of jar files to your path. First, put them in DFS:
hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar
Then add them to your hive-site.xml:
<property>
  <name>hive.aux.jars.path</name>
  <value>/user/hive/hbase-VERSION.jar,/user/hive/hive-hbase-handler-VERSION.jar</value>
</property>
In order to perform a parallel sort on the data, we need to range-partition it. The idea is to divide the space of row keys up into nearly equal-sized ranges, one per reducer which will be used in the parallel sort. The details will vary according to your source data, and you may need to run a number of exploratory Hive queries in order to come up with a good enough set of ranges. Here's one example:
add jar lib/hive-contrib-0.7.0.jar;
set mapred.reduce.tasks=1;
create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
select transaction_id from
(select transaction_id
 from transactions
 tablesample(bucket 1 out of 10000 on transaction_id) s
 order by transaction_id
 limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;
This works by ordering all of the rows in a .01% sample of the table (using a single reducer), and then selecting every nth row (here n=910000). The value of n is chosen by dividing the total number of rows in the sample by the desired number of ranges, e.g. 12 in this case (one more than the number of partitioning keys produced by the LIMIT clause). The assumption here is that the distribution in the sample matches the overall distribution in the table; if this is not the case, the resulting partition keys will lead to skew in the parallel sort.
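The every-nth-row selection described above can be modelled in a few lines of Python. This is only a sketch of the arithmetic, not what Hive executes: `pick_split_keys` is a hypothetical helper, and the 120-key sample is made-up data chosen so the numbers are easy to check by hand.

```python
def pick_split_keys(sorted_sample, num_ranges):
    """Return num_ranges - 1 boundary keys from an already-sorted sample."""
    if num_ranges < 2:
        return []
    # n = rows in the sample divided by the desired number of ranges.
    n = len(sorted_sample) // num_ranges
    if n == 0:
        raise ValueError("sample is smaller than the number of ranges")
    # Rows are numbered from 1, mirroring row_sequence(); keep rows n, 2n, 3n, ...
    picked = [key for seq, key in enumerate(sorted_sample, start=1) if seq % n == 0]
    # Truncate to one key fewer than the number of ranges (the LIMIT clause).
    return picked[:num_ranges - 1]


# Hypothetical sample: 120 sorted keys, split into 12 ranges.
sample = [f"txn{i:04d}" for i in range(1, 121)]
keys = pick_split_keys(sample, 12)
print(len(keys), keys[0], keys[-1])
```

With 120 sampled rows and 12 ranges, n is 10, so the boundaries are the 10th, 20th, ..., 110th rows of the sample.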
Once you have your sampling query defined, the next step is to save its results to a properly formatted file which will be used in a subsequent step. To do this, run commands like the following:
create external table hb_range_keys(transaction_id_range_start string)
row format serde 'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat 'org.apache.hadoop.mapred.TextInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';
insert overwrite table hb_range_keys
select transaction_id from
(select transaction_id
 from transactions
 tablesample(bucket 1 out of 10000 on transaction_id) s
 order by transaction_id
 limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;
The first command creates an external table defining the format of the file to be created; be sure to set the serde and inputformat/outputformat exactly as specified.
The second command populates it (using the sampling query previously defined). Usage of ORDER BY guarantees that a single file will be produced in directory /tmp/hb_range_keys. The filename is unknown, but it is necessary to reference the file by name later, so run a command such as the following to copy it to a specific name:
dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;
The sort is going to produce a lot of data, so make sure you have sufficient space in your HDFS cluster, and choose the location where the files will be staged. We'll use /tmp/hbsort in this example.
The directory does not actually need to exist (it will be automatically created in the next step), but if it does exist, it should be empty.
dfs -rmr /tmp/hbsort;
dfs -mkdir /tmp/hbsort;
Now comes the big step: running a sort over all of the data to be bulk loaded. Make sure that your Hive instance has the HBase jars available on its auxpath.
set hive.execution.engine=mr;
set mapred.reduce.tasks=12;
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/hb_range_key_list;
set hfile.compression=gz;
create table hbsort(transaction_id string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');
insert overwrite table hbsort
select transaction_id, user_name, amount, ...
from transactions
cluster by transaction_id;
The CREATE TABLE creates a dummy table which controls how the output of the sort is written. Note that it uses HiveHFileOutputFormat to do this, with the table property hfile.family.path used to control the destination directory for the output. Again, be sure to set the inputformat/outputformat exactly as specified. In the example above, we select gzip (gz) compression for the result files; if you don't set the hfile.compression parameter, no compression will be performed. (The other method available is lzo, which compresses less aggressively but does not require as much CPU power.)
Note that the number of reduce tasks (12) is one more than the number of partition keys (11); this must be true, or else you will get a "Wrong number of partitions in keyset" error.
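The relationship between partition keys and reducers can be illustrated with a simplified Python model of range partitioning. This is not Hadoop's TotalOrderPartitioner code: `assign_partition` is a hypothetical helper, the keys are made up, and sending a row key that equals a boundary to the upper range is an assumption of this sketch.

```python
from bisect import bisect_right


def assign_partition(row_key, split_keys, num_reducers):
    """Map a row key to a reducer index using sorted boundary keys."""
    # k boundary keys divide the key space into k + 1 ranges, one per reducer.
    if len(split_keys) != num_reducers - 1:
        raise ValueError("Wrong number of partitions in keyset")
    # Binary search: count how many boundaries are <= row_key.
    return bisect_right(split_keys, row_key)


# Three boundaries define four ranges, so four reducers are required.
split_keys = ["b", "d", "f"]
parts = [assign_partition(k, split_keys, 4) for k in ["a", "b", "c", "e", "z"]]
print(parts)
```

Because every key in range i sorts before every key in range i + 1, the per-reducer outputs are globally ordered once each reducer has sorted its own range.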
There is a parameter hbase.hregion.max.filesize (default 256MB) which affects how HFiles are generated. If the amount of data (pre-compression) produced by a reducer exceeds this limit, more than one HFile will be generated for that reducer, which leads to unbalanced region files. This will not cause any correctness problems, but if you want to get balanced region files, either use more reducers or set this parameter to a larger value. Note that when compression is enabled, you may see multiple files generated whose sizes are well below the limit; this is because the overflow check is done pre-compression.
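As a rough planning aid, the arithmetic behind this limit can be sketched in Python. Both helpers are hypothetical, the data sizes are made up, and the result is only an estimate: it assumes the ranges are evenly sized and ignores where exactly the writer rolls over to a new file.

```python
import math

MB = 1024 * 1024


def estimate_hfiles(uncompressed_bytes, max_filesize=256 * MB):
    """Estimate how many HFiles one reducer writes for its share of the data."""
    if uncompressed_bytes <= 0:
        return 0
    # The check is made on pre-compression size, so compression is ignored here.
    return math.ceil(uncompressed_bytes / max_filesize)


def min_reducers_for_one_file_each(total_uncompressed_bytes, max_filesize=256 * MB):
    """Smallest reducer count that keeps each (evenly sized) range under the limit."""
    return max(1, math.ceil(total_uncompressed_bytes / max_filesize))


total = 3000 * MB  # hypothetical uncompressed data size
print(estimate_hfiles(total // 6))    # 500 MB per reducer with 6 reducers
print(estimate_hfiles(total // 12))   # 250 MB per reducer with 12 reducers
print(min_reducers_for_one_file_each(total))
```

If the sampled ranges are skewed, some reducers will exceed their even share, so it is worth leaving some headroom above the computed minimum.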
The cf in the path specifies the name of the column family which will be created in HBase, so the directory name you choose here is important. (Note that we're not actually using an HBase table here; HiveHFileOutputFormat writes directly to files.)
The CLUSTER BY clause provides the keys to be used by the partitioner; be sure that it matches the range partitioning that you came up with in the earlier step.
The first column in the SELECT list is interpreted as the rowkey; subsequent columns become cell values (all in a single column family, so their column names are important).
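The way a selected row turns into an HBase row can be pictured with a short Python sketch. `to_hbase_cells` is a hypothetical helper and the sample row is made up; it only models the naming rule described above and does not write HFiles.

```python
def to_hbase_cells(column_names, row, family="cf"):
    """Split one selected row into (rowkey, {family:qualifier -> value})."""
    # The first selected column is the row key.
    rowkey, values = row[0], row[1:]
    # Every remaining column becomes a cell named after its Hive column.
    cells = {f"{family}:{name}": value
             for name, value in zip(column_names[1:], values)}
    return rowkey, cells


columns = ["transaction_id", "user_name", "amount"]
key, cells = to_hbase_cells(columns, ("t0001", "alice", 12.5))
print(key, cells)
```

Renaming a column in the dummy table therefore changes the qualifier under which its values are stored.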
Once the sort job completes successfully, one final step is required for importing the result files into HBase. Again, we don't know the name of the file, so we copy it over:
dfs -copyToLocal /tmp/hbsort/cf/* /tmp/hbout
If Hive and HBase are running in different clusters, use distcp to copy the files from one to the other.
If you are using HBase 0.90.2 or newer, you can use the completebulkload utility to load the data into HBase:
hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /tmp/hbout transactions
In older versions of HBase, use the bin/loadtable.rb script to import them:
hbase org.jruby.Main loadtable.rb transactions /tmp/hbout
The first argument (transactions) specifies the name of the new HBase table. For the second argument, pass the staging directory name, not the column family child directory.
After this script finishes, you may need to wait a minute or two for the new table to be picked up by the HBase meta scanner. Use the hbase shell to verify that the new table was created correctly, and do some sanity queries to locate individual cells and make sure they can be found.
Finally, if you'd like to access the HBase table you just created via Hive:
CREATE EXTERNAL TABLE hbase_transactions(transaction_id string, user_name string, amount double, ...)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:user_name,cf:amount,...")
TBLPROPERTIES("hbase.table.name" = "transactions");
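Since the mapping string must line up with the column names used during the sort, it can help to generate it from the same column list. The Python helper below is a hypothetical convenience, not part of Hive; it assumes the first column is the row key and that all other columns live in a single family.

```python
def columns_mapping(column_names, family="cf"):
    """Build an hbase.columns.mapping value from an ordered column list."""
    # The first column maps to the row key; the rest map to family:qualifier.
    return ",".join([":key"] + [f"{family}:{name}" for name in column_names[1:]])


mapping = columns_mapping(["transaction_id", "user_name", "amount"])
print(mapping)
```

The printed value can be pasted into the SERDEPROPERTIES clause of the CREATE EXTERNAL TABLE statement.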