You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 23 Current »

 

JIRA : SQOOP-1938

This document provides details of how the Sqoop MR Execution Engine works, its major components and details about the internals of the the implementation

Summary

The main entry point in Sqoop2 for job execution is the JobManager that is part of the package org.apache.sqoop.driver. JobManager holds the handle to the SubmissionEngine and the ExecutionEngine.

Submission Engine will use the concrete apis of either YARN/ Mesos/ Oozie  that handle the resource management for job execution and submit the job to the execution engine. ExecutionEngine is the actual job executor that will use the apis of  Apache Hadoop MR or Apache Spark to execute the sqoop job

JobManager does the following 3 things

  1. Prepare the JobRequest object for the ExecutionEngine
  2. Submits the job via the SubmissionEngine  submit API, waits for the submission engine to return
  3. Based on the result of the submit API, creates and saves the Job submission record into its repository( Derby/Postgres, depending on the configured store ) to store the history across multiple job runs

Here we discuss the details of the implementation of the MRSubmissionEngine and MRExecutionEngine

MR SubmissionEngine

  • Has a handle to the concrete execution engine which is the org.apache.hadoop.mapred.JobClient in our case
  • Initialize API to set up the submission engine 
  • Submit API is blocking if using the hadoopLocalRunner and returns a boolean for success or failure of submission and async if non-local. In case of async, the update API is used subsequently to track the progress of the job submission
  • Update API can be invoked to query the status of the running job and update the Job submission record that holds the history information of a sqoop job across multiple runs
  • Stop API to abort a running job
  • Destroy API to mirror the initialize to clean up the submission engine on exit

MR ExecutionEngine

  • Has a handle to the JobRequest object populated by the  JobManager 
  • PrepareJob API to set up the necessary information required by the org.apache.hadoop.mapred.JobClient in our case 

NOTE : The ExecutionEngine api is very bares bones and most of the functionality of job execution/ failure/ exception handling resulting from the MR engine happens inside the MRSubmissionEngine 

Components of Sqoop using MR

We want to read records from FROM and write them to TO in Sqoop, We want to do this in a parallel way, so we use the MR engine. We spawn numExtractors ( a job config )  indicated  map tasks and numLoaders ( a job config ) indicated reduce tasks. So this way we can read records/ messages/ rows in parallel and write them in parallel. 

 By default sqoop job is a map only job. It does not utilize the reducer by default, unless 

# Extractors# LoadersOutcome
DefaultDefaultMap only job with 10 map tasks
Number XDefaultMap only job with X map tasks
Number XNumber YMap-reduce job with X map tasks and Y reduce tasks
DefaultNumber YMap-reduce job with 10 map tasks and Y reduce tasks

The purpose have been to provide ability to user to throttle both number of loader and extractors in an independent way (e.g. have different number of loaders then extractors) and to have default values that won't run reduce phase if not necessary.

Passing data into the sqoop job ( via the mapper)

 There is various information such as the job configs, driver configs, schema of the data read and the schema of the data written required by the Extractor and Loader that has to be passed via the SqoopMapper. It is currently passed securely like this via the credential store or via the configuration

org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes());
org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes())
org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
org.apache.hadoop.mapreduce.job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());

SqoopMapper

  1. Creates the ExtractorContext from the data stored in the configuration and credential store to pass to the connectors extract API
  2. Creates the SqoopSplit that holds the partition information for the data to be extracted
  3. Post extract call, records the Hadoop counters related to Extraction logic
  4. Passing data out of Mapper : DistributedCache can be used if we need to write any information from the extractor back to the sqoop repository

Sqoop Writable(Comparable)

  1. Having a Writable class is required by Hadoop framework - we are using the current one as a wrapper for IntermediateDataFormat. Read more on IDF here
  2. We're not using a concrete implementation such as Text, so that we don't have to convert all records to String to transfer data between mappers and reducers.
  3. SqoopWritable delegates a lot of its functionality to the IntermediateDataFormat implementation used in the sqoop job, for instance the compareTo method on the Writable can used any custom logic of the underlying IDF for sorting records Extracted and then eventually used to write in the Load phase

SqoopSplit

  1. An InputSplit describes a unit of work that comprises a single map task in a MapReduce program, SqoopSplit extends InputSplit
  2. Instantiates a custom Partition class to be used for splitting the input, in our case it is the data Extracted in the extract phase
  3. Delegates to the Partition object to read and write data
  4. It is the Key to the SqoopInputFormat

 

SqoopInputFormat

InputFormat defines: how the data in FROM are split up and read. Provides a factory for RecordReader objects that read the file

SqoopInputFormat is a custom implementation of the MR InputFormat class.  SqoopRecordReader is the custom implementation of the RecordReader.  The InputSplit has defined a slice of work, but does not describe how to access it. The SqoopRecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper.  The SqoopRecordReader is invoked repeatedly on the input until the entire SqoopSplit has been consumed. Each invocation of the SqoopRecordReader leads to another call to the run() method of the Mapper.

public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
...} 

SqoopNullOutputFormat

The (key, value) pairs provided by the mapper are passed on the Loader for the TO part. The way they are written is governed by the OutputFormat. SqoopNullOutputFormat extends the OutputFormat class. The goal of hadoop's NullOutputFormat : generates no output files  on HDFS since HDFS may not always be the destination. In our case too HDFS is not always the destination, so we use SqoopNullOutputFormat, a custom class to to delegate writing to the Loader specified in the sqoop job, it relies on the SqoopOutputFormatLoadExecutor to pass the data to the Loader via the SqoopRecordWriter. Much like how the SqoopInputFormat actually reads individual records through the SqoopRecordReader implementation, the SqoopNullOutputFormat class is a factory for SqoopRecordWriter objects; these are used to write the individual records to the final destination ( in our case the TO part of the sqoop job). Notice the key to the SqoopNullOutputFormat is actually the 

SqoopWritable,that the SqoopRecordWriter uses

 

public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
...}
 
 private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
    @Override
    public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
      free.acquire();
      checkIfConsumerThrew();
      // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
      toDataFormat.setCSVTextData(key.toString());
      filled.release();
    }

SqoopDestroyerOutputCommitter is a custom outputcommiter that provides hooks to do final cleanup or in some cases the one-time operations we want to invoke when sqoop job finishes, i,e either fails or succeeds.

SqoopReducer 

Extends the Reducer API and at this point only runs the progressService. It is invoked only when the numLoaders driver config is > 0. It primary use case is throttling.

public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
..
      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
} 


Why do we have ability to run reduce phase and why it’s part of throttling?

 

The original idea was that you want to throttle “From” and “To” side independently. For example if I’m exporting data from HBase to relational database, I might want to have one extractor (=mapper) per HBase region - but number of regions very likely will be more then number of pumping transactions that I want to have on my database, so I might want to specify a different number of loaders to throttle that down. But having reduce phase means to serialize all data and transfer them across network, so we are not running reduce phase unless user explicitly sets different number of loaders then reducers.

 

SqoopOutputFormatLoadExecutor and SqoopOutputFormatDataReader 

  1. The LoaderContext is set up in the ConsumerThread.run(..) method. 
  2. Loader's load method is invoked passing the SqoopOutputFormatDataReader and the LoaderContext
  3. The load method invokes the SqoopOutputFormatDataReader to read to records from the SqoopRecordWriter associated with the SqoopNullOutputFormat
  4. The SqoopOutputFormatLoadExecutor uses ConsumerThread to parallelize the extraction and loading process in addition to the parallelizing the extract only part using the numExtractors configured. More details are explained in the SQOOP-1938
     

    TL;DR: Parallelize reads and writes rather than have them be sequential.

    Most of the threading magic is for a pretty simple reason - each mapper does I/O in 2 places - one is writes to HDFS, the other is read from the DB (at that time, extend it to the new from/to architecture, you'd still have 2 I/O). By having a linear read-write code, you are essentially not reading anything while the write is happening, which seems like a pretty inefficient thing to do - you could easily read while the write is happening by parallelizing the reads and writes, which is what is being done. In addition, there is also some additional processing/handling that the output format does, which can cost time and CPU - at which point you could rather read from the DB.


 

Few related tickets proposed for enhancement 

https://issues.apache.org/jira/browse/SQOOP-1601

https://issues.apache.org/jira/browse/SQOOP-1603

 

  • No labels