Title: DFM support in  Sqoop

JIRA : SQOOP-1168 and its sub tickets

The discussion also happened on the SQOOP-1168 and SQOOP-1804 and SQOOP-2025




Provide a way for sqoop jobs to read and write a subset of records i.e transfer only new records added since the last transfer in case of immutable data sources and/or  transfer the delta modifications across the entire data set in case of mutable data sources 


  • Connector : Sqoop entity that is responsible for the extracting/ loading data from/to the data source it represents
  • FROM - The connector part responsible for extracting/ reading data from the corresponding data source
  • TO - The connector part responsible for loading/ writing data to the corresponding data source
  • Incremental Update :  add new records only to the TO
  • Delta Update : overwrite the old record with new value in the TO
  • DFM : Delta Fetch Merge
  • DF : Delta Fetch
  • DM : Delta Merge
  • DE: Delta Extract
  • DL : Delta Load

Design Goals

  • Support both delta read and write in a sqoop job. 

  • Provide the generic hooks  for sending input to the connectors and collecting output from the connectors and storing them in the repository. These hooks should be possibly agnostic of execution engine and the internal implementation of sqoop.

  • User facing constructs for creating DFM  jobs should be intuitive.

  • Provide a extensible API ( via configs) to support simple and complex parameters for DFM

  • POC of DFM JDBC Connector

  • POC of DFM in HDFS or HBase. Delta updates should provide a optimal merge path to reconcile old and new values especially in data sources such as HDFS if possible.

  • Expose the relevant metrics/state information of a DFM in the repository via the rest API


  •  Support DFM in very connector we support

Design Features Revision 2 

UPDATEThe discussion happened on the SQOOP-1168 and SQOOP-1804 and SQOOP-2025 JIRA items. The design doc will soon be updated with the final details.

NOTE : We will use the DFM  term to broadly mean reading sub set of records and write them either my appending or reconciling/ overwriting with existing values/ deleting. It can mean any of the above or all as the connector implementation defines it.

Connector API extensions

  • Connector will tell if they support delta fetch / merge for FROM/TO separately in the initializer, it will help with validations in job creation. Something along the following lines

    // in the FromInitializer
    public abstract class Initializer<LinkConfiguration, JobConfiguration> {
     boolean doesSupportDeltaExtract() {
    // in the ToInitializer
    public abstract class Initializer<LinkConfiguration, JobConfiguration> {
     boolean doesSupportDeltaLoad() {
  • Configuration classes and config objects and their validations and annotations will be reused for specifying configs for delta fetch and merge
    • A connector can have one or more of this in the FromJobConfiguration for read/ extract
    • A connector can have one or more of this in the ToJobConfiguration for write/ load, for instance it can have a config for appending, and another config for overwrites and ask inputs for what it should do while merging. We can have some base classes provided in sdk that have nothing custom going on for these configs. Example of a config in the FromJobConfiguration and ToJobConfiguration

      // In FromJobConfiguration
      @ConfigClass(validators = { @Validator(DeltaFetchConfig1.DeltaFetchConfig1Validator.class)})
      public class DeltaFetchConfig1 {
        @Input(size = 255) public String column;
        // validate supported oeprators, can provide a default value etc...
        @Input public String operator;
        @Input public String value;
      @ConfigClass(validators = { @Validator(DeltaFetchConfig2.DeltaFetchConfig2Validator.class)})
      public class DeltaFetchConfig2 {
        @Input(size = 255) public String deltaFetchQuery;
      // In ToJobConfiguration
      @ConfigClass(validators = { @Validator(IncrementalAppendConfig3.IncrementalAppendConfig3Validator.class)})
      public class IncrementalAppendConfig3 {
        @Input public boolean tempDatasetForMerge;
    • There are no hard rules, a connector has full independence to define its sets of parameters to do delta fetch and merges, name them as they see fit, including the name of the config classes
    • They will have to provide a list of supported DFM configs for ease of implementation, else we can do some magic to infer these with an annotation or a base class that they all extend. If they provide more than one configs in the FromJobConfiguration itself we an avoid this extra method of getSupportedDeltaConfigs.  We may need some conditions on config annotation to say, if IncrementalAppendConfig is shown to the use, do not show the IncrementalMergeConfig - https://issues.apache.org/jira/browse/SQOOP-1919

User Facing changes

For Command Line


Here are some high level steps on how the user interaction will look like when creating a job for connectors that support DFM

  1. User does create job -f 1 -t 2, say -f 1 support a deltaFetch config 
  2. Sqoop then as ususal presents the configs for the -f and -t connectors NOTE: Sqoop might do a validation here, to see if both connectors support delta and then only present the delta configs, else continue as a normal full load job
  3. User fills in the values, if they dont fill it in, then we know they dont want a delta fetch/ merge
  4. Sqoop returns a job id, say 1
  5. User then can start the job, start job -j 1
  6. Sqoop then invokes the sqoop job lifecycle, connectors gets all the configs values in the Extractor API for FROM and Loader API for the TO part, they do what it needs to do to extract and load respectively.
  7. Once they are done extracting/loading, they can tell additional info/connector_job_output that they want to persist to sqoop ( in the repository), it can be more than just last-value or number of rows read or number of rows written, boolean to say this was DFM, it is upto the connector to do that, we can persist a object or some defined key values or a BLOB to KISS, again implementation details
  8. User can in the meantime, do status job -j 1 and they will get the progress of the job in the submission record, all these supplemental information can be exposed in submission, to say what happened in DFM
  9. Since the configs are dynamically chosen, we can expose what the resulting submission config values were in the submission API ( show submission) or add a new API that will show all the config and output values per job ID - show joboutput or something along the lines  



  • Create job API will show the DF and DM configs, it will apply the same validations as in command line
  • JSON for connector will have more config values to display inside the from-job-config and to-job-config
  • JSON for jobs will also show the config inputs that the user created when they created the job
  • JSON for submission will show the job output in addition to the details it already shows


Storing FROM/TO connector state 

How to get the input from sqoop into connector?

  • The FromJobConfiguration and ToJobConfiguration will expose the config objects in the JOB to the connectors. Connectors have to be aware of the configs they expose. In case of more than one DF or DM config they may have to check which exists like a switch case and then use the one that exists. Sqoop will be smart to insert only one config that the user populates. For this we need the conditions on the config annotation as stated above:  https://issues.apache.org/jira/browse/SQOOP-1919
  • In addition to the configs, we will insert the info such as the last_value that the connector needs from previous run ( not limited to last value alone ) in the ExtractorContext and LoaderContext, we may have one field as EXTRACTOR_JOB_INFO that will hold all the dat from previous run. Again, how do we get these values from repo to the Extractor context, nothing fancy, either use the distributed cache or some similar way to write the output back on the context in case of MR. Spark has other ways to get data into the spark job.

How to get output from connector to sqoop ?

  • We will use another field in the ExtractorContext and LoaderContext to store the output back, inspect it on the way back and set it in the credential store/ distributed cache. JobManager will then write this info to the repository. We can be generic and support a BLOB or even an avro or JSON object as the output leaving it flexible on how a connector writes its output back. If we want types we can choose avro, else we can choose JSON

Where to store the output in sqoop?

We can store the values from the Extractor context and Loader Context, that are passed to sqoop in some ways

  1. SQ_JOB_INPUT ( with a new  MInputType, INTERNAL or OUTPUT, or other relevant name to say that this is not what connectors exposed, but resulted from JOB execution for FROM and TO), the real advantage is that we have separation per CONFIG per DIRECTION. It is less effort, no new table, but it sounds a bit odd to store the output in the input table
  2. We can dump everything in SQ_COUNTER_SUBMISSION related to submission table, but what we may want to give back may be not always counter or long value, it can be a boolean saying if it was a delta fetch or merge , it wont have the distinction per direction. If we are to go this route, the SQ_COUNTER_SUBMISSION needs to generalize to support any form of job output values to be stored in it and exposed via submission API. A simiple way to think about it is to mirror what we have in MInputType for MOutputType
  3. Store the output in a BLOB inside SQ_SUBMISSION itself. Cons include we need to store this as JSON and allowing to edit one field in JSON might need some extra work, still doable. The other major con, is if we ever itend to migrate away from a single BLOB to a more typed way to store output in the DB, such as a INTEGER, FLOAT, BOOLEAN, like in the SQ_JOB_INPUT, we need to do the extra work of data migration and since we may not have types in the JSON, it might be really hard. We can mandate the output to be in avro to have the types preserved via schema.
  4. Possibly mirror the option #1 and create SQ_JOB_OUTPUT table, to store the key/values and its types. Not too much effort, cleaner and easy to expose a edit API for any field in the output.

Choosing option 4 at this point since it has more pros than the other 3 options. 



Design Features Revision 1

(NOTE : The following was the revision 1 of the proposed design, feel free to ignore it if you have read revision 2!)

  • User facing feature: Support a way to express DFM in sqoop via command-line and the Rest API. For instance the user should be able to indicate/say 
    • create job -j 1 with predicate foo, where foo encapsulates the information for reading and writing subset or all records from the data source
  • Connector API extension : Connector should be able to have a way to say if they support DFM in FROM and TO directions. It should be possible for a connector to only support DFM in FROM or TO or neither. Default will be false, hence very connector has to make a conscious decision to support this feature or not.
  • Predicate API /Incremental Config class for connectors ( with validation)  : First, provide a generic Predicate API to express the conditions for read/write subset/all records from its source. As simple as it gets a predicate implementation is list of attributes similar to a LinkConfig or FromJobConfig.
    • A connector will provide a predicate to express how to filter its data for DFM. In case of JDBC sources it can be a JDBCPredicate/ JDBCIncrementalConfig that has attributes for condition in the where clause - ( SELECT * from table where id > = 24) . A MongoDbPredicate or HDFSPredicate can provide custom predicate classes that the job will be presented with when they choose MongoDbConnector or HDFSConnector for FROM and TO. 
    • Provide a simple predicate implementation that the connectors can extend and define their supported predicate types. Each Predicate should also provide validations for the attributes it supports. For instance in the case of JDBCPredicate, it should validate the column name id, supported operators such as > ,< or any free form query it supports, and the value that the user provides in the command-line or Rest API. 
    • Third, Predicate is encapsulated in a IncrementalConfig object. Incremental/Delta Config will be part of FromConfig and ToConfig for the connectors. There are few ways to implement on how the Incremental/Delta Config  will be injected to the FromJobConfiguration and ToJobConfiguration in the connectors.
  • Repository API extension: Incremental/Delta Update config inputs ( SQ_JOB_INPUT) to store the state for FROM/TO directions per sqoop job. For instance the column name, the given value and the actual value post the incremental read and write have completed, the number of records updated per run etc.
  • Rest API extension: Top level GET Config API to get the incremental config information such as the last-value that was successfully updated or the range of values that were updated in the last incremental/delta run

    Additional Features ( nice to haves that will provide mode visibility into the internals of the incremental/delta updates)
  • Able to group jobs across multiple incremental/ delta update runs, needs repository structure change to have a groupId across all incrementals/ delta updates per job
  • Metrics that expose the health of incremental reads and writes status ( both application and system level ), to begin with just log both the application and system level metrics ( such as time taken to process the incremental)

Sqoop Components Design Details 

Command Line feature extensions


  • changes to the create job command if any
  • present the right configs per connector, if config is filled then it will be treated as a delta update job
  • ensure the corresponding config validations are invoked - SQOOP-1801

REST API feature extensions

  • add a top level GET api for configs to get FROM an TO job config per jobId, so we can see the FROM. TO, DRIVER and DELTA configs granularly
  • changes to the create job REST API to support to post additional configs for delta

Repository API extensions


  • Provide a api to get the delta FROM/TO state per job 

Repository Structure changes


  • Might need to extend the counters to support any form of job output, or just add a new table to store the output ( per type ) 

Connector API extensions

JIRA : SQOOP-1799 - support a way for connector to express if it supports DFM in FROM and TO, also add rows written to the extractor API

No other changes foreseen at this point, except that the SQOOP-1603 might propose a new step in connector API for merge.

JobManager/Execution Engine support for Delta Fetch


  • Changes to the FromJobConfiguration to expose the DF Config
  • Changes to support additional attributes in the ExtractorContext to record the statistics of Incremental/Delta Read

JobManager/Execution Engine support for Delta Merge


  • Changes to the FromJobConfiguration to expose the DM Config
  • Changes to support additional attributes in the ExtractorContext to record the statistics of DF



  • Changes the JDBC Connector to support DF
  • Changes to the Extractor to read incremental config in the FromJobConfiguration
  • Changes the JDBC Connector to support  DM
  • Changes to the Loader to read incremental config in the ToJobConfiguration
  • Changes in Extractor and Loader to populate the Extractor /Loader context with relevant information post the update

DFM POC in Kite

JIRA : SQOOP-1744 ( A separate design doc will be created for this feature )

Other details 

( lack of better words for this section)

New External Jar Dependencies added?


What are the Connectors modified?

  • GenericJDBC From and To: new config objects will be added, no upgrade code required
  • Kite To: new config objects will be added, no upgrade code required

Is the Repository structure modified?

Could be, see the storage options discussion above in the design features revision 2

New sqoop package/ component proposed?


Backwards Compat & Upgrade

  • The configs upgrade will follow the existing solution for connectors.
  • Repository changes if done will have the repo upgrade code.
  • APIs have addition of methods, existing methods have not been modified.

Testing strategy

Unit tests for the internal sqoop and API changes and Integration tests for POC connectors

Performance and Scale 

The current sqoop2  does not have any specified perf benchmarks, the current doc will fallback on existing infra if any

Misc: Open Questions 

Following were the open questions when the design revision 1 was proposed

  • Can we merge the incremental/ delta update concept together and expose a consolidated construct the the user does not have to distinguish?  Based on the config values can be determine if it is incremental or delta ? Provide one consolidated terminology to the user will add to less confusion. Yes (Closed)

  • Does the predicate be expressed as a JSON object in the command line as well? For example the rest API might post JSON for a JDBC predicate . We dont need the following, see Take 2 example(Closed)

    "predicate": { 
      "type" : "jdbc"
      "column" : "id"
      "operator" : ">="
      "value" : "34"
  • Should we store the FROM and TO state in the SQ_JOB_INPUT or in the SQ_JOB_SUBMISSION ? Rather prefer to have them stored as configs and inputs, this avoids any changes to repository schema/ tables we have now.

Misc: Sqoop 1 Details

Sqoop 1 was less generic than Sqoop2 in terms of the FROM and TO. The From was a SQL and TO was HDFS/ Hive...and this is no longer true in Sqoop2. It cab be from HDFS to SQL or from MongoDB to MySQL.

Relevant code in Sqoop 1. 


  • No labels