Problem statement 

When a Gobblin job gets killed or canceled, it should shut down gracefully: the job must end up in a recoverable (fault-tolerant) state: 

  • Discoverability of a valid last known good state 

  • Kafka audit count exactly once 

  • Lock revocation 

     

Last known good state 

A last known good state is the last state in which both the job state and the data were successfully committed. Currently, Gobblin assumes the last known good job state is `current.jst`. While persisting the job state of the running job, a copy is created to replace `current.jst`. The phases are as follows (a code sketch of the sequence follows the list): 

Phase 1: the current last known good state is `20170702.jst` 

Phase 2: the running job persists its state as `20170703.jst` and makes a copy `_tmp_/current.jst` 

(the copy is used to guarantee file completeness) 

Phase 3: delete `current.jst` 

Phase 4: rename `_tmp_/current.jst` to `current.jst` 
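For concreteness, here is a minimal sketch of this sequence using the Hadoop FileSystem API; the class, method, and paths are illustrative, not Gobblin's actual implementation.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CurrentJstUpdate {
  // Illustrative sketch of the current (non-atomic) update sequence.
  public static void update(FileSystem fs, Configuration conf,
                            Path newJobState,   // e.g. .../20170703.jst, already persisted
                            Path stateStoreDir) throws Exception {
    Path tmpCurrent = new Path(stateStoreDir, "_tmp_/current.jst");
    Path current = new Path(stateStoreDir, "current.jst");

    // Phase 2: copy the newly persisted job state to a temporary file
    // (copying guarantees the file is complete before it becomes visible).
    FileUtil.copy(fs, newJobState, fs, tmpCurrent, false, conf);
    // <-- if the job is killed here, current.jst still holds the older state (problem 2).

    // Phase 3: delete the old current.jst.
    fs.delete(current, false);
    // <-- if the job exits here, no current.jst exists at all (problem 1).

    // Phase 4: rename the temporary copy into place.
    fs.rename(tmpCurrent, current);
  }
}
```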

  

This approach has two problems: 

  1. If the job exits between phase 3 and phase 4, it ends up in a non-recoverable state 
  2. If the job is killed between phase 2 and phase 3, the stale `current.jst` shadows the real last known good state 

We're looking for a straightforward, robust, and efficient way to discover the last known good state in the state store. Here are 4 options:   

Option 1: Symlink 

Hadoop 2 supports symlinks. Instead of copying the actual data, the state store changes the reference of `current.jst`. This approach doesn't help with problem 1, and not all file systems support symlinks. 
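A minimal sketch of what this would look like with Hadoop 2's FileContext symlink API (the helper class and paths are illustrative); note that the delete-then-link sequence is still not atomic, which is why problem 1 remains:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;

public class SymlinkCurrentJst {
  // Point current.jst at the latest persisted job state instead of copying it.
  // Requires a file system with symlink support.
  public static void relink(Path latestJobState, Path currentLink) throws Exception {
    FileContext fc = FileContext.getFileContext(new Configuration());
    if (fc.util().exists(currentLink)) {
      fc.delete(currentLink, false);                      // drop the old link
    }
    // <-- a crash here still leaves no current.jst (same as problem 1)
    fc.createSymlink(latestJobState, currentLink, true);  // create the new link
  }
}
```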

Option 2: Full scan of state store directory 

Every time it runs, a job scans the directory and picks the latest valid job state. This solves problems 1 and 2 but is not efficient, especially when a job has a long running history. 

This approach can be improved by periodically archiving old job states, so that listing and sorting over the non-archived job states becomes more efficient. Data integrity can be verified by storing a checksum in the job state. However, we are already seeing performance degradation caused by saving the state store to HDFS. 
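A sketch of such a scan, assuming a hypothetical isValid() checksum verification (not an existing Gobblin API):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LastGoodStateScanner {
  // Scan the state store directory and pick the newest job state that passes
  // an integrity check.
  public static Optional<Path> findLastKnownGoodState(FileSystem fs, Path stateStoreDir)
      throws Exception {
    FileStatus[] states = fs.globStatus(new Path(stateStoreDir, "*.jst"));
    if (states == null) {
      return Optional.empty();
    }
    return Arrays.stream(states)
        .sorted(Comparator.comparing((FileStatus s) -> s.getPath().getName()).reversed())
        .map(FileStatus::getPath)
        .filter(p -> isValid(fs, p))   // e.g. verify a checksum stored with the state
        .findFirst();
  }

  private static boolean isValid(FileSystem fs, Path jobState) {
    // Hypothetical integrity check: recompute and compare a checksum that was
    // embedded in the job state when it was written.
    return true;
  }
}
```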

Option 3: `current.jst.bk` 

Before deleting `current.jst` in phase 3, a copy of it is persisted as a backup file. If a job can't find `current.jst`, it reads `current.jst.bk`. Problem 1 is resolved, but this doesn't address problem 2. 
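A sketch of the fallback read (the helper class is illustrative):

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CurrentJstWithBackup {
  // Before phase 3, current.jst is copied to current.jst.bk. Readers fall back
  // to the backup if current.jst is missing, i.e. the job died between phases 3 and 4.
  public static Path resolve(FileSystem fs, Path stateStoreDir) throws Exception {
    Path current = new Path(stateStoreDir, "current.jst");
    Path backup  = new Path(stateStoreDir, "current.jst.bk");
    return fs.exists(current) ? current : backup;
  }
}
```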

 Option 4: DB 

All of these efforts lead toward an implementation of transactional storage, which is exactly what a database is good at. This approach can solve problems 1 and 2 efficiently. 
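As an illustration of why a transaction helps, here is a minimal JDBC sketch that persists the state and updates the last-known-good entry atomically; the table and column names are hypothetical and do not reflect the schema of Gobblin's actual MySQL state store.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class TransactionalStatePut {
  // Persist the latest job state and mark it as the last known good state in a
  // single transaction, so a crash can never leave a half-updated "current" entry.
  public static void putLastKnownGood(String jdbcUrl, String user, String password,
                                      String storeName, String tableName,
                                      byte[] serializedState) throws Exception {
    try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
      conn.setAutoCommit(false);
      try (PreparedStatement stmt = conn.prepareStatement(
          "INSERT INTO gobblin_job_state (store_name, table_name, state) VALUES (?, ?, ?) "
              + "ON DUPLICATE KEY UPDATE state = VALUES(state)")) {
        stmt.setString(1, storeName);   // e.g. the job name
        stmt.setString(2, tableName);   // e.g. "current" or a run timestamp
        stmt.setBytes(3, serializedState);
        stmt.executeUpdate();
        conn.commit();                  // either the whole update is visible or none of it
      } catch (Exception e) {
        conn.rollback();
        throw e;
      }
    }
  }
}
```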

In terms of which DB to use and whether it should be client/server based or embedded, the table below shows the pros and cons of a MySQL client/server (C/S) DB and embedded RocksDB, which supports HDFS. 

Job instance isolation means isolating the state stores of the same job run by different users in the same cluster or environment. 

 

  

MySQL C/S DB 

Pros: 

  • Already implemented 

  • Allows concurrent access from multiple processes 

  • Job instance isolation by the user specifying a unique primary key 

Cons: 

  • Requires operating and maintaining a MySQL cluster that provides high availability and fault tolerance 

Embedded RocksDB 

Pros: 

  • Job instance isolation by storing the state store DB file in different paths 

  • Zero operational overhead 

Cons: 

  • Needs to be implemented 

  • Doesn't allow multi-process write access 

Conclusion 

If a job requires a fault-tolerant state store, it should use the MySQL (C/S) state store. Task: migrate jobs to use the MySQL state store. 

Access control to the MySQL state store in different environments (dev, prod) is required to prevent engineers working in dev from breaking state in production. Permissions should also govern whether a user is allowed to use a given primary key, to avoid corrupting states stored by their actual owner. 

Kafka audit count exactly once 

Currently, the Kafka audit count is reported even though the job may fail or exit without persisting its state. The next time a new job is launched, the same audit count is reported again. The solution is approached in two steps: 

Step 1: Report aggregated Kafka audit count in the driver 

A dataset may have multiple task states. The Kafka audit count will be sent at the dataset level instead of the task level (see the sketch after this list): 

  1. Aggregate the Kafka audit count from the task states of the dataset 
  2. Put the aggregated count into the dataset state 
  3. Persist the dataset state successfully 
  4. Report the aggregated count 
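A sketch of this ordering; TaskState, DatasetState, and AuditClient are placeholder interfaces standing in for the real Gobblin and audit-client types:

```java
import java.util.List;

public class DatasetAuditReporter {
  // Placeholders for the real Gobblin and audit-client types.
  interface TaskState { long recordsWritten(); }
  interface DatasetState { void setAuditCount(long count); void persist() throws Exception; }
  interface AuditClient { void reportCount(String dataset, long count); }

  // Report the audit count only after the dataset state, which now carries the
  // aggregated count, has been persisted successfully.
  public static void commitAndReport(String dataset, List<TaskState> taskStates,
                                     DatasetState datasetState, AuditClient audit)
      throws Exception {
    long aggregated = taskStates.stream().mapToLong(TaskState::recordsWritten).sum(); // step 1
    datasetState.setAuditCount(aggregated);                                           // step 2
    datasetState.persist();                                                           // step 3
    audit.reportCount(dataset, aggregated);                                           // step 4
  }
}
```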

With step 1, it is guaranteed that a failed or killed job won't report a bogus audit count. 

Step 2: Exactly once assistance from Kafka audit server  

If the Kafka audit server can ignore duplicate audit count reports by looking at their key/content, exactly-once semantics can be achieved by adding a checkpoint operation: 

5. Mark the dataset state as `Audited` 

The next time, the new job resends the audit count of the last known good dataset state if its status is not `Audited`. 
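A sketch of the checkpointed resend, assuming the audit server deduplicates reports with the same key/content; the types are placeholders, not existing Gobblin APIs:

```java
public class AuditCheckpoint {
  enum AuditStatus { REPORTED, AUDITED }

  // Placeholder for the persisted dataset state.
  interface DatasetState {
    long auditCount();
    AuditStatus auditStatus();
    void setAuditStatus(AuditStatus status);
    void persist() throws Exception;
  }

  interface AuditClient { void reportCount(String dataset, long count); }

  // On startup, resend the audit count of the last known good dataset state
  // unless it was already checkpointed as Audited.
  public static void resendIfNeeded(String dataset, DatasetState lastGood, AuditClient audit)
      throws Exception {
    if (lastGood.auditStatus() != AuditStatus.AUDITED) {
      audit.reportCount(dataset, lastGood.auditCount());
      lastGood.setAuditStatus(AuditStatus.AUDITED);   // step 5: checkpoint
      lastGood.persist();
    }
  }
}
```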

Conclusion 

It is not clear whether the Kafka audit server can help us with exactly-once semantics, although the team is working on some form of deduplication of audit messages. Step 1 only reduces the failure window and error cases; it doesn't solve the issue. 

Lock revocation 

When a job suddenly exits without cleaning up its lock, no subsequent execution can be launched successfully, because the existence of the lock is interpreted as a previous job still running, even though it is not. 

Option 1: Remove the lock file in shutdown hook 

It's likely that removing the lock, especially from a distributed file system, takes longer than the time limit of a JVM shutdown hook. We could configure a reasonable TTL, but it won't work all the time. 
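A sketch of such a hook (an illustrative helper, not Gobblin's actual code); it is best effort precisely because the delete may outlive the hook's time budget:

```java
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LockCleanupHook {
  // Best-effort cleanup: delete the job lock file when the JVM shuts down.
  // The hook may not finish if the (distributed) delete takes longer than the
  // time the JVM allows shutdown hooks to run.
  public static void register(FileSystem fs, Path lockFile) {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      try {
        fs.delete(lockFile, false);
      } catch (Exception e) {
        // Nothing else can be done this late in shutdown.
      }
    }));
  }
}
```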

Option 2: Remove the lock while rebooting the Gobblin service 

It's likely that the same Gobblin service can accidentally be deployed twice; booting the second service would end up removing the lock placed by the first one, breaking the semantics of the job lock. 

 Option 3: Use locks that require heartbeat renewal 

Use a ZooKeeper-based lock, where the owner renews the lease of the lock periodically. A lock renewal service needs to be designed and implemented so that it never hangs; otherwise, the lease will expire and the lock will be claimed by another process, breaking the state of the current job. 
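A sketch using Apache Curator's InterProcessMutex, where the ZooKeeper session heartbeat effectively plays the role of the lease renewal; the connection string and lock path are illustrative:

```java
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ZkJobLock {
  // ZooKeeper-based job lock via Apache Curator. The lock is backed by an
  // ephemeral node, so it is released automatically when the owner's session
  // expires; keeping the session's heartbeat alive is the lease renewal.
  public static void runWithLock(String zkConnect, String jobName, Runnable job) throws Exception {
    try (CuratorFramework client =
             CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3))) {
      client.start();
      InterProcessMutex lock = new InterProcessMutex(client, "/gobblin/locks/" + jobName);
      if (!lock.acquire(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Another instance of " + jobName + " holds the lock");
      }
      try {
        job.run();
      } finally {
        lock.release();
      }
    }
  }
}
```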

Conclusion 

If a job requires automatic lock revocation, it should use a ZooKeeper-based lock, which is already implemented. Task: implement the lock renewal service. 

Summary 

There are various implementations of state stores and locks, each providing a different set of features. To achieve graceful shutdown, a job should use the MySQL state store and a ZooKeeper-based lock. The tasks are, in order of priority: 

  1. Migrate jobs to use the MySQL state store. If the time commitment is urgent, we can implement Option 2 (with archiving and checksum) for the sake of existing jobs; new jobs are recommended to use the MySQL state store 
  2. Implement the lock renewal service 
  3. Implement reporting the aggregated Kafka audit count in the driver 

Without support from the Kafka audit server, we haven't found a solution that fixes the exactly-once semantics of the Kafka audit count. However, implementing step 1 (task 3) alone will mitigate the issue by reducing the failure window and error cases. 

 
