The availability of ACID tables in Hive provides a mechanism that both enables concurrent access to data stored in HDFS (so long as it's in the ORC+ACID format) and permits row-level mutations of records within a table, without the need to rewrite the existing data. But while Hive itself supports DELETE commands, and the ORC format can support large batches of mutations in a transaction, Hive's execution engine currently submits each individual mutation operation in a separate transaction and issues table scans (M/R jobs) to execute them. It does not currently scale to the demands of processing large deltas in an atomic manner. Furthermore, it would be advantageous to extend atomic batch mutation capabilities beyond Hive by making them available to other data processing frameworks. The Streaming Mutation API does just this.
The Streaming Mutation API, although similar to the Streaming API, has a number of differences and is built to enable very different use cases. Superficially, the Streaming API can only write new data, whereas the mutation API can also modify existing data. However, the two APIs are also based on very different transaction models. The Streaming API focuses on surfacing a continuous stream of new data into a Hive table and does so by batching small sets of writes into multiple short-lived transactions. Conversely, the mutation API is designed to infrequently apply large sets of mutations to a data set in an atomic fashion: either all mutations are applied or none are. This instead mandates the use of a single long-lived transaction. This table summarises the attributes of each API:
Note on packaging: The APIs are defined in the org.apache.hive.hcatalog.streaming.mutate Java package and included as the
Generally speaking, to apply a mutation to a record one must have some unique key that identifies the record. However, primary keys are not a construct provided by Hive. Internally, Hive uses RecordIdentifiers stored in a virtual ROW__ID column to uniquely identify records within an ACID table. Therefore, any process that wishes to issue mutations to a table via this API must have the corresponding row ids for the target records available. In practice, this means that the process issuing mutations must first read in a current snapshot of the data and then join the mutations on some domain-specific primary key to obtain the corresponding Hive ROW__ID. This is effectively what occurs within Hive's table scan process when a DELETE statement is executed. The AcidInputFormat provides access to this data via
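The join described above can be sketched as a simple hash join: index the snapshot on the domain key, then probe it with each incoming mutation. This is a minimal illustration, not part of the Hive API; RowIdJoin, RowId, SnapshotRow, Mutation and KeyedMutation are hypothetical types, and RowId merely assumes that a ROW__ID can be modelled as a (transaction id, bucket id, row id) triple.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: attach ROW__IDs to mutations by joining on a domain key. */
final class RowIdJoin {

    /** Hypothetical stand-in for the (transaction id, bucket id, row id) held in ROW__ID. */
    record RowId(long originalTxn, int bucketId, long rowId) {}

    /** A row read from the current snapshot: its domain key plus its ROW__ID. */
    record SnapshotRow(String key, RowId rowId) {}

    /** An incoming change identified only by the domain primary key. */
    record Mutation(String key, String payload) {}

    /** A mutation paired with the ROW__ID it targets; null means no existing record (an insert). */
    record KeyedMutation(Mutation mutation, RowId rowId) {}

    static List<KeyedMutation> join(List<SnapshotRow> snapshot, List<Mutation> mutations) {
        // Build a hash index of the snapshot on the domain key (last duplicate wins).
        Map<String, RowId> index = new HashMap<>();
        for (SnapshotRow row : snapshot) {
            index.put(row.key(), row.rowId());
        }
        // Probe the index with each mutation; a miss leaves the ROW__ID empty.
        List<KeyedMutation> result = new ArrayList<>();
        for (Mutation m : mutations) {
            result.add(new KeyedMutation(m, index.get(m.key())));
        }
        return result;
    }
}
```

In a distributed ETL the same join would typically be expressed in the processing framework itself (for example as a keyed join between the snapshot and the delta); the in-memory map here only shows the shape of the operation.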
In short, API client processes should prepare data for the mutate API like so:
- MUST: Order records by
- MUST: Assign a ROW__ID containing a computed bucketId to records to be inserted.
- SHOULD: Group/partition by table partition value, then
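The preparation steps above can be sketched with a single comparator. The exact ordering keys in the list are truncated, so this sketch assumes grouping by partition value and then by bucket id, with records ordered within each group by transaction id and then row id; check the required ordering against the API documentation for your Hive version. MutationOrdering and Row are hypothetical names.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical sketch of grouping and ordering mutation records before writing. */
final class MutationOrdering {

    /** Hypothetical record: target partition value plus the three ROW__ID components. */
    record Row(String partition, long originalTxn, int bucketId, long rowId) {}

    /** Assumed ordering: partition, then bucket id, then transaction id, then row id. */
    static final Comparator<Row> ORDER =
        Comparator.comparing(Row::partition)
            .thenComparingInt(Row::bucketId)
            .thenComparingLong(Row::originalTxn)
            .thenComparingLong(Row::rowId);

    /** Returns a new list sorted so that each (partition, bucket) group is contiguous. */
    static List<Row> prepare(List<Row> rows) {
        return rows.stream().sorted(ORDER).collect(Collectors.toList());
    }
}
```

In a framework such as MapReduce or Spark, the partition and bucket components would normally drive the shuffle partitioner, while the remaining components act as a secondary sort within each worker.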
Adding bucket ids to insert records prior to grouping and sorting may seem unintuitive. However, it is required both to ensure adequate partitioning of new data and to keep bucket allocation consistent with that provided by Hive. In a typical ETL the majority of mutation events are inserts, often targeting a single partition (new data for the previous day, hour, etc.). If more than one worker is writing these events and we were to leave the bucket id empty, then all inserts would go to a single worker (e.g. a reducer) and the workload could be heavily skewed. Assigning a computed bucket allows inserts to be distributed more usefully across workers. Additionally, when Hive is working with the data it may expect records to have been bucketed in a way that is consistent with its own internal scheme. A convenience type and method are provided to more easily compute and append bucket ids:
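To make the skew argument concrete, the sketch below derives a bucket id from the bucketing column values and counts how inserts for a single partition spread across buckets. The hash combination (31 * h + value hash, sign bit masked, modulo the bucket count) is an illustrative assumption only: Hive's own per-type hash functions differ, so real clients should rely on the provided BucketIdResolverImpl. BucketIds is a hypothetical name.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of bucket id assignment for insert records. */
final class BucketIds {

    /**
     * Combines the hashes of the bucketing column values, masks the sign bit and
     * takes the remainder modulo the bucket count. Illustrative only; this does
     * not reproduce Hive's hashing for every column type.
     */
    static int bucketId(List<?> bucketColumnValues, int totalBuckets) {
        if (totalBuckets <= 0) {
            throw new IllegalArgumentException("totalBuckets must be positive");
        }
        int hash = 0;
        for (Object value : bucketColumnValues) {
            hash = 31 * hash + Objects.hashCode(value);
        }
        return (hash & Integer.MAX_VALUE) % totalBuckets;
    }

    /** Counts how many rows (each given as its bucketing column values) land in each bucket. */
    static int[] distribution(List<? extends List<?>> rows, int totalBuckets) {
        int[] counts = new int[totalBuckets];
        for (List<?> row : rows) {
            counts[bucketId(row, totalBuckets)]++;
        }
        return counts;
    }
}
```

With N buckets, grouping inserts by (partition, bucket id) gives up to N independent groups, so up to N workers can share the inserts for a single partition instead of one worker receiving all of them.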
Update operations should not attempt to modify values of partition or bucketing columns. The API does not prevent this and such attempts could lead to data corruption.
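Because the API does not guard against this, a client may wish to add its own check before issuing an update. The sketch below compares the before and after images of a record at the positions of the partition and bucketing columns and rejects the update if any of them changed. UpdateGuard is a hypothetical helper; it assumes records are represented as lists and that the pre-image is available from the snapshot read used to obtain the ROW__ID.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical client-side guard against updates to partition or bucketing columns. */
final class UpdateGuard {

    /**
     * Throws if the update changes any protected column. Indexes are positions in
     * the client's own record structure, not in the Hive table.
     */
    static void checkUpdate(List<?> before, List<?> after, int[] protectedColumnIndexes) {
        for (int index : protectedColumnIndexes) {
            if (!Objects.equals(before.get(index), after.get(index))) {
                throw new IllegalArgumentException(
                    "Update modifies partition/bucket column at index " + index);
            }
        }
    }
}
```

A change to one of these columns is really a delete from one partition or bucket plus an insert into another, and could be expressed that way by the client instead.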
A few things are currently required to use streaming.
Note: Streaming mutations to unpartitioned tables are also supported.
The structure, layout, and encoding of records are the exclusive concern of the client ETL mutation process and may be quite different from those of the target Hive ACID table. The mutation API requires concrete implementations of the Mutator classes to extract pertinent data from records and serialize data into the ACID files. Fortunately, base classes are provided (RecordInspectorImpl) to simplify this effort, and usually all that is required is the specification of a suitable ObjectInspector and the provision of the indexes of the ROW__ID and bucketed columns within the record structure. Note that all column indexes in these classes are with respect to your record structure, not the Hive table structure.
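The following sketch shows what "indexes with respect to your record structure" means in practice: the inspector is configured with positions in the client's record, which need not match the column order of the Hive table. ClientRecordInspector and RowId are hypothetical stand-ins that only mimic the role of RecordInspectorImpl; records are assumed to be plain lists rather than objects described by an ObjectInspector.

```java
import java.util.List;

/** Hypothetical inspector that extracts fields by position in the client's record. */
final class ClientRecordInspector {

    /** Hypothetical stand-in for a ROW__ID value carried inside the client record. */
    record RowId(long originalTxn, int bucketId, long rowId) {}

    private final int rowIdIndex;
    private final int[] bucketColumnIndexes;

    /** Both arguments are positions in the client record, not in the Hive table. */
    ClientRecordInspector(int rowIdIndex, int[] bucketColumnIndexes) {
        this.rowIdIndex = rowIdIndex;
        this.bucketColumnIndexes = bucketColumnIndexes.clone();
    }

    /** Returns the ROW__ID stored at the configured position. */
    RowId rowId(List<?> record) {
        return (RowId) record.get(rowIdIndex);
    }

    /** Returns the bucketing column values in the configured index order. */
    Object[] bucketColumnValues(List<?> record) {
        Object[] values = new Object[bucketColumnIndexes.length];
        for (int i = 0; i < bucketColumnIndexes.length; i++) {
            values[i] = record.get(bucketColumnIndexes[i]);
        }
        return values;
    }
}
```

For a client record laid out as (payload, ROW__ID, customer_id, region), the inspector would be built with rowIdIndex 1 and bucket column indexes chosen from positions 2 and 3, regardless of where those columns sit in the Hive table.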
You will likely also want to use a BucketIdResolver to append bucket ids to new records for insertion. Fortunately, the core implementation is provided in BucketIdResolverImpl, but note that bucket column indexes must be presented in the same order as they are in the Hive table definition to ensure consistent bucketing. Note also that you cannot move records between buckets, and an exception will be thrown if you attempt to do so. In real terms this means that you should not attempt to modify the values in bucket columns with an
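Why the index order matters can be shown with a toy resolver: because the bucket hash combines column values in sequence, listing the same columns in a different order generally yields a different bucket. OrderedBucketResolver is hypothetical, and its hash is the same illustrative stand-in used for bucket assignment above, not Hive's actual hash. The example assumes a table bucketed by (customer_id, region) and a client record laid out as (region, amount, customer_id).

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical resolver showing that bucket column order affects the bucket id. */
final class OrderedBucketResolver {

    private final int[] bucketColumnIndexes; // client-record indexes, in table-definition order
    private final int totalBuckets;

    OrderedBucketResolver(int[] bucketColumnIndexesInTableOrder, int totalBuckets) {
        this.bucketColumnIndexes = bucketColumnIndexesInTableOrder.clone();
        this.totalBuckets = totalBuckets;
    }

    /** Illustrative hash only; Hive's real bucketing hash differs per column type. */
    int bucketId(List<?> record) {
        int hash = 0;
        for (int index : bucketColumnIndexes) {
            hash = 31 * hash + Objects.hashCode(record.get(index));
        }
        return (hash & Integer.MAX_VALUE) % totalBuckets;
    }
}
```

Usage: for the record ("eu", 9.99, 7) and 4 buckets, indexes {2, 0} (customer_id, then region, matching the table) give bucket 1, while {0, 2} give bucket 3, so a resolver configured in record order rather than table order would place the row in a bucket Hive does not expect.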
Connection and Transaction
The MutatorClient class is used to create and manage transactions in which mutations can be performed. The scope of a transaction can extend across multiple ACID tables. When a client connects, it communicates with the meta store to verify and acquire metadata for the target tables. An invocation of newTransaction then opens a transaction with the meta store, finalizes a collection of AcidTables, and returns a new Transaction instance. The ACID tables are lightweight, serializable objects that are used by the mutation-writing components of the API to target specific ACID file locations. Usually your MutatorClient will be running on some master node and your coordinators on worker nodes. In this case, the AcidTableSerializer can be used to encode the tables in a more transportable form, for use as a Configuration property, for example.
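The idea of shipping table descriptors from the master to workers as a single string property can be sketched with Java serialization and Base64. This does not reproduce the encoding used by AcidTableSerializer; TableCodec and TableDescriptor are hypothetical, and TableDescriptor holds only a few assumed fields in place of a real AcidTable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

/** Hypothetical codec turning table descriptors into a single-line string and back. */
final class TableCodec {

    /** Hypothetical, minimal stand-in for an AcidTable. */
    record TableDescriptor(String database, String table, String location, long transactionId)
        implements Serializable {}

    /** Serializes the list and Base64-encodes it, e.g. for use as a configuration value. */
    static String encode(List<TableDescriptor> tables) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(new ArrayList<>(tables)); // copy into a known serializable list type
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(bytes.toByteArray());
    }

    /** Reverses encode(); the reader must have the same TableDescriptor class available. */
    @SuppressWarnings("unchecked")
    static List<TableDescriptor> decode(String encoded) {
        byte[] data = Base64.getDecoder().decode(encoded);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (List<TableDescriptor>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The basic Base64 encoder emits no line breaks, so the result can be stored as a single property value and decoded on each worker before its coordinator is constructed.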
WARNING: Hive doesn't currently have a deadlock detector (it is being worked on as part of HIVE-9675). This API could potentially deadlock with other stream writers or with SQL users.
The MutatorCoordinator class is used to issue mutations to an ACID table. You will require at least one instance per table participating in the transaction. The target of a given instance is defined by the respective AcidTable used to construct the coordinator. It is recommended that a MutatorClientBuilder is used to simplify the construction process.
Finally, note that when partition creation is disabled, the coordinators must synthesize the partition URI, as they cannot retrieve it from the meta store. This may cause problems if the layout of your partitions in HDFS does not follow the Hive standard (as implemented in org.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path, LinkedHashMap<String, String>)).
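For reference, the standard layout appends one key=value directory per partition column, in partition-column order, beneath the table location. The sketch below approximates that layout so you can compare it with your own directory structure. PartitionPaths is hypothetical; its set of escaped characters is an assumed, simplified subset of what Hive escapes, and it ignores details such as the default partition name, so Warehouse.getPartitionPath remains the authoritative implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical approximation of Hive's standard partition directory layout. */
final class PartitionPaths {

    // Assumed, simplified subset of the characters Hive escapes in path names.
    private static final String SPECIAL = "\"#%'*/:=?\\{[]^";

    /** Replaces special and control characters with %XX (uppercase hex). */
    static String escape(String name) {
        StringBuilder sb = new StringBuilder();
        for (char c : name.toCharArray()) {
            if (c < 0x20 || c == 0x7F || SPECIAL.indexOf(c) >= 0) {
                sb.append('%').append(String.format("%02X", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Builds tableLocation/key1=value1/key2=value2 in the map's insertion order. */
    static String partitionPath(String tableLocation, LinkedHashMap<String, String> spec) {
        String base = tableLocation.endsWith("/")
            ? tableLocation.substring(0, tableLocation.length() - 1)
            : tableLocation;
        StringBuilder path = new StringBuilder(base);
        for (Map.Entry<String, String> e : spec.entrySet()) {
            if (e.getValue() == null || e.getValue().isEmpty()) {
                throw new IllegalArgumentException("Empty value for partition column " + e.getKey());
            }
            path.append('/').append(escape(e.getKey())).append('=').append(escape(e.getValue()));
        }
        return path.toString();
    }
}
```

Usage: a spec of year=2015 then month=01 under /warehouse/db.db/t yields /warehouse/db.db/t/year=2015/month=01. If your existing partitions live elsewhere (for example, paths set explicitly with ALTER TABLE ... ADD PARTITION ... LOCATION), a synthesized path like this will not match them.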
Although this API is concerned with writing changes to data, as previously stated we'll almost certainly have to read the existing data first to obtain the relevant ROW__IDs. Therefore, it is worth noting that reading ACID data in a robust and consistent manner requires the following: