...
The only additional Accumulo configuration necessary is to add hive-accumulo-storage-handler.jar, provided as part of the Hive distribution, to the Accumulo server classpath. This can be accomplished in a variety of ways: copy or symlink the jar into $ACCUMULO_HOME/lib or $ACCUMULO_HOME/lib/ext, or include the path to the jar in general.classpaths in accumulo-site.xml. Be sure to restart the Accumulo tabletservers if the jar is added to the classpath in a non-dynamic fashion (using $ACCUMULO_HOME/lib or general.classpaths in accumulo-site.xml).
...
To issue queries against Accumulo using Hive, four parameters must be provided by the Hive configuration:
Connection Parameters |
---|
accumulo.instance.name |
accumulo.zookeepers |
accumulo.user.name |
accumulo.user.pass |
For those familiar with Accumulo, these four configurations are the normal configuration values necessary to connect to Accumulo: the Accumulo instance name, the ZooKeeper quorum (comma-separated list of hosts), and the Accumulo username and password. The easiest way to provide these values is by using the -hiveconf option to the hive command. It is expected that the Accumulo user provided either has the ability to create new tables, or that the Hive queries will only be accessing existing Accumulo tables.
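As a rough illustration of passing the four connection parameters with -hiveconf, the Python sketch below assembles such a command line. The helper name `build_hive_command` and all connection values are hypothetical placeholders, not values from any real deployment.

```python
import shlex

# Hypothetical connection values; substitute those of your own Accumulo instance.
connection = {
    "accumulo.instance.name": "accumulo",
    "accumulo.zookeepers": "zk1.example.com,zk2.example.com",
    "accumulo.user.name": "hive",
    "accumulo.user.pass": "hive_password",
}


def build_hive_command(params):
    """Return a `hive` argument list with one -hiveconf option per parameter."""
    argv = ["hive"]
    for key, value in params.items():
        argv += ["-hiveconf", f"{key}={value}"]
    return argv


command = build_hive_command(connection)
# Print the command as a single shell-quoted line for copy and paste.
print(shlex.join(command))
```

Passing the password on the command line exposes it to other users on the host through the process list, so treat this as a convenience for experimentation.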
...
To access Accumulo tables, a Hive table must be created using the CREATE command with the STORED BY clause. If the EXTERNAL keyword is omitted from the CREATE call, the lifecycle of the Accumulo table is tied to the lifetime of the Hive table: if the Hive table is deleted, so is the Accumulo table. This is the default case. Providing the EXTERNAL keyword will create a Hive table that references an Accumulo table but will not remove the underlying Accumulo table if the Hive table is dropped.
Each Hive row maps to a set of Accumulo keys with the same row ID. One column in the Hive row is designated as a "special" column which is used as the Accumulo row ID. Every other Hive column in the row maps to an Accumulo column (column family and qualifier), and the Hive column value is stored in the Accumulo value.
No Format |
---|
CREATE TABLE accumulo_table(rowid STRING, name STRING, age INT, weight DOUBLE, height INT) STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler' WITH SERDEPROPERTIES('accumulo.columns.mapping' = ':rowid,person:name,person:age,person:weight,person:height'); |
In the above statement, normal Hive column name and type pairs are provided, as is the case with normal CREATE TABLE statements. The full AccumuloStorageHandler class name is provided to inform Hive that Accumulo will back this Hive table. A number of properties can be provided to configure the AccumuloStorageHandler via SERDEPROPERTIES or TBLPROPERTIES. The most important property is "accumulo.columns.mapping", which controls how the Hive columns map to Accumulo columns. In this case, the first Hive column (mapped to ":rowid") is used to populate the Accumulo row ID component of the Accumulo Key, while the other Hive columns (name, age, weight and height) are all columns within the Accumulo row.
For the above schema in the "accumulo_table", we could envision a single row in the table:
No Format |
---|
hive> select * from accumulo_table;
row1 Steve 32 200 72 |
The above record would be serialized into Accumulo Key-Value pairs in the following manner given the declared accumulo.columns.mapping:
No Format |
---|
user@accumulo accumulo_table> scan
row1 person:age [] 32
row1 person:height [] 72
row1 person:name [] Steve
row1 person:weight [] 200 |
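To make the mapping concrete, here is a minimal Python sketch that reproduces the key-value pairs shown above from the Hive row and the declared mapping. It is not Hive code: `serialize_row` is a hypothetical helper, it assumes the row ID element is spelled ":rowid" (case-insensitive), and it writes every value in its plain string form.

```python
def serialize_row(hive_columns, mapping, row):
    """Turn one Hive row into sorted (row_id, family, qualifier, value) tuples.

    Simplification: every value is written as its string form; real type
    handling in Hive may differ.
    """
    elements = mapping.split(",")
    if len(elements) != len(hive_columns):
        raise ValueError("mapping needs exactly one element per Hive column")

    # Locate the Hive column that supplies the Accumulo row ID.
    row_id = None
    for value, element in zip(row, elements):
        if element.lower() == ":rowid":
            row_id = str(value)
    if row_id is None:
        raise ValueError("mapping has no :rowid element")

    pairs = []
    for value, element in zip(row, elements):
        if element.lower() == ":rowid":
            continue
        family, qualifier = element.split(":", 1)
        pairs.append((row_id, family, qualifier, str(value)))
    # Accumulo returns entries in sorted key order, so sort to mimic a scan.
    return sorted(pairs)


entries = serialize_row(
    ["rowid", "name", "age", "weight", "height"],
    ":rowid,person:name,person:age,person:weight,person:height",
    ["row1", "Steve", 32, 200, 72],
)
for row_id, family, qualifier, value in entries:
    print(f"{row_id} {family}:{qualifier} [] {value}")
```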
The power of the column mapping is that multiple Hive tables with differing column mappings can interact with the same Accumulo table and produce different results. When columns are excluded, the performance of Hive queries can be improved by using Accumulo locality groups to filter out unwanted data on the server side.
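The effect of differing mappings can be sketched in Python by reading the same key-value pairs through two mappings. This is only an in-memory illustration: `read_rows` is a hypothetical helper, and the second, narrower table definition is an assumption made for the example.

```python
# Key-value pairs as they appear in the Accumulo scan shown earlier.
accumulo_entries = [
    ("row1", "person", "age", "32"),
    ("row1", "person", "height", "72"),
    ("row1", "person", "name", "Steve"),
    ("row1", "person", "weight", "200"),
]


def read_rows(entries, hive_columns, mapping):
    """Project Accumulo entries into Hive-style records for one mapping."""
    elements = mapping.split(",")
    cells_by_row = {}
    for row_id, family, qualifier, value in entries:
        cells_by_row.setdefault(row_id, {})[f"{family}:{qualifier}"] = value

    records = []
    for row_id in sorted(cells_by_row):
        cells = cells_by_row[row_id]
        record = {}
        for column, element in zip(hive_columns, elements):
            if element.lower() == ":rowid":
                record[column] = row_id
            else:
                # Columns missing from the Accumulo row come back as None.
                record[column] = cells.get(element)
        records.append(record)
    return records


full_view = read_rows(
    accumulo_entries,
    ["rowid", "name", "age", "weight", "height"],
    ":rowid,person:name,person:age,person:weight,person:height",
)
names_only = read_rows(accumulo_entries, ["rowid", "name"], ":rowid,person:name")
print(full_view)
print(names_only)
```

Both views read the same underlying entries; only the mapping decides which columns each Hive table sees.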
Column Mapping
The column mapping string is a comma-separated list of encoded values whose offset corresponds to the Hive schema for the table. The order of the columns in the Hive schema can be arbitrary as long as the elements in the column mapping align with the intended Hive columns. For those familiar with Accumulo, each element in the column mapping string resembles a column_family:column_qualifier; however, there are a few variants that allow finer control.
...
These are set by including a pound sign ('#') after the column mapping element with either the long or short serialization value. The default serialization is 'string'. For example, for the value 10, "person:age#s" is synonymous with "person:age" and would serialize the value as the literal string "10". If "person:age#b" were used instead, the value would be serialized as four bytes: \x00\x00\x00\x0A.
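The difference between the two serializations of the integer 10 can be checked with a short Python sketch. The helper `serialize_int` is hypothetical, and the sketch assumes the binary form is a four-byte big-endian signed integer.

```python
import struct


def serialize_int(value, encoding="s"):
    """Serialize an int as a string ('s') or as four big-endian bytes ('b')."""
    if encoding in ("s", "string"):
        return str(value).encode("utf-8")
    if encoding in ("b", "binary"):
        # Assumption: 4-byte big-endian signed integer.
        return struct.pack(">i", value)
    raise ValueError(f"unknown serialization: {encoding}")


as_string = serialize_int(10, "s")
as_binary = serialize_int(10, "b")
print(as_string, as_binary.hex())
```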
Indexing
Starting in Hive 3.0.0 with HIVE-15795, indexing support has been added to Accumulo-backed Hive tables. Indexing works by using another Accumulo table to store the field value mapping to rowId of the data table. The index table is automatically populated on record insertion via Hive.
Using index tables greatly improves the performance of non-rowId predicate queries by eliminating full table scans. Indexing works for both internally and externally managed tables using either the Tez or MapReduce query engines. The following options control indexing behavior.
Option Name | Description |
---|---|
accumulo.indextable.name | (Required) The name of the index table in Accumulo. |
accumulo.indexed.columns | (Optional) A comma-separated list of Hive columns to index, or * which indexes all columns (default: *) |
accumulo.index.rows.max | (Optional) The maximum number of predicate values to scan from the index for each search predicate (default: 20000). See the note on this value below. |
accumulo.index.scanner | (Optional) The index scanner implementation. (default: org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner) |
The indexes are stored in the index table using the following format:
rowId = [field value in data table]
column_family = [field column family in data table] + '_' + [field column qualifier in data table]
column_qualifier = [field rowId in data table]
visibility = [field visibility in data table]
value = [empty byte array]
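For applications that write index entries themselves, the layout above can be sketched in Python as follows. The `IndexEntry` type and `index_entry` helper are hypothetical, and the field value is assumed to be already encoded to bytes by the caller.

```python
from typing import NamedTuple


class IndexEntry(NamedTuple):
    row_id: bytes
    column_family: bytes
    column_qualifier: bytes
    visibility: bytes
    value: bytes


def index_entry(data_row_id, family, qualifier, field_value, visibility=b""):
    """Build the index-table entry for one field of one data-table row."""
    return IndexEntry(
        row_id=field_value,                       # field value in data table
        column_family=family + b"_" + qualifier,  # family + '_' + qualifier
        column_qualifier=data_row_id,             # rowId in data table
        visibility=visibility,                    # copied from the data table
        value=b"",                                # empty byte array
    )


entry = index_entry(b"row1", b"person", b"name", b"Steve")
print(entry)
```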
When using a string-encoded table, the indexed field value is encoded using Accumulo Lexicoder methods for numeric types. Otherwise, values are encoded using native binary encoding. This information allows applications to insert data and index values into Accumulo outside of Hive while still getting high-performance queries from within Hive.
When inserting data and indexes outside of Hive, it is important to update both tables within the same unit of work. If the Hive query does not find index matches for any of the query predicates, the query will short-circuit and return empty results without searching the data table.
If the search predicate matches more entries than defined by the option accumulo.index.rows.max (default 20000), the index search results will be abandoned and the query will fall back to a full scan of the data table with predicate filtering. Remember that using large values for this option, or having very large data table rowId values, may require increasing Hive memory to prevent memory errors.
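The decision logic described in the last two paragraphs can be summarized in a small Python sketch. It models a single predicate against an in-memory dictionary standing in for the index table; `plan_lookup` and the strategy labels are hypothetical names used only for illustration.

```python
def plan_lookup(index, predicate_value, max_rows=20000):
    """Decide how a single-predicate query would be answered.

    index maps a field value to the list of data-table row IDs holding it.
    Returns a (strategy, row_ids) pair.
    """
    matches = index.get(predicate_value, [])
    if not matches:
        # No index match: short-circuit with empty results.
        return ("empty", [])
    if len(matches) > max_rows:
        # Too many matches: abandon the index and scan the data table.
        return ("full_scan", [])
    # Otherwise fetch only the matching rows from the data table.
    return ("index", sorted(matches))


toy_index = {"Steve": ["row1"], "Ann": ["row2", "row3", "row4"]}
print(plan_lookup(toy_index, "Steve"))
print(plan_lookup(toy_index, "Bob"))
print(plan_lookup(toy_index, "Ann", max_rows=2))
```

The short-circuit branch is why data and index tables must be kept in step when written outside of Hive: a row missing from the index is invisible to indexed queries.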
Other options
The following options are also valid to be used with SERDEPROPERTIES or TBLPROPERTIES for further control over the actions of the AccumuloStorageHandler:
...
No Format |
---|
CREATE TABLE hive_map(key int, value map<string,int>) STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler' WITH SERDEPROPERTIES ( "accumulo.columns.mapping" = ":rowID,cf:*", "accumulo.default.storage" = "binary" ); |
...
No Format |
---|
CREATE EXTERNAL TABLE countries(key string, name string, country string, country_id int)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES ("accumulo.columns.mapping" = ":rowID,info:name,info:country,info:country_id"); |
Create an indexed table
To take advantage of indexing, Hive uses another Accumulo table to create a lexicographically sorted search term index for each field, allowing for very efficient exact-match and bounded-range searches.
No Format |
---|
CREATE TABLE company_stats ( rowid string, active_entry boolean, num_offices tinyint, num_personel smallint, total_manhours int, num_shareholders bigint, eff_rating float, err_rating double, yearly_production decimal, start_date date, address varchar(100), phone char(13), last_update timestamp ) ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe' STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler' WITH SERDEPROPERTIES ( "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu", "accumulo.table.name"="company_stats", "accumulo.indextable.name"="company_stats_idx" ); |
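Why a lexicographically sorted index makes exact-match and bounded-range searches cheap can be seen in a small Python sketch that uses binary search over a sorted list. This is an in-memory stand-in, not how Hive or Accumulo implement the lookup; the terms and row IDs are made up for the example.

```python
import bisect

# Hypothetical (search term, data-table row ID) pairs, kept in sorted order
# the way an Accumulo table keeps its keys sorted.
index = sorted([
    ("boston", "r2"),
    ("austin", "r1"),
    ("denver", "r4"),
    ("chicago", "r3"),
    ("boston", "r5"),
])
terms = [term for term, _ in index]


def range_search(low, high):
    """Return row IDs whose term lies in the inclusive range [low, high]."""
    start = bisect.bisect_left(terms, low)
    end = bisect.bisect_right(terms, high)
    return [row_id for _, row_id in index[start:end]]


def exact_match(term):
    """An exact match is a range search whose bounds are equal."""
    return range_search(term, term)


print(exact_match("boston"))
print(range_search("boston", "chicago"))
```

Because the entries are sorted, both searches touch only the contiguous slice that matches instead of scanning every entry.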
Acknowledgements
I would be remiss not to mention the efforts of Brian Femiano, whose initial prototype for Accumulo-Hive integration was the basis for this storage handler.