Some applications that work on data stored in HCatalog follow a producer-consumer model. In a typical workflow, the producer continuously adds new data to a table by creating new partitions. Once it has generated a set of partitions, the producer wants to signal the consumer that the data has been produced and is ready for consumption. HCatalog 0.2 supports this use case with two APIs. A producer marks a set of partitions as done through markPartitionForEvent(). Once a partition is marked 'done', HCatalog sends a message on a message bus to notify registered consumers that the data has arrived. HCatalog also retains the message for some duration. During that time, a consumer can call isPartitionMarkedForEvent() to ask HCatalog whether the partition(s) were previously marked 'done' by a producer.
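To make the contract concrete, here is a toy in-memory model of the two calls. Marking records when a partition set was marked done, and a later query succeeds only while that record is still retained. This is a hypothetical illustration, not HCatalog's implementation. The class DoneMarkerModel, the exact-match lookup and the explicit retention parameter are all assumptions, and the message-bus notification is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model (hypothetical, NOT HCatalog code) of "mark done now, ask later".
public class DoneMarkerModel {
    private final long retentionMillis; // assumed retention window
    // Key: canonical "db.table/k1=v1/k2=v2" string; value: time the set was marked.
    private final Map<String, Long> markedAt = new HashMap<String, Long>();

    DoneMarkerModel(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    private static String key(String db, String tbl, Map<String, String> partSpec) {
        // A TreeMap gives a stable key order, so insertion order does not matter.
        StringBuilder sb = new StringBuilder(db).append('.').append(tbl);
        for (Map.Entry<String, String> e : new TreeMap<String, String>(partSpec).entrySet()) {
            sb.append('/').append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    // Producer side: remember when this exact partition spec was marked done.
    void mark(String db, String tbl, Map<String, String> partSpec, long nowMillis) {
        markedAt.put(key(db, tbl, partSpec), nowMillis);
    }

    // Consumer side: true only if the same spec was marked and is still retained.
    boolean isMarked(String db, String tbl, Map<String, String> partSpec, long nowMillis) {
        Long t = markedAt.get(key(db, tbl, partSpec));
        return t != null && nowMillis - t <= retentionMillis;
    }

    public static void main(String[] args) {
        DoneMarkerModel model = new DoneMarkerModel(60000L); // assumed 1-minute retention
        Map<String, String> spec = new HashMap<String, String>();
        spec.put("date", "20110711");
        spec.put("country", "*");
        model.mark("mydb", "mytbl", spec, 0L);
        System.out.println(model.isMarked("mydb", "mytbl", spec, 30000L)); // true
        System.out.println(model.isMarked("mydb", "mytbl", spec, 90000L)); // false
    }
}
```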

Let's assume there is a database named "mydb" containing a table named "mytbl", which is partitioned by date and country. The following code illustrates how a producer marks a set of partitions as "done". Here the producer marks all partitions whose date is 20110711 as done. The wildcard * denotes "all values". The map must contain a key/value pair for every partition column: if the table is partitioned by two columns, both columns must be specified, and either one can take "*" as its value to denote all values of that partition key.

                

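import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;

// Connect to the metastore; HiveConf picks up hive-site.xml from the classpath.
HiveConf conf = new HiveConf();
HiveMetaStoreClient msc = new HiveMetaStoreClient(conf);

// Create a map specifying every partition key name and its value ("*" = all values)
Map<String,String> partMap = new HashMap<String, String>();
partMap.put("date","20110711");
partMap.put("country","*");

// Mark every partition in the set as "done"
msc.markPartitionForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);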
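Because the map must name every partition column, a producer may want to check it before calling markPartitionForEvent(). The sketch below assumes the caller already knows the table's partition column names. PartSpecCheck and isCompleteSpec are hypothetical names, not HCatalog APIs, and rejecting keys that are not partition columns is an assumption beyond what the text requires.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class PartSpecCheck {

    // Hypothetical helper (not an HCatalog API). Every partition column must
    // appear as a key; values may be literals or "*". Keys that are not
    // partition columns are also rejected (an assumption).
    static boolean isCompleteSpec(List<String> partCols, Map<String, String> partMap) {
        return new HashSet<String>(partCols).equals(partMap.keySet());
    }

    public static void main(String[] args) {
        List<String> partCols = Arrays.asList("date", "country");

        Map<String, String> ok = new HashMap<String, String>();
        ok.put("date", "20110711");
        ok.put("country", "*");

        Map<String, String> missing = new HashMap<String, String>();
        missing.put("date", "20110711"); // "country" omitted

        System.out.println(isCompleteSpec(partCols, ok));      // true
        System.out.println(isCompleteSpec(partCols, missing)); // false
    }
}
```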

If the consumer has registered with the message bus and is currently live, it gets a callback from the message bus once the producer marks the partition set as done. Alternatively, it can explicitly ask the metastore about a particular partition set. The following code illustrates usage from the consumer's perspective:

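import java.util.Enumeration;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.hcatalog.common.HCatConstants;

// Ask the metastore whether a particular partition set has been marked.
boolean marked = msc.isPartitionMarkedForEvent("mydb", "mytbl", partMap, PartitionEventType.LOAD_DONE);

// Or register with the message bus and receive an asynchronous callback.
// amqurl (the broker URL) and topic (the topic HCatalog publishes to for this table)
// are assumed to be defined elsewhere; 'this' must implement javax.jms.MessageListener.
ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl);
Connection conn = connFac.createConnection();
conn.start();
// Non-transacted session: in a transacted session, received messages are only
// acknowledged when session.commit() is called, which this listener never does.
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination hcatTopic = session.createTopic(topic);
MessageConsumer consumer = session.createConsumer(hcatTopic);
consumer.setMessageListener(this);


public void onMessage(Message msg) {
  try {
    // Check whether this is a mark-set-done message. Calling equals() on the
    // constant avoids a NullPointerException when the property is absent.
    if (HCatConstants.HCAT_PARTITION_DONE_EVENT.equals(msg.getStringProperty(HCatConstants.HCAT_EVENT))) {
      MapMessage mapMsg = (MapMessage) msg;
      Enumeration<?> keys = mapMsg.getMapNames();

      // Enumerate over all keys. This prints the key/value pairs specifying the
      // partition set that was marked done. In this case, it prints:
      // date : 20110711
      // country : *
      while (keys.hasMoreElements()) {
        String key = (String) keys.nextElement();
        System.out.println(key + " : " + mapMsg.getString(key));
      }
    }
    System.out.println("Message: " + msg);
  } catch (JMSException e) {
    // onMessage() cannot throw checked exceptions, so JMS errors are handled here.
    e.printStackTrace();
  }
}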
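A consumer that receives a marked set containing "*" still has to decide whether a particular partition it cares about is covered by that set. A minimal sketch, assuming the key/value pairs printed in onMessage() have been copied into a Map. PartSpecMatch and covers are hypothetical helpers, not part of HCatalog.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PartSpecMatch {

    // Hypothetical helper (not an HCatalog API): true if the concrete partition
    // falls inside the marked set, where "*" in markedSpec matches any value.
    static boolean covers(Map<String, String> markedSpec, Map<String, String> partition) {
        if (!markedSpec.keySet().equals(partition.keySet())) {
            return false; // both specs must name the same partition columns
        }
        for (Map.Entry<String, String> e : markedSpec.entrySet()) {
            String markedValue = e.getValue();
            if (!"*".equals(markedValue)
                    && !Objects.equals(markedValue, partition.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The marked set from the producer example: date=20110711, country=*
        Map<String, String> marked = new HashMap<String, String>();
        marked.put("date", "20110711");
        marked.put("country", "*");

        Map<String, String> us = new HashMap<String, String>();
        us.put("date", "20110711");
        us.put("country", "us");

        Map<String, String> nextDay = new HashMap<String, String>();
        nextDay.put("date", "20110712");
        nextDay.put("country", "us");

        System.out.println(covers(marked, us));      // true
        System.out.println(covers(marked, nextDay)); // false
    }
}
```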

1 Comment

  1. Is there any possibility of adding metadata when marking partitions done?

    Many use cases need to attach metadata when marking partitions done, to hint to applications the "level" of completeness: the producer knows that some data is missing but still wants to give the consumer the option to start processing, even though more data could arrive later.

    or

    In a scenario where data arrives continuously, the producer wants to indicate that new data has arrived.