Overview

An HCat Random Read framework is in the works (see Random Access Framework). The next step is to expose this to DSLs such as Pig so that it can be taken advantage of by a wider audience. For motivation see Use Case for Snapshots.

This design describes an EvalFunc that performs random reads, useful for tasks such as skewed joins.

Usage

Given tables:

Table Click

clickId | campaignId | timestamp
1       | a          | 123
2       | b          | 124
3       | b          | 125

Table Campaign

campaignId | pricePerClick
a          | 100
b          | 200

Using a direct-lookup Pig UDF:

--specify table name, row key column, output schema
DEFINE campaignDirectLookup org.apache.hadoop.hcatalog.pig.DirectLookup("campaign","campaignId",
    "campaignId:chararray,pricePerClick:int");

A = LOAD 'click' AS (clickId: chararray, campaignId: chararray, timestamp: long);
--pass row key
B = FOREACH A GENERATE clickId, timestamp, campaignDirectLookup(campaignId);

Would yield:

Table B

clickId | timestamp | campaignId | pricePerClick
1       | 123       | a          | 100
2       | 124       | b          | 200
3       | 125       | b          | 200
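The per-row semantics of the lookup can be sketched in plain Java (an illustration only; plain maps stand in for the HCat random-read path, and the class and method names are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of what the DirectLookup UDF does per input row: a keyed lookup
// into the Campaign table, with the result appended to each Click row.
public class LookupSemantics {
    // clicks: rows of {clickId, campaignId, timestamp}
    // pricePerClick: the Campaign table keyed by campaignId
    // returns rows of {clickId, timestamp, campaignId, pricePerClick}
    public static List<String[]> annotateClicks(List<String[]> clicks,
                                                Map<String, Integer> pricePerClick) {
        List<String[]> out = new ArrayList<>();
        for (String[] click : clicks) {
            String clickId = click[0], campaignId = click[1], timestamp = click[2];
            Integer price = pricePerClick.get(campaignId);
            out.add(new String[]{clickId, timestamp, campaignId, String.valueOf(price)});
        }
        return out;
    }
}
```

Running this over the sample Click and Campaign tables produces exactly the rows shown in Table B.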

Design

UDF Skeleton code:

import java.io.IOException;
import java.util.Properties;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;

public class DirectLookup extends EvalFunc<Tuple> {
    private final String tableName;
    private final String keyCol;
    private final String outputSchema;

    public DirectLookup(String tableName, String keyCol, String outputSchema) {
        this.tableName = tableName;
        this.keyCol = keyCol;
        this.outputSchema = outputSchema;
    }

    @Override
    public Schema outputSchema(Schema input) {
        Properties prop = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), new String[]{tableName});
        if (prop.isEmpty()) {
            //first call, on the frontend:
            //populate needed properties
            //create snapshot and serialize back into prop
            //set the snapshot here
        }
        try {
            return Utils.getSchemaFromString(outputSchema);
        } catch (Exception e) {
            throw new RuntimeException("Invalid output schema: " + outputSchema, e);
        }
    }

    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        Properties prop = UDFContext.getUDFContext()
                .getUDFProperties(this.getClass(), new String[]{tableName});
        //deserialize snapshot and do the lookup here
        //return result
        return null;
    }
}

A few tricks are needed to perform client-side setup similar to what HCatInputFormat.setInput() does:

  • The UDF does not have a default constructor, which means each instance is bound to a specific table, lookup key, and output schema.
  • Initialization is done in EvalFunc.outputSchema, since Pig guarantees that this is called on the frontend. This ensures that the same configuration generated during initialization is shared across all tasks of the MR job.
  • As documented in the Pig book, UDFContext is used to pass the properties generated during initialization from the frontend to the backend.
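The frontend-to-backend hand-off can be sketched without Pig on the classpath (a minimal illustration, assuming the snapshot can be encoded as a single string property; SnapshotHandoff and its key/value encoding are hypothetical stand-ins for however the Random Access Framework serializes its snapshot into the UDFContext Properties):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Sketch of the property-passing pattern: the frontend serializes lookup
// state into a Properties object (as UDFContext would ship to the tasks),
// and the backend deserializes it to answer per-row lookups.
public class SnapshotHandoff {

    // Frontend: encode the lookup data as one string property.
    public static void storeSnapshot(Properties prop, Map<String, String> snapshot) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        prop.setProperty("lookup.snapshot", sb.toString());
    }

    // Backend: decode the property back into a map and look up one key.
    public static String lookup(Properties prop, String key) {
        Map<String, String> snapshot = new HashMap<>();
        String encoded = prop.getProperty("lookup.snapshot", "");
        for (String pair : encoded.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) snapshot.put(kv[0], kv[1]);
        }
        return snapshot.get(key);
    }
}
```

In the real UDF, storeSnapshot would run inside outputSchema on the frontend and lookup inside exec on the backend, with UDFContext carrying the Properties between the two.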