You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

A Java API focused on mutating (insert/update/delete) records into transactional tables using Hive’s ACID feature. It will be introduced in Hive 2.0.0 (HIVE-10165).

Background

In certain data processing use cases it is necessary to modify existing data when new facts arrive. An example of this is the classic ETL merge where a copy of a data set is kept in sync with a master by the frequent application of deltas. The deltas describe the mutations (inserts, updates, deletes) that have occurred to the master since the previous sync. To implement such a case using Hadoop traditionally demands that the partitions containing records targeted by the mutations be rewritten. This is a coarse approach; a partition containing millions of records might be rebuilt because of a single record change. Additionally these partitions cannot be restated atomically; at some point the old partition data must be swapped with the new partition data. When this swap occurs, usually by issuing an HDFS rm followed by a mv , the possibility exists where the data appears to be unavailable and hence any downstream jobs consuming the data might unexpectedly fail. Therefore data processing patterns that restate raw data on HDFS cannot operate robustly without some external mechanism to orchestrate concurrent access to changing data.

The availability of ACID tables in Hive provides a mechanism that both enables concurrent access to data stored in HDFS (so long as it's in the ORC+ACID format), and also permits row level mutations or records within a table, without the need to rewrite the existing data. But while Hive itself supports INSERT, UPDATE and DELETE commands, and the ORC format can support large batches of mutations in a transaction, Hive's execution engine currently submits each individual mutation operation in a separate transaction and issues table scans (M/R jobs) to execute them. It does not currently scale to the demands of processing large deltas in an atomic manner. Furthermore it would be advantageous to extend atomic batch mutation capabilities beyond Hive by making them available to other data processing frameworks. The Streaming Mutation API does just this.

The Streaming Mutation API, although similar to the Streaming API, has a number of differences and are built to enable very different use cases. Superficially, the Streaming API can only write new data whereas the mutation API can also modify existing data. However the two APIs also based on very different transaction models. The Streaming API focuses on surfacing a continuous stream of new data into a Hive table and does so by batching small sets of writes into multiple short-lived transactions. Conversely the mutation API is designed to infrequently apply large sets of mutations to a data set in an atomic fashion; all mutations will either be applied or they will not. This instead mandates the use of a single long-lived transaction. This table summarises the attributes of each API:

AttributeStreaming APIMutation API
Ingest typeData arrives continuouslyIngests are performed periodically and the mutations are applied in a single batch
Transaction scopeTransactions are created for small batches of writesThe entire set of mutations should be applied within a single transaction
Data availabilitySurfaces new data to users frequently and quicklyChange sets should be applied atomically, either the effect of the delta is visible or it is not
Sensitive to record orderNo, records do not have pre-existing lastTxnIds or bucketIds. Records are likely being written into a single partition (today's date for example)Yes, all mutated records have existing RecordIdentifiers and must be grouped by [partitionValues, bucketId] and sorted by lastTxnId. These record coordinates initially arrive in an order that is effectively random.
Impact of a write failureTransaction can be aborted and producer can choose to resubmit failed records as ordering is not important.Ingest for the respective must be halted and failed records resubmitted to preserve sequence.
User perception of missing dataData has not arrived yet → "latency?""This data is inconsistent, some records have been updated, but other related records have not" - consider here the classic transfer between bank accounts scenario
API end point scopeA given HiveEndPoint instance submits many transactions to a specific bucket, in a specific partition, of a specific tableA set ofMutationCoordinators write changes to unknown set of buckets, of an unknown set of partitions, of specific tables (can be more than one), within a single transaction.

Structure

The API comprises two main concerns: transaction management, and the writing of mutation operations to the data set. The two concerns have a minimal coupling as it is expected that transactions will be initiated from a single job launcher type processes while the writing of mutations will be scaled out across any number of worker nodes. In the context of Hadoop M/R these can be more concretely defined as the Tool and Map/Reduce task components. However, use of this architecture is not mandated and in fact both concerns could be handled within a single simple process depending on the requirements.

Note that a suitably configured Hive instance is required to operate this system even if you do not intend to access the data from within Hive. Internally, transactions are managed by the Hive MetaStore. Mutations are performed to HDFS via ORC APIs that bypass the MetaStore. Additionally you may wish to configure your MetaStore instance to perform periodic data compactions.

Note on packaging: The APIs are defined in the org.apache.hive.hcatalog.streaming.mutate Java package and included as the hive-hcatalog-streaming jar.

Data requirements

Generally speaking, to apply a mutation to a record one must have some unique key that identifies the record. However, primary keys are not a construct provided by Hive. Internally Hive uses RecordIdentifiers stored in a virtual ROW__ID column to uniquely identified records within an ACID table. Therefore, any process that wishes to issue mutations to a table via this API must have available the corresponding row ids for the target records. What this means in practice is that the process issuing mutations must first read in a current snapshot the data and then join the mutations on some domain specific primary key to obtain the corresponding Hive ROW__ID. This is effectively what occurs within Hive's table scan process when an UPDATE or DELETE statement is executed. The AcidInputFormat provides access to this data via AcidRecordReader.getRecordIdentifier().

The implementation of the ACID format places some constraints on the order in which records are written and it is important that this ordering is enforced. Additionally, data must be grouped appropriately to adhere to the constraints imposed be the OrcRecordUpdater. Grouping also makes it possible parallelise the writing of mutations for the purposes of scaling. Finally, to correctly bucket new records (inserts) there is a slightly unintuitive trick that must be applied.

All of these data sequencing concerns are the responsibility of the client process calling the API which is assumed to have first class grouping and sorting capabilities (Hadoop Map/Reduce etc). The streaming API provides nothing more than validators that fail fast when they encounter groups and records that are out of sequence.

In short, API client processes should prepare data for the mutate API like so:

 

  • MUST: Order records by ROW__ID.originalTxn, then ROW__ID.rowId.
  • MUST: Assign a ROW__ID containing a computed bucketId to records to be inserted.
  • SHOULD: Group/partition by table partition value, then ROW__ID.bucketId.

 

The addition of a bucket ids to insert records prior to grouping and sorting seems unintuitive. However, it is required both to ensure adequate partitioning of new data and bucket allocation consistent with that provided by Hive. In a typical ETL the majority of mutation events are inserts, often targeting a single partition (new data for the previous day, hour, etc.) If more that one worker is writing said events, were we to leave the bucket id empty then all inserts would go to a single worker (e.g: reducer) and the workload could be heavily skewed. The assignment of a computed bucket allows inserts to be more usefully distributed across workers. Additionally, when Hive is working with the data it may expect records to have been bucketed in a way that is consistent with it's own internal scheme. A convenience type and method is provided to more easily compute and append bucket ids: BucketIdResolver and BucketIdResolverImpl.

Update operations should not attempt to modify values of partition or bucketing columns. The API does not prevent this and such attempts could lead to data corruption.

Streaming requirements

A few things are currently required to use streaming. 

  1. Currently, only ORC storage format is supported. So 'stored as orc' must be specified during table creation.
  2. The hive table must be bucketed, but not sorted. So something like 'clustered by (colName) into 10 buckets' must be specified during table creation.
  3. User of the client streaming process must have the necessary permissions to write to the table or partition and create partitions in the table.
  4. Settings required in hive-site.xml for MetaStore:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.support.concurrency = true
    3. hive.compactor.initiator.on = true
    4. hive.compactor.worker.threads > 0

Note: Streaming mutations to unpartitioned tables is also supported.

Record layout

The structure, layout, and encoding of records is the exclusive concern of the client ETL mutation process and may be quite different from the target Hive ACID table. The mutation API requires concrete implementations of the MutatorFactory and Mutator classes to extract pertinent data from records and serialize data into the ACID files. Fortunately base classes are provided (AbstractMutator, RecordInspectorImpl) to simplify this effort and usually all that is required is the specification of a suitable ObjectInspector and the provision of the indexes of the ROW__ID and bucketed columns within the record structure. Note that all column indexes in these classes are with respect to your record structure, not the Hive table structure.

You will likely also want to use a BucketIdResolver to append bucket ids to new records for insertion. Fortunately the core implementation is provided in BucketIdResolverImpl but note that bucket column indexes must be presented in the same order as they are in the Hive table definition to ensure consistent bucketing. Note that you cannot move records between buckets and an exception will be thrown if you attempt to do so. In real terms this mean that you should not attempt to modify the values in bucket columns with an UPDATE.

Connection and Transaction management

The MutatorClient class is used to create and manage transactions in which mutations can be performed. The scope of a transaction can extend across multiple ACID tables. When a client connects it communicates with the meta store to verify and acquire meta data for the target tables. An invocation of newTransaction then opens a transaction with the meta store, finalizes a collection of AcidTables and returns a new Transaction instance. The acid tables are light-weight, serializable objects that are used by the mutation writing components of the API to target specific ACID file locations. Usually your MutatorClient will be running on some master node and your coordinators on worker nodes. In this event the AcidTableSerializer can be used to encode the tables in a more transportable form, for use as a Configuration property for example.

As you would expect, a Transaction must be initiated with a call to begin before any mutations can be applied. This invocation acquires a lock on the targeted tables using the meta store, and initiates a heartbeat to prevent transaction timeouts. It is highly recommended that you register aLockFailureListener with the client so that your process can handle any lock or transaction failures. Typically you may wish to abort the job in the event of such an error. With the transaction in place you can now start streaming mutations with one or more MutatorCoordinator instances (more on this later), can can finally commit or abort the transaction when the change set has been applied, which will release the lock with the meta store client. Finally you should close the mutation client to release any held resources.

The MutatorClientBuilder is provided to simplify the construction of clients.

WARNING: Hive doesn't currently have a deadlock detector (it is being worked on as part of HIVE-9675). This API could potentially deadlock with other stream writers or with SQL users.

Writing data

The MutatorCoordinator class is used to issue mutations to an ACID table. You will require at least one instance per table participating in the transaction. The target of a given instance is defined by the respective AcidTable used to construct the coordinator. It is recommended that aMutatorClientBuilder is used to simplify the construction process.

Mutations can be applied by invoking the respective insertupdate , and delete methods on the coordinator. These methods each take as parameters the target partition of the record and the mutated record. In the case of an unpartitioned table you should simply pass an empty list as the partition value. For inserts specifically, only the bucket id will be extracted from the RecordIdentifier, the transactionId and rowId will be ignored and replaced by appropriate values in the RecordUpdater

. Additionally, in the case of deletes, everything but the RecordIdentifier in the record will be ignored and therefore it is often easier to simply submit the original record.

Caution: As mentioned previously, mutations must arrive in specific order for the resultant table data to be consistent. Coordinators will verify a naturally ordered sequence of [lastTransactionId, rowId] and will throw an exception if this sequence is broken. This exception should almost certainly be escalated so that the transaction is aborted. This, along with the correct ordering of the data, is the responsibility of the client using the API.

Dynamic Partition Creation

It is very likely to be desirable to have new partitions created automatically (say on a hourly basis). In such cases requiring the Hive admin to pre-create the necessary partitions may not be reasonable. The API allows coordinators to create partitions as needed (see: MutatorClientBuilder.addSinkTable(String, String, boolean)). Partition creation being an atomic action, multiple coordinators can race to create the partition, but only one would succeed, so coordinators clients need not synchronize when creating a partition. The user of the coordinator process needs to be given write permissions on the Hive table in order to create partitions.

Care must be taken when using this option as it requires that the coordinators maintain a connection with the meta store database. When coordinator are running in a distributed environment (as is likely the case) it possible for them to overwhelm the meta store. In such cases it may be better to disable partition creation and collect a set of affected partitions as part of your ETL merge process. These can then be created with a single meta store connection in your client code, once the cluster side merge process is complete.

Finally, note that when partition creation is disabled the coordinators must synthesize the partition URI as they cannot retrieve it from the meta store. This may cause problems if the layout of your partitions in HDFS does not follow the Hive standard (as implemented inorg.apache.hadoop.hive.metastore.Warehouse.getPartitionPath(Path, LinkedHashMap <String, String>)).

Reading data

Although this API is concerned with writing changes to data, as previously stated we'll almost certainly have to read the existing data first to obtain the relevant ROW_IDs . Therefore it is worth noting that reading ACID data in a robust and consistent manner requires the following:

  1. Obtaining a valid transaction list from the meta store (ValidTxnList).
  2. Acquiring a lock with the meta store and issuing heartbeats (LockImpl can help with this).
  3. Configuring the OrcInputFormat and then reading the data. Make sure that you also pull in the ROW__ID values. See: AcidRecordReader.getRecordIdentifier.
  4. Releasing the lock.

Example

So to recap, the sequence of events required to apply mutations to a dataset using the API is:

  1. Create a MutatorClient to manage a transaction for the targeted ACID tables. This set of tables should include any transactional destinations or sources. Don't forget to register a LockFailureListener so that you can handle transaction failures.
  2. Open a new Transaction with the client.
  3. Get the AcidTables from the client.
  4. Begin the transaction.
  5. Create at least one MutatorCoordinator for each table. The AcidTableSerializer can help you transport the AcidTables when your workers are in a distributed environment.
  6. Compute your mutation set (this is your ETL merge process).
  7. Optionally: collect the set of affected partitions.
  8. Append bucket ids to insertion records. A BucketIdResolver can help here.
  9. Group and sort your data appropriately.
  10. Issue mutation events to your coordinators.
  11. Close your coordinators.
  12. Abort or commit the transaction.
  13. Close your mutation client.
  14. Optionally: create any affected partitions that do not exist in the meta store.

See ExampleUseCase and TestMutations.testUpdatesAndDeletes() for some very simple usages.

  • No labels