Overview

One of the main motivations for using HBase is random access. Among other uses, random reads make it possible to implement dimension stores efficiently by performing skewed joins.

This design outlines a Random Read framework enhancement to HCatalog. StorageHandlers implementing the framework would enable users to leverage the underlying storage's random access capability. This is an implementation of the Generic MR Random Access Framework, though the current implementation will only support random reads.

Design

Class diagram describing the new classes and integration with HCat.

HCatRandomAccess

A RandomAccess instance enables a user to perform random reads on a table. An instance is permanently bound to a single table and output schema.

public class HCatRandomAccess extends RandomAccess<Object, HCatRecord, HCatRecord> implements Configurable {

    /**
     * Generate the properties needed to pass to the RandomAccess framework
     * @param db database where the table resides, if null default database is assumed
     * @param table
     * @param inputSchema columns to materialize for reading
     * @param outputSchema schema of records that will be written
     * @param properties implementation-specific configuration that may need to be passed
     * @return a random access configuration
     * @throws IOException if HCat or the StorageHandler has trouble communicating
     * with the metastore, storage system, etc.
     * @throws IllegalArgumentException if database/table/referenced columns do not exist,
     * or if the table's StorageHandler implementation is not an instanceof {@link RandomAccessible}
     */
    public static Map<String,String> createProperties(String db, String table, HCatSchema inputSchema,
                                HCatSchema outputSchema, Map<String, String> properties) throws IOException {
    ...
    }

    /**
     * @param key of row to retrieve
     * @return
     * @throws IOException on problems with retrieving the record
     * @throws IllegalArgumentException when datatype is not the same
     * type as the table's defined key column
     */
    @Override
    public HCatRecord get(Object key) throws IOException {
       ...
    }

    /**
     * @param key this field is ignored
     * @param value
     * @throws IOException
     * @throws IllegalArgumentException if value is null, or if value does not follow
     * the defined outputSchema, or if value's key field is null
     */
    @Override
    public void put(Object key, HCatRecord value) throws IOException {
       throw new UnsupportedOperationException("put currently not supported");
    }

    @Override
    public HCatRandomAccessCommitter getCommitter(JobContext context) throws IOException {
        return new HCatRandomAccessCommitter();
    }
    ....
}

Presently it contains a single method, which lets users retrieve a row via its row key:

    /**
     * @param key of row to retrieve
     * @return
     * @throws IOException on problems with retrieving the record
     * @throws IllegalArgumentException when datatype is not the same
     * type as the table's defined key column
     */
    @Override
    public HCatRecord get(Object key) throws IOException

For example:

String rowKey = ...
HCatRecord myRecord = randomAccess.get(rowKey);

RandomAccessible

StorageHandler developers wishing to expose random read functionality will make use of the RandomAccessible mixin. Developers will then have to implement the following factory method:

public RandomAccess getRandomAccess(String databaseName, String tableName, Map<String, String> properties)

Developers can then construct their implementation-specific RandomReader.
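
As a rough illustration of the mixin pattern described above, the sketch below shows a toy storage handler exposing a factory method with the same parameter list. The types SketchRandomAccess, SketchRandomAccessible and InMemoryStorageHandler are hypothetical stand-ins invented for this example so that it compiles on its own; they are not the HCatalog classes, and the real RandomAccess class has more methods than the single get() shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the framework's reader type; only get() is modeled.
interface SketchRandomAccess<K, V> {
    V get(K key);
}

// Hypothetical stand-in for the RandomAccessible mixin and its factory method.
interface SketchRandomAccessible<K, V> {
    SketchRandomAccess<K, V> getRandomAccess(String databaseName, String tableName,
                                             Map<String, String> properties);
}

// Toy storage handler that keeps rows in memory, grouped by "db.table".
class InMemoryStorageHandler implements SketchRandomAccessible<String, String> {
    private final Map<String, Map<String, String>> tables = new HashMap<>();

    void addRow(String db, String table, String rowKey, String row) {
        tables.computeIfAbsent(db + "." + table, k -> new HashMap<>()).put(rowKey, row);
    }

    @Override
    public SketchRandomAccess<String, String> getRandomAccess(String databaseName, String tableName,
                                                              Map<String, String> properties) {
        // Assumption: a null database falls back to "default", as createProperties() documents.
        String db = (databaseName == null) ? "default" : databaseName;
        Map<String, String> rows = tables.get(db + "." + tableName);
        if (rows == null) {
            throw new IllegalArgumentException("Unknown table " + db + "." + tableName);
        }
        // The returned reader is bound to this one table for its whole lifetime.
        return rows::get;
    }
}
```

A caller would obtain a reader once, for example handler.getRandomAccess(null, "dim", props), and then issue any number of get(rowKey) calls against it.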

HCatRandomAccess.createProperties()

Users need to declare the tables they wish to perform random reads on before MR job submission. This saves each map task from performing the setup itself and possibly straining the metastore server. This is done using:

public static Map<String,String> createProperties(String db, String table, HCatSchema inputSchema,
    HCatSchema outputSchema, Map<String, String> properties) throws IOException;

During each call the method will:

  • Query the metastore for the table information
  • Verify the StorageHandler is RandomAccessible (instanceof)
  • Create a new InputJobInfo
  • Deserialize the main InputJobInfo object from configuration object
  • Deserialize the randomAccessTableMap from the main InputJobInfo object's properties
  • Update the randomAccessTableMap with new db.table->InputJobInfo pair
  • Serialize randomAccessTableMap back into main InputJobInfo object
  • Serialize main InputJobInfo object back into configuration object
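
The deserialize, update, serialize cycle in the steps above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the HCatalog implementation: a plain Map<String, String> stands in for the job configuration, a String stands in for each InputJobInfo, the two nesting levels (map inside main InputJobInfo inside configuration) are collapsed into one, and both the RandomAccessTableRegistry class and the TABLE_MAP_KEY name are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

class RandomAccessTableRegistry {
    // Hypothetical configuration key; the real key name is not given in this design.
    static final String TABLE_MAP_KEY = "sketch.randomaccess.tablemap";

    // Encode the table map as Base64 text so it fits in a string-valued configuration.
    static String serialize(HashMap<String, String> tableMap) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(tableMap);
            }
            return Base64.getEncoder().encodeToString(bytes.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decode the table map; a missing entry yields an empty map.
    @SuppressWarnings("unchecked")
    static HashMap<String, String> deserialize(String encoded) {
        if (encoded == null) {
            return new HashMap<>();
        }
        byte[] raw = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (HashMap<String, String>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Read-modify-write: add one db.table -> job info pair to the stored map.
    static void register(Map<String, String> conf, String db, String table, String jobInfo) {
        String dbName = (db == null) ? "default" : db;
        HashMap<String, String> tableMap = deserialize(conf.get(TABLE_MAP_KEY));
        tableMap.put(dbName + "." + table, jobInfo);
        conf.put(TABLE_MAP_KEY, serialize(tableMap));
    }
}
```

Because every call rewrites the whole map, registering several tables before job submission leaves a single configuration entry that each task can decode without contacting the metastore.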

HCatRandomAccess.initialize()

public void initialize(Map<String, String> properties) throws IOException;

This method call will perform the following:

  • Retrieve the InputJobInfo instance for the selected table
  • Using getStorerInfo(), instantiate the StorageHandler class, then configure it
  • Return storageHandler.getRandomAccess()
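
The initialization flow above (instantiate the handler class by name, check that it is random accessible, delegate to its factory method) might look roughly like the sketch below. All type names here (SketchReader, SketchRandomAccessibleHandler, EchoStorageHandler, RandomAccessInitializer) are hypothetical stand-ins so the example is self-contained. It also assumes the handler has a public no-argument constructor, and it skips the InputJobInfo lookup and handler configuration steps.

```java
import java.util.Map;

// Hypothetical stand-in for the reader returned by a storage handler.
interface SketchReader {
    String get(String key);
}

// Hypothetical stand-in for the RandomAccessible mixin.
interface SketchRandomAccessibleHandler {
    SketchReader getRandomAccess(String databaseName, String tableName,
                                 Map<String, String> properties);
}

// Toy handler whose reader just echoes the table and key it was asked for.
class EchoStorageHandler implements SketchRandomAccessibleHandler {
    public EchoStorageHandler() {
    }

    @Override
    public SketchReader getRandomAccess(String databaseName, String tableName,
                                        Map<String, String> properties) {
        return key -> databaseName + "." + tableName + ":" + key;
    }
}

class RandomAccessInitializer {
    static SketchReader initialize(String handlerClassName, String db, String table,
                                   Map<String, String> properties) {
        Object handler;
        try {
            // Instantiate the storage handler reflectively from its class name.
            handler = Class.forName(handlerClassName).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + handlerClassName, e);
        }
        // Only handlers that opted into the mixin can serve random reads.
        if (!(handler instanceof SketchRandomAccessibleHandler)) {
            throw new IllegalArgumentException(handlerClassName + " is not random accessible");
        }
        return ((SketchRandomAccessibleHandler) handler).getRandomAccess(db, table, properties);
    }
}
```

Checking instanceof at this point duplicates the check made at job setup, but it keeps a misconfigured task from failing later with a ClassCastException.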

Sample Usage

In job setup:

Job job = new Job();
...
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/foo"));

Map<String, String> properties = new HashMap<String, String>();
HCatSchema inputSchema = ...;
HCatSchema outputSchema = ...;
RandomAccess.addRandomAccess("myTableAlias", HCatRandomAccess.class,
    HCatRandomAccess.createProperties("my_db", "my_table", inputSchema, outputSchema, properties),
    job.getConfiguration());
job.waitForCompletion(true);

In MR job:

    public static class SampleMap extends Mapper<LongWritable,Text,LongWritable,Text> {
        RandomAccess access;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Look up the reader registered under this alias during job setup
            access = RandomAccess.getRandomAccess("myTableAlias", byte[].class, Result.class, Put.class, context.getConfiguration());
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            access.get(new byte[]{});
            access.put(new byte[]{}, new Put());
        }

    }
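
To connect the sample back to the dimension-store use case from the overview, here is a minimal sketch of a lookup join built on random reads. It is an illustration only: DimensionLookupJoin is a hypothetical class, the lookup function stands in for a call such as access.get(key), and facts are modeled as two-element String arrays (join key, payload) rather than HCatRecord instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class DimensionLookupJoin {
    // 'lookup' stands in for a random read; it returns null when no dimension row exists.
    static List<String> join(List<String[]> facts, Function<String, String> lookup) {
        List<String> joined = new ArrayList<>();
        for (String[] fact : facts) {
            // One random read per fact record, keyed by the fact's join key.
            String dimensionRow = lookup.apply(fact[0]);
            if (dimensionRow != null) {
                // Inner join semantics: facts without a matching dimension row are dropped.
                joined.add(fact[0] + "\t" + fact[1] + "\t" + dimensionRow);
            }
        }
        return joined;
    }
}
```

In a mapper, the body of the loop would run inside map(), so the dimension table never has to be shuffled or loaded in full.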