Overview
An HCat Random Read framework is in the works (see Random Access Framework). The next step is to expose this to DSLs such as Pig so that it can be taken advantage of by a wider audience. For motivation see Use Case for Snapshots.
This design describes an EvalFunc that can be used to perform random reads, which is useful for tasks such as skewed joins.
Usage
Given tables:
Table Click

clickId | campaignId | timestamp
---|---|---
1 | a | 123
2 | b | 124
3 | b | 125
Table Campaign

campaignId | pricePerClick
---|---
a | 100
b | 200
Using a direct lookup Pig UDF:

--specify table name, row key column, output schema
define campaignDirectLookup org.apache.hadoop.hcatalog.pig.DirectLookup("campaign", "campaignId", "campaignId:chararray,pricePerClick:int");
A = LOAD 'click' AS (clickId: chararray, campaignId: chararray, timestamp: long);
--pass row key
B = FOREACH A GENERATE clickId, timestamp, campaignDirectLookup(campaignId);
Would yield:

Table B

clickId | timestamp | campaignId | pricePerClick
---|---|---|---
1 | 123 | a | 100
2 | 124 | b | 200
3 | 125 | b | 200
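The lookup semantics above amount to extending each click row with the campaign row fetched by its row key. A minimal sketch of that semantics in plain Java (class and method names here are illustrative, not part of the proposed API):

```java
import java.util.HashMap;
import java.util.Map;

public class LookupSketch {

    // Mimics: FOREACH click GENERATE clickId, timestamp, lookup(campaignId)
    static String lookupAll() {
        // The "campaign" table keyed by its row key, campaignId -> pricePerClick.
        Map<String, Integer> campaign = new HashMap<>();
        campaign.put("a", 100);
        campaign.put("b", 200);

        // Rows of the "click" table: clickId, campaignId, timestamp.
        String[][] clicks = {
            {"1", "a", "123"},
            {"2", "b", "124"},
            {"3", "b", "125"}
        };

        StringBuilder out = new StringBuilder();
        for (String[] click : clicks) {
            // Random read: fetch the campaign row for this click's row key.
            Integer pricePerClick = campaign.get(click[1]);
            out.append(click[0]).append(',')
               .append(click[2]).append(',')
               .append(click[1]).append(',')
               .append(pricePerClick).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(lookupAll());
    }
}
```

In the real UDF the in-memory map is replaced by a random read against an HCat snapshot of the campaign table.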
Design
UDF Skeleton code:
public class DirectLookup extends EvalFunc<Tuple> {
    private String tableName;
    private String keyCol;
    private String outputSchema;

    public DirectLookup(String tableName, String keyCol, String outputSchema) {
        this.tableName = tableName;
        this.keyCol = keyCol;
        this.outputSchema = outputSchema;
    }

    @Override
    public Schema outputSchema(Schema input) {
        Properties prop = UDFContext.getUDFContext()
            .getUDFProperties(this.getClass(), new String[]{tableName});
        if (tableName != null) {
            //populate needed properties
            //create snapshot and serialize it back into prop
            //set the snapshot here
        }
        //parse the user-supplied schema string into a Pig Schema,
        //e.g. with org.apache.pig.impl.util.Utils.getSchemaFromString(outputSchema)
        try {
            return Utils.getSchemaFromString(outputSchema);
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid schema: " + outputSchema, e);
        }
    }

    @Override
    public Tuple exec(Tuple tuple) throws IOException {
        Properties prop = UDFContext.getUDFContext()
            .getUDFProperties(this.getClass(), new String[]{tableName});
        //deserialize snapshot and do the lookup here
        //return result
        return null;
    }
}
A few tricks are needed to replicate the client-side setup normally performed by HCatInputFormat.setInput():
- The UDF has no default constructor, so each instance is bound to a specific table, lookup key, and output schema.
- Initialization will be done in EvalFunc.outputSchema, as Pig guarantees that this is called on the frontend. This ensures that the same configuration generated by initialization is shared across all tasks of the MR job.
- As documented in the Pig book, UDFContext will be used to pass properties generated in initialization from the frontend to the backend.
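The frontend-to-backend handoff described above can be sketched without the Pig dependency: the frontend serializes the snapshot into a string property, and the backend deserializes it before doing lookups. In the real UDF the Properties object would come from UDFContext.getUDFContext().getUDFProperties(...); here a plain java.util.Properties stands in for it, and the property key name is illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.HashMap;
import java.util.Properties;

public class SnapshotHandoffSketch {
    // Illustrative property key; the real UDF would pick its own.
    static final String SNAPSHOT_KEY = "hcat.lookup.snapshot";

    // Frontend (outputSchema): create the snapshot and serialize it into prop.
    static void frontend(Properties prop, HashMap<String, Integer> snapshot) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(snapshot);
        }
        prop.setProperty(SNAPSHOT_KEY, Base64.getEncoder().encodeToString(bytes.toByteArray()));
    }

    // Backend (exec): deserialize the snapshot and perform the lookup.
    @SuppressWarnings("unchecked")
    static Integer backendLookup(Properties prop, String key)
            throws IOException, ClassNotFoundException {
        byte[] bytes = Base64.getDecoder().decode(prop.getProperty(SNAPSHOT_KEY));
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            HashMap<String, Integer> snapshot = (HashMap<String, Integer>) in.readObject();
            return snapshot.get(key);
        }
    }

    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        HashMap<String, Integer> snapshot = new HashMap<>();
        snapshot.put("a", 100);
        snapshot.put("b", 200);
        frontend(prop, snapshot);           // runs once on the client
        System.out.println(backendLookup(prop, "b")); // runs in each task
    }
}
```

Because UDFContext ships these properties to every task via the job configuration, each backend instance sees the same serialized snapshot that the frontend produced.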