Overview

Notification mechanism is introduced in Hcatalog so that a user can be notified of certain events occurring in Metastore if she desires so. Events are of six types: add_database, add_table, add_partition, drop_partition, drop_table, drop_database. When any of these events occur on Metastore message is sent to a Message Bus. Any user who wish to get notification for these messages can subscribe to a message bus. Once subscribed, message bus will deliver the messages to the subscriber.

Topic Name

The client in order to receive message must be subscribed to the right topic. Topic name for different types of events are different and is configurable in few cases. All default topic names are prefixed with a string which is configurable in hive-site.xml. The configuration key is hcat.msgbus.topic.prefix which defaults to hcat. We shall refer to the value of the topic prefix as HCAT_TOPIC_PREFIX in rest of the document. For example drop_database events would be sent to topic HCAT_TOPIC_PREFIX (e.g. hcat). add_partition messages would be sent to HCAT_TOPIC_PREFIX.DB_NAME.TABLE_NAME (eg: hcat.default.example_table). But the topic name for events on a table could be changed per table by user. For example following hive query sets the topic name for the table example_table to the string example_table_topic_name:

ALTER TABLE example_table SET TBLPROPERTIES ("hcat.msgbus.topic.name" = "example_table_topic_name")
How to receive notification

To start receiving message you first need to create a connection to messagebus as demonstrated below:

    ConnectionFactory connFac = new ActiveMQConnectionFactory(amqurl);
    Connection conn = connFac.createConnection();
    conn.start();

Then subscribe to a topic you are interested in. Following example shows how the subscriber first finds out the topic name for the table she is interested in, and then subscribes to it.

    HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
    String topicName = msc.getTable("mydb", "myTbl").getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);

Use the topic name to subscribe to a topic as follows:

    Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
    Destination hcatTopic = session.createTopic(topicName);
    MessageConsumer consumer = session.createConsumer(hcatTopic);
    consumer.setMessageListener(this);

To start receiving messages user need to implement a jms interface MessageListener which will make you implement a method onMessage(Message msg). This method will be called whenever a new message arrives on MessageBus. The message contains a partition object representing the corresponding partition, which can be retrieved as following:

@Override
public void onMessage(Message msg){

  // We are interested in only add_partition events on this table.
  // So, check message type first.
  if(msg.getStringProperty(HCatConstants.HCAT_EVENT).equals(HCatConstants.HCAT_ADD_PARTITION_EVENT)){

    // Retrieve HCatEventMessage, using MessagingUtils.
    HCatEventMessage hcatMessage = MessagingUtils.getMessage(msg);

    //Get the partition-keys for all partitions added.
    List<Map<String, String>> partitionList = ((AddPartitionMessage)hcatMessage).getPartitions();
  }
}

You need to have jms jar in your classpath to make this work. You additionally need to have a jms provider’s jar in your classpath as well. Hcatalog uses ActiveMQ as a jms provider. In principle any JMS provider can be used in client side. However, ActiveMQ is recommended. It can be obtained from: http://activemq.apache.org/activemq-550-release.html

Event Message Formats

While HCatalog Event-string formats are pluggable, the strings are in JSON by default. Each event conveys only just enough information to identify the Database/Table/Partition that's been added/deleted in HCatalog. An event-consumer may use the identifiers specified in the event to query HCatalog for further information.
HCatalog sends events for 6 metastore operations. Here's a listing of the supported events, and their corresponding event formats:

1. Creation of Database:

Event type-string: "CREATE_DATABASE"
Topic Name: HCAT_TOPIC_PREFIX
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "CREATE_DATABASE",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb"
}

2. Dropping a Database:

Event type-string: "DROP_DATABASE"
Topic Name: HCAT_TOPIC_PREFIX
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "DROP_DATABASE",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb"
}

3. Creation of a Table:

Event type-string: "CREATE_TABLE"
Topic Name: HCAT_TOPIC_PREFIX.DB_NAME
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "CREATE_TABLE",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb",
  "table"     : "mytbl" 
}

4. Dropping a Table:

Event type-string: "DROP_TABLE"
Topic Name: HCAT_TOPIC_PREFIX.DB_NAME
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "DROP_TABLE",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb",
  "table"     : "mytbl" 
}

5. Adding (an atomic set of) partitions:

Event type-string: "ADD_PARTITION"
Topic Name: HCAT_TOPIC_PREFIX.DB_NAME.TABLE_NAME (default) but is user configurable
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "ADD_PARTITION",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb",
  "table"     : "mytbl",
  "partitions": [
                   { "partKey1" : "partVal1A", "partKey2" : "partVal2A" },
                   { "partKey1" : "partVal1B", "partKey2" : "partVal2B" },
                   { "partKey1" : "partVal1C", "partKey2" : "partVal2C" }
                ]
}

6. Dropping (a set of) partitions:

Event type-string: "DROP_PARTITION"
Topic Name: HCAT_TOPIC_PREFIX.DB_NAME.TABLE_NAME (default) but is user configurable
Example JSON Format:

{
  "timestamp" : 1360272556,
  "eventType" : "DROP_PARTITION",
  "server"    : "hcatserver.mydomain.net",
  "servicePrincipal" : "hcat/hcatserver@MYDOMAIN.NET",
  "db"        : "mydb",
  "table"     : "mytbl",
  "partitions": [
                   { "partKey1" : "partVal1A", "partKey2" : "partVal2A" },
                   { "partKey1" : "partVal1B", "partKey2" : "partVal2B" },
                   { "partKey1" : "partVal1C", "partKey2" : "partVal2C" }
                ]
}

All the JMS messages are sent as TextMessage instances. Apart from the message-body, each message conveys 3 string properties, using the following keys:

  1. HCatConstants.HCAT_EVENT: The event-type string (E.g. "CREATE_TABLE", "ADD_PARTITIONS", etc.)
  2. HCatConstants.HCAT_MESSAGE_VERSION: The version-string for the messages (E.g. "0.1", etc.)
  3. HCatConstants.HCAT_FORMAT: An identifier for the message format (E.g. "json", by default.)
  • No labels