
HCatalog HBase Integration Design

Author: Alan Gates

This document discusses issues that need to be dealt with in order to integrate HBase and HCatalog.

Overview

The general approach is to allow HBase to be a storage device for HCatalog tables, just as HDFS files can now be. This will expand the notion of HCatalog tables. Also, places in HCatalog that depend on HDFS, e.g. security, will need to be changed to work with different storage types.

Changes to Metadata

Hive already has a way to connect to HBase, via its HiveStorageHandler interface. While this does not support everything we want, it is a good first step: it allows Hive to front an HBase table. For full details see Hive's documentation on its storage handlers and HBase integration. The following issues will need to be resolved as part of this effort:

  • The HiveMetaHook currently only supports creating and dropping tables through the storage handler. We will want to expand this to include alter in order to support changes to these tables (such as adding column families).
  • The HiveStorageHandler implementation currently is invoked by the client, not the metastore server. TODO It is not clear how this will play with security. It will also make deployment more onerous as we will have to include the handler as part of the client install. In the short term we will most likely not be able to resolve this. In the long term we should work with the Hive team to move the interaction with the interface into the server. In fact, all interaction with storage (be it HDFS, HBase, or other) should be through this interface.
  • We had proposed a data model where a column family in HBase is a column in HCatalog. Hive's HBaseStorageHandler allows this, but also allows explicit mapping of columns inside an HBase column family to a column in Hive. We need to resolve whether we want to allow this or restrict it to only HBase column family = HCatalog column.
  • As far as I know, this work is not integrated with Hadoop, HBase, or Hive security, all of which are different. We will likely need to change the HiveStorageHandler interface to interact with security.
  • HBaseStorageHandler currently only allows access to the latest revision of a column. For Pig and MapReduce (which would not actually be reading the data through this interface) this should not be an issue. For Hive, it is not clear whether we would use this interface or HCatalog's StorageDriver interface to access the data.
  • Hive notes that it does not support compound keys in HBase. As far as I can tell HBase keys are always just byte arrays. So I assume what this means is that the key has to map to a single column in Hive. This limitation should be acceptable initially, but we may need to expand the functionality in the future.
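
To make the data model question above concrete, here is a minimal sketch that checks whether a Hive-style column mapping string uses only whole column families or also maps individual qualifiers. It assumes the comma-separated format of Hive's hbase.columns.mapping table property (":key" for the row key, "cf:" for a whole family, "cf:qualifier" for a single column). The class and method names are hypothetical and not part of Hive or HCatalog.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: classifies entries of a Hive-style HBase column mapping. */
class ColumnMappingSketch {

    /** Returns the entries that name an individual qualifier (family:qualifier). */
    static List<String> qualifierLevelEntries(String mapping) {
        List<String> result = new ArrayList<>();
        for (String raw : mapping.split(",")) {
            String entry = raw.trim();
            if (entry.equals(":key")) {
                continue; // the row key is not a column family mapping
            }
            int colon = entry.indexOf(':');
            // "cf:" ends with the colon (whole family); "cf:q" has text after it
            if (colon >= 0 && colon < entry.length() - 1) {
                result.add(entry);
            }
        }
        return result;
    }

    /** True if every non-key entry maps a whole column family to one column. */
    static boolean isFamilyOnly(String mapping) {
        return qualifierLevelEntries(mapping).isEmpty();
    }
}
```

If we decide to restrict HCatalog to "column family = column", a check like isFamilyOnly could be used to reject tables whose mapping names individual qualifiers.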

In addition to changes to the HiveStorageHandler interface we will need to add a table to the metastore. This table will store the latest complete revision for each table. It will also record a list of pending revisions. In the future we may expand this to record abandoned revisions.

Interface Changes

We could attempt to build a whole new set of interfaces shadowing the HBase APIs, and thus reuse this code to connect to Cassandra or similar systems. However, this would require constructing a lot of classes when we have only one implementation for the foreseeable future. I also suspect that the interfaces of these stores differ so much that one-off integrations are a better approach. Based on this, rather than generically extending the storage drivers, input/output formats, and load and store functions, we will instead make HBase-specific versions of the storage drivers and only slightly change the existing input and output formats and load and store functions. This will allow us to use HBase classes directly rather than creating tens of shadow classes to provide the same functionality.

Changes for Input

HBaseInputStorageDriver.java
package org.apache.hcatalog.hbase;

import org.apache.hadoop.hbase.client.Scan;

/**
* An extension of {@link HCatInputStorageDriver} explicitly for HBase. In
* addition to implementing the methods of HCatInputStorageDriver it adds the
* ability to provide a Scan that defines what records will be read from HBase
* for this job. If no Scan is provided it will select the latest revision
* from all column families (most likely not what you want).
*/
public class HBaseInputStorageDriver extends HCatInputStorageDriver {

private Scan scan = null;
private ResultConverter converter;

@Override
public void initialize(JobContext context,
Properties storageDriverArgs) throws IOException {
// set up converter
}

/**
* Will return an instance of TableInputFormat. If scan properties have
* been defined via {@link #setInputFormatScan} then they will be used
* when constructing the TableInputFormat. If not, the default will be
* to scan all column families and get only the latest version for each
* row.
* @param howlProperties the properties containing parameters required for
* initialization of InputFormat
* @return the InputFormat instance
*/
@Override
public InputFormat<? extends WritableComparable, ? extends Writable>
getInputFormat(Properties howlProperties) { ... }


public HCatRecord convertToHCatRecord(WritableComparable baseKey,
Writable baseValue) throws IOException {
return converter.convert((Result)baseValue);
}

@Override
public void setInputPath(JobContext jobContext, String location)
throws IOException {
// error, no input path
}

@Override
public void setPartitionValues(JobContext jobContext,
Map<String,String> partitionValues)
throws IOException {
// error, no partitions in HCat tables
}

/**
* This should take the schema and use it to determine which HBase column
* families will be scanned as part of this.
*/
@Override
public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
throws IOException { ... }

/**
* Use a scan to set the read properties for the input format.
* @param scan - scan to define reading of the table
*/
public void setInputFormatScan(Scan scan) {
this.scan = scan;
}

}
HBaseTableReader.java
package org.apache.hcatalog.hbase;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;

/**
* A class for reading HBase tables and converting the results to {@link HCatRecord}s.
* The intention is to allow HCat users to read the table as a side input. If
* the HBase table is being read as the main input to an MR job
* {@link HCatInputFormat} should be used.
*/
public class HBaseTableReader {

protected ResultConverter converter;

/**
* @param table - HCat table to read in HBase.
*/
public HBaseTableReader(HCatTableInfo table) {...}

/**
* Get a particular record from HBase.
* @param get - A get, as described by the HBase object
* @return HCatRecord. This implementation will wrap HBase Result object to
* avoid unnecessary copies.
*/
public HCatRecord get(Get get) throws IOException { ... }

/**
* Get a set of records from HBase
* @param get - A list of gets, as described by the HBase objects
* @return HCatRecord[]. This implementation will wrap HBase Result object to
* avoid unnecessary copies.
*/
public HCatRecord[] get(List<Get> get) throws IOException { ... }

/**
* Get a scanner for an HBase table
* @param scan - The HBase scan object that describes this scan
* @return HCatResultScanner, a wrapper around HBase's ResultScanner that
* handles translations to HCatRecord
*/
public HCatResultScanner getScanner(Scan scan) throws IOException { ... }
}

HCatResultScanner.java
package org.apache.hcatalog.hbase;

public class HCatResultScanner implements Closeable, Iterable<HCatRecord> {

public void close() {...}

public Iterator<HCatRecord> iterator() {...}
}

For specifying how to read an HBase table, the existing class HCatTableInfo will be extended to contain an HBase Scan. This will take some work since Scan is not Serializable. According to the HBase API docs there are methods to serialize a Scan into a String, but I could not find such methods. HCatTableInfo will also take a Snapshot as a way to define how the table will be scanned.
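
One possible way around the serialization problem is to write the relevant scan settings to a byte stream and Base64-encode the bytes so they can travel as a String. The sketch below illustrates the idea with a hypothetical ScanSpec class that stands in for Scan and carries only a list of column families and a time range. It uses only the JDK. Whether Scan itself can be written to a byte stream this way depends on the HBase version and is an assumption that needs checking.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical stand-in for the parts of a Scan that must be passed as a String. */
class ScanSpec {
    final List<String> families;
    final long minStamp;
    final long maxStamp;

    ScanSpec(List<String> families, long minStamp, long maxStamp) {
        this.families = families;
        this.minStamp = minStamp;
        this.maxStamp = maxStamp;
    }

    /** Write the fields to a byte stream and Base64-encode the result. */
    String encode() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(families.size());
            for (String family : families) {
                out.writeUTF(family);
            }
            out.writeLong(minStamp);
            out.writeLong(maxStamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Reverse of encode(): read the fields back in the order they were written. */
    static ScanSpec decode(String encoded) {
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(raw))) {
            int count = in.readInt();
            List<String> families = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                families.add(in.readUTF());
            }
            long min = in.readLong();
            long max = in.readLong();
            return new ScanSpec(families, min, max);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```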

HCatTableInfo.java
// This just shows changes to the class

import org.apache.hadoop.hbase.client.Scan;

/**
* HBase scan that defines how an HBase table will be read.
*/
private Scan scan;

public void setScan(Scan scan) { this.scan = scan; }

public void setScan(Snapshot ss) { scan = ss.toScan(); }

public Scan getScan() { return scan; }

Snapshot.java
package org.apache.hcatalog.mapreduce;

/**
* A class that allows a user to define different versions for different column families
* to use when reading a table. This object allows the same set of versions to be read
* in multiple loads with {@link HCatLoader} or multiple instances of {@link HCatInputFormat}
*/
public class Snapshot implements Serializable {

// Table that this snapshot captures
private String tableName;

private String dbName;

// Map of column families and the versions to read on that column family
private HashMap<String, Long> cfVersions;

/**
* Construct a snapshot
* @param db - name of the database that this snapshot references
* @param table - name of the table that this snapshot references
*/
public Snapshot(String db, String table) {
dbName = db;
tableName = table;
}

/**
* Produce a scan based on this snapshot. This is not a public function,
* as it is only intended to be used by {@link HCatTableInfo} to translate
* a {@link Snapshot} into {@link Scan}.
*/
Scan toScan() {
// since HBase only allows one timestamp for an entire scan this will
// instead need to be done via a filter.
...
}
}
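
As a rough illustration of the per-column-family filtering that toScan would have to set up, the sketch below decides cell by cell whether a value is visible under a snapshot. It is plain Java with no HBase dependency, and the class name is hypothetical. It rests on two assumptions that the design above does not settle: a cell is visible when its timestamp is at or below the version chosen for its column family, and column families that are not listed in the snapshot are not read at all.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the visibility rule a snapshot filter would apply. */
class SnapshotFilterSketch {

    // Column family name -> version chosen for that family
    private final Map<String, Long> cfVersions = new HashMap<>();

    void setVersion(String family, long version) {
        cfVersions.put(family, version);
    }

    /**
     * Assumed rule: accept a cell if its family is part of the snapshot and
     * its timestamp is not newer than the version chosen for that family.
     */
    boolean accept(String family, long timestamp) {
        Long limit = cfVersions.get(family);
        return limit != null && timestamp <= limit;
    }
}
```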

A static method will be added to HCatInputFormat to build a snapshot that can be shared across instances of HCatInputFormat or HCatLoader.

HCatInputFormat.java
// Only changes are shown

/**
* @param db - name of the database that this snapshot references
* @param table - name of the table that this snapshot references
*/
public static Snapshot hbaseSnapshot(String db, String table) {
return new Snapshot(db, table);
}
ResultConverter.java
package org.apache.hcatalog.hbase;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;


/**
* A class to convert records between HBase Result format and HCat's {@link HCatRecord}
* format. This includes type translation. This class is only intended for use by
* classes in this package, it is not public. The resulting HCatRecord only
* wraps the Result. Copying is not done aggressively.
*/
class ResultConverter {

protected SerDe deserializer;

/**
* @param table - HCatalog table to convert results for
*/
ResultConverter(HCatTableInfo table) {...}

/**
* @param result - HBase result to convert
* @return HCatRecord that wraps the input
*/
HCatRecord convert(Result result) { ... }

/**
* Given an HCatRecord convert it to a Put for HBase.
* @param record - HCatRecord to convert
* @return put to be given to HBase
*/
Put convert(HCatRecord record) {
...
}

}

Hive's HBaseStorageHandler provides a SerDe for doing translation between Hive and HBase types.

HCatLoader will be extended to include a constructor that takes a string of HBase-specific options. This string will match the formatting in Pig's HBaseStorage loader. However, the options will need to be expanded to include other settings available on Scan, such as which version to get, how many versions to get, the range of versions to get, the ability to set a Filter, and the ability to reference a snapshot file. Users who use this new constructor for a non-HBase table will be given an error, since otherwise they might expect filters to have been applied that were not. Users who use the default constructor for HBase tables will get the same behavior as if they had called HCatLoader(String) with an empty string.

Pig will naturally transmit the fields involved in the script to HCatLoader, which in turn can be turned into information on which column families are needed.

HCatLoader.java
/**
* A Pig loader for users reading from HBase tables.
*/
public class HCatLoader {


/**
* A constructor for users loading an HBase table. If the underlying
* table is not an HBase table, this will throw a RuntimeException.
* @param opts - A string of options for defining how the HBase scan will
* be done. These match the same options readable by Pig's HBaseStorage.
*/
public HCatLoader(String opts) { ... }
}


HCatalog will add a command line option to generate a snapshot. It will look something like: hcat.sh -snapshot "table=foo:a=1:b=2" outfile. Here foo is the table name, a and b are column families, 1 and 2 are the version numbers to use for them, and outfile is the file the resulting snapshot is serialized into. Pig will also add an hcat command to its grunt shell. This command will simply pass its arguments to hcat.sh. This will allow Pig users to create snapshots and then reference them in their load statements.
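
A minimal sketch of how the snapshot argument could be parsed, assuming the "table=foo:a=1:b=2" form shown above with the table entry always first. The SnapshotSpec class is hypothetical. The final syntax of the option is not settled by this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parsed form of a "table=foo:a=1:b=2" snapshot argument. */
class SnapshotSpec {
    final String table;
    final Map<String, Long> cfVersions;

    private SnapshotSpec(String table, Map<String, Long> cfVersions) {
        this.table = table;
        this.cfVersions = cfVersions;
    }

    static SnapshotSpec parse(String spec) {
        String[] parts = spec.split(":");
        // Assumption: the first entry names the table
        if (!parts[0].startsWith("table=") || parts[0].length() == "table=".length()) {
            throw new IllegalArgumentException("Expected table=<name> first in: " + spec);
        }
        String table = parts[0].substring("table=".length());

        // Remaining entries are <column family>=<version>
        Map<String, Long> versions = new LinkedHashMap<>();
        for (int i = 1; i < parts.length; i++) {
            int eq = parts[i].indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected family=version but got: " + parts[i]);
            }
            String family = parts[i].substring(0, eq);
            long version = Long.parseLong(parts[i].substring(eq + 1));
            versions.put(family, version);
        }
        return new SnapshotSpec(table, versions);
    }
}
```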

See the output section below on how version numbers are done.

Changes for Output

No changes should be necessary to the interfaces HCatOutputFormat and HCatStorer to support storing into HBase. From the user's perspective they are still just storing records. HCatOutputStorageDriver will change its interface to allow subclasses to specify a chain of OutputCommitters that it wants to run as part of committing the data. A new HBaseOutputStorageDriver will be needed that takes HCatRecords and turns them into HBase keys and values that can be consumed by HFileOutputFormat.

HCatOutputStorageDriver
// Only changes to the class are shown

/**
* A list of {@link OutputCommitter}s to run as part of running
* {@link HCatOutputCommitter}. Each time a method in the
* HCatOutputCommitter is invoked, it will invoke the
* equivalent method in each of the OutputCommitters in this
* list in the order they appear in the list <em>before</em>
* calling the base committer or doing its own work. The passed in
* map should have the class name of the OutputCommitter as a key.
* The value Map<String, String> is a set of key value pairs that
* can be used to communicate parameters to the passed in OutputCommitter.
* That committer will be expected to have a constructor that takes
* a {@link TaskAttemptContext} and a Map<String, String>.
*/
protected Map<String, Map<String, String>> additionalCommitters;

/**
* Instantiates and returns instances of the {@link OutputCommitter}s that
* are specified in {@link #additionalCommitters}. This is package level
* access because only {@link HCatOutputCommitter} should be calling it.
*/
OutputCommitter[] getCommitters() {
// use reflection to instantiate each of the specified classes
}

In HCatOutputCommitter, each of the methods that override OutputCommitter will be changed to go through the list of additionalCommitters and call the same method on each of those in the order that the committers are in the array before they call the method on the baseCommitter or do their own work.
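
The ordering described here can be sketched in plain Java as follows. Committer is a hypothetical single-method stand-in for Hadoop's OutputCommitter, and only commitJob is shown. The real change would repeat the same pattern for each overridden method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for OutputCommitter, reduced to one method. */
interface Committer {
    void commitJob();
}

/** Runs the additional committers in list order, then the base committer. */
class ChainedCommitter implements Committer {
    private final List<Committer> additional;
    private final Committer base;

    ChainedCommitter(List<Committer> additional, Committer base) {
        this.additional = new ArrayList<>(additional);
        this.base = base;
    }

    @Override
    public void commitJob() {
        // Additional committers run first, in the order they were given
        for (Committer committer : additional) {
            committer.commitJob();
        }
        // Only then does the base committer do its work
        base.commitJob();
    }
}
```

One design question this leaves open is what happens when an additional committer throws. In this sketch the exception propagates and the base committer never runs.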

The new class HBaseOutputStorageDriver will write records into a sequence file using ImmutableBytesWritable as a key and KeyValue as a value. (TODO It is not clear to me what HBase wants in the key and what it wants in the value.) The driver will also add a new class HCatHBaseOutputCommitter to the additionalCommitters list. In its commitJob implementation this class will spawn two MapReduce jobs. The first will take the sequence files produced by HBaseOutputStorageDriver as input and will have an identity mapper. The committer will call HFileOutputFormat.configureIncrementalLoad on that job to set up the parallelism, partitioner, and sorting, and the job will use HFileOutputFormat as its output.

After this first job is finished a second job will run that will copy the resulting HFiles onto the appropriate region servers. It will accomplish this with a new InputFormat. This new format will return a split for each region server, with that region server as the preferred location to run the map. Each map will then copy all region files for its region server. In most cases the map should be placed on the region server. Even in cases where it is not, the scheduler will most likely place it on the same rack as the appropriate region server, so the copy will still be useful. Once this job has finished the output committer will tell HBase to import the new region files. It will then need to inform the database that this revision number is now a valid revision number for that table.

A failure of either of these subsequent MapReduce jobs will be considered a failure of the user's job, and reported to the user. No attempt will be made to stop part way and allow the user to continue from that point. On failure the revision number that was reserved for the write will need to be cancelled.

As HBase writes are done, either by HCatOutputFormat or HBaseTableReaderWriter, they will reserve a revision number with HCatalog. Those reserved revision numbers will be stored in HCatalog's database. When the output format or the writer finishes writing, the revision will need to be promoted. A revision can be in one of three states: reserved, pending, or latest. reserved indicates that it is still being written. pending indicates that the writer is done but the revision cannot be marked latest yet because there is a reserved revision before it. latest indicates the most recent complete revision for the table.

The logic for promoting revision numbers is:

# current initially refers to the revision we have finished and are promoting
if there exists reserved revisions < current:
    revision state = pending
    done
if there exists pending revisions < current:
    discard pending revisions that are < current
while next revision after current is pending:
    discard current
    current = next revision
set current to latest
done
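
The promotion logic can be sketched in Java as below. This is an in-memory illustration only: the real state would live in HCatalog's database, and the RevisionTracker class and its method names are hypothetical. The loop is read as advancing current to the next pending revision, so that the newest revision in an unbroken run of finished revisions becomes latest.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of reserved and pending revisions for one table. */
class RevisionTracker {
    enum State { RESERVED, PENDING }

    // Revisions that are not yet latest, ordered by revision number
    private final TreeMap<Long, State> open = new TreeMap<>();
    private long latest = -1; // -1 means no complete revision yet

    void reserve(long revision) {
        open.put(revision, State.RESERVED);
    }

    /** Promote a revision whose writer has finished. */
    void promote(long current) {
        if (open.get(current) != State.RESERVED) {
            throw new IllegalStateException("Revision " + current + " is not reserved");
        }
        // An earlier revision is still being written: wait as pending
        if (open.headMap(current, false).containsValue(State.RESERVED)) {
            open.put(current, State.PENDING);
            return;
        }
        // Earlier pending revisions are superseded by current
        open.headMap(current, false).clear();
        // Walk forward over finished revisions that were waiting on this one
        Map.Entry<Long, State> next = open.higherEntry(current);
        while (next != null && next.getValue() == State.PENDING) {
            open.remove(current);
            current = next.getKey();
            next = open.higherEntry(current);
        }
        open.remove(current);
        latest = current;
    }

    long latest() {
        return latest;
    }

    State stateOf(long revision) {
        return open.get(revision);
    }
}
```

For example, with revisions 1, 2, and 3 reserved, promoting 2 leaves it pending because 1 is still being written. Promoting 1 afterwards makes 2 the latest revision, and 3 stays reserved.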

The logic for cancelling a revision number is:

# cancelled initially refers to the revision we are cancelling
if there exists reserved revision:
    discard cancelled
    done
if there exists pending revision: # it should be < cancelled, else there should have been a reserved
    set greatest pending = latest
    discard all other pending
    discard cancelled
    done
discard cancelled
done

TODO Currently HBase's HFileOutputFormat, which we propose to use here, only supports writing one column family per job. Is this limitation acceptable?

By default the version number assigned to writes into HBase will be taken from the HCatalog Thrift server's clock. In the HA case, where HCatalog has multiple servers, this means a time daemon is necessary to keep the clocks in sync. The version number will be stored in HCatTableInfo.outputVersion, a new field. If the user has not already set a value, it will be set by HBaseOutputStorageDriver.getOutputLocation when it is called. To do this it will need to connect to the metastore and ask for a version number. The new general purpose Thrift message to the metastore can be used to answer this request. The metastore will need to store the last requested clock tick for a given table in the database so that two requests arriving at the same time are not given the same version number. This means that HCatalog will need either its own metastore table or a modification to the existing Hive table that tracks tables (I doubt Hive will be interested in this one). Either should be fine, except that it complicates the install process. It also means a new listener for the general purpose Thrift call that will handle this request. If the user has set a version, a check will be made that the version is greater than any existing version recorded for that table in the database and less than the current time. If the user's version is accepted, it will be recorded as the latest version for that table in the database.
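
The allocation rules in this paragraph can be sketched as follows. This is an in-memory illustration with hypothetical names: the real implementation would keep the last issued value per table in the metastore database. Bumping the value by one when two requests see the same clock tick is an assumption. The text above only requires that the two requests do not receive the same number.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-table version allocation in the metastore. */
class VersionAllocator {

    // Table name -> last version issued or accepted for that table
    private final Map<String, Long> lastIssued = new HashMap<>();

    /** Issue a version from the server clock, never repeating one for a table. */
    synchronized long next(String table, long nowMillis) {
        long last = lastIssued.getOrDefault(table, Long.MIN_VALUE);
        // Assumption: on a repeated clock tick, move one past the last value
        long version = Math.max(nowMillis, last + 1);
        lastIssued.put(table, version);
        return version;
    }

    /**
     * Accept a user-supplied version only if it is greater than any version
     * recorded for the table and less than the current time.
     */
    synchronized boolean acceptUserVersion(String table, long userVersion, long nowMillis) {
        long last = lastIssued.getOrDefault(table, Long.MIN_VALUE);
        if (userVersion <= last || userVersion >= nowMillis) {
            return false;
        }
        lastIssued.put(table, userVersion);
        return true;
    }
}
```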

Changes to CLI

Assuming we use the HBaseStorageHandler to do connections, Hive already provides a way to create HBase tables. These tables can be pre-existing in HBase, but need not be. For details see the Hive wiki.

See above for info on the changes to the CLI to support snapshots.

New API for Point Updates

A new API will be added for doing point updates. This API will not be intended for use by MR jobs, as puts in this API will be auto-flushed. All puts in this class will share the timestamp generated when the class is created.

HBaseTableReaderWriter.java
package org.apache.hcatalog.hbase;

public class HBaseTableReaderWriter extends HBaseTableReader {

private long timestamp;

/**
* @param table - HCat table to read from and write to in HBase.
*/
public HBaseTableReaderWriter(HCatTableInfo table) {
super(table);
}

/**
* Add one HCatRecord to the HBase.
* @param record - hcat record to add
*/
public void put(HCatRecord record) throws IOException {
// convert to Put
// set version to timestamp
HTable.put(put);
}

/**
* Add a list of HCatRecords to HBase. First all of the inputs will be
* converted to a list of Puts and then HTable.put(List<Put>) called so
* that these are updated as a batch.
* @param records - list of HCatRecords to put.
* @throws IOException
*/
public void put(List<HCatRecord> records) throws IOException {
// convert to List<Put>
// set version to timestamp
HTable.put(listofputs);
}
}

Changes to Notification

A new notification message "HBaseTableUpdated" will be added. HCatOutputCommitter.commitJob will be changed to send this message when the underlying table is an HBase table.

Changes to Security

Hive has a security checking interface HiveAuthorizationProvider. There is little documentation for it, but the code can be seen in source control along with their default implementation. At a first analysis it appears that if we switch our authorization checking to use this interface then Hive will "just work". We can then implement an HCatalog instance of this. It will have to decide whether a table is in HDFS or HBase based on information in the metastore, and authorize appropriately. Identified issues:

  • Hive authorization is only done in the client. This means any user who uses the Thrift API directly would bypass security. This is already true of HCatalog, as it also does the security checks in the client. Over time we should work with Hive to migrate this to the server, since users will eventually want to use the Thrift API.

At this point HBase's security is a patch that has not yet been accepted to trunk. This is a huge risk for the project. We will need to work with the HBase community to move this patch forward or propose an alternate patch. For the current state of things see the HBase blog.

Changes for Administration

TODO Will we need to support administration for HBase? How to do archiving, replication, and cleanup?

Other Open Questions

  1. Too many revisions can age out versions you want to read. How can we prevent this?
  2. What should we set maxVersions to for our HBase tables? Related to previous question.