Design Proposal of Kite Connector

Background

Kite SDK is an open source set of libraries for building data-oriented systems and applications. With the Kite dataset API, you can perform tasks such as reading a dataset, defining and reading views of a dataset and using MapReduce to process a dataset.

Sqoop 1 recently added support for the Parquet file format on HDFS/Hive using the Kite SDK (SQOOP-1366). SQOOP-1529 proposes a Kite dataset connector for Sqoop 2 that can access HDFS and Hive datasets. Its behavior is expected to be similar to that of the Kite CLI.

Requirements

  1. Ability to write a new HDFS/Hive dataset via the Kite connector in several file storage formats (Avro, Parquet and, experimentally, CSV) and compression codecs (Uncompressed, Snappy, Deflate, etc.).
  2. Ability to read an entire HDFS/Hive dataset via the Kite connector.
  3. Ability to specify a partition strategy.
  4. Ability to perform delta writes to an HDFS/Hive dataset.
  5. Ability to read a subset of an HDFS/Hive dataset using constraints.

Design

  1. Config objects:
    • ToJobConfig includes the arguments that the Kite CLI provides for import.
      1. Dataset URI is mandatory.
      2. Output storage format (enum: Avro, Parquet or, experimentally, CSV) is mandatory.
      3. Compression codec (enum: Default, Avro or Deflate) is optional (no JIRA yet).
      4. Path to a JSON file that defines the partition strategy is optional (no JIRA yet).
      5. User input validation will happen in place.
    • FromJobConfig includes the arguments that the Kite CLI provides for export.
      1. Dataset URI is mandatory.
      2. User input validation will happen in place.
    • LinkConfig is intended to store credential properties.
      1. E.g. the host and port of the NameNode, and the host and port of the Hive metastore. Imagine we build role-based access control: a user is able to access a particular ToJobConfig and FromJobConfig, but only an admin is able to access the LinkConfig. The admin does not want users to know or change the NameNode address, so LinkConfig is the right place to put credential properties.
      2. SQOOP-1751 contains some discussion about this.
  2. Write data into a new dataset:
    • The job will fail if the target dataset already exists.
    • Every KiteDatasetLoader will create a temporary dataset and write data into it. The name of each temporary dataset is expected to be unique and new.
    • If the job completes successfully, all temporary datasets will be merged into one.
    • If the job fails, all temporary datasets will be removed.
    • As Kite uses Avro, data records will be converted from Sqoop objects (FixedPoint, Text, etc.) to Avro objects. (See also future work #3.) A minimal Kite SDK sketch of the write and read flows is shown after this list.
  3. Read data from a dataset:
    • The job will fail if the target dataset does not exist or is not accessible.
    • Every KiteDatasetPartition should contain partition strategy information. If none is specified, there will be only one partition.
    • Every KiteDatasetExtractor will read data from its partition.
    • If an error occurs during reading, a SqoopException will be thrown.
    • As Kite uses Avro, data records will be converted from Avro objects to Sqoop objects (FixedPoint, Text, etc.). (See also future work #3.)
  4. Partition strategy handling:
  5. Incremental Import:
    • A ToJobConfig property "bool: AppendMode" is required.
    • If the target dataset does not exist, the job will fail.
    • If the target dataset exists, the implementation will defensively check the dataset metadata (e.g. schema, partition strategy).
    • It will only append records to the existing dataset. Failures caused by duplicate records are not handled.
    • Most of the implementation should follow section 2.
  6. Read data from a dataset with constraints:
    • A FromJobConfig property "str: Constraint" is required.
    • Build a view query to read the data (also illustrated in the sketch after this list).
    • Most of the implementation should follow section 3.
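
To make items 2, 3 and 6 above more concrete, below is a minimal, hedged sketch of the Kite SDK calls that a KiteDatasetLoader/KiteDatasetExtractor might wrap. The dataset URI, schema, field names and constraint value are invented for illustration; the real connector derives them from the link and job configs, and the temporary-dataset bookkeeping and error handling described above are omitted.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.GenericRecordBuilder;
    import org.kitesdk.data.CompressionType;
    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.DatasetDescriptor;
    import org.kitesdk.data.DatasetReader;
    import org.kitesdk.data.DatasetWriter;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.Formats;
    import org.kitesdk.data.PartitionStrategy;

    public class KiteConnectorSketch {

      public static void main(String[] args) {
        // Hypothetical dataset URI; the connector would assemble it from LinkConfig/ToJobConfig.
        String uri = "dataset:hdfs://namenode:8020/tmp/users";

        // Avro schema of the records being moved (illustrative only).
        Schema schema = SchemaBuilder.record("users").fields()
            .requiredLong("id")
            .requiredString("name")
            .endRecord();

        // Optional partition strategy (config item 1.4); Kite can also parse one from a JSON file.
        PartitionStrategy strategy = new PartitionStrategy.Builder()
            .hash("id", 4)
            .build();

        // Item 2: create a new dataset with the requested format/codec and write Avro records.
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
            .schema(schema)
            .format(Formats.PARQUET)
            .compressionType(CompressionType.Snappy)
            .partitionStrategy(strategy)
            .build();
        Dataset<GenericRecord> created = Datasets.create(uri, descriptor, GenericRecord.class);

        DatasetWriter<GenericRecord> writer = created.newWriter();
        try {
          GenericRecord record = new GenericRecordBuilder(schema)
              .set("id", 1L)
              .set("name", "alice")
              .build();
          writer.write(record);
        } finally {
          writer.close();
        }

        // Items 3 and 6: load the dataset and read it back, optionally restricted by a
        // constraint expressed as a Kite view (here, records whose "name" equals "alice").
        Dataset<GenericRecord> loaded = Datasets.load(uri, GenericRecord.class);
        DatasetReader<GenericRecord> reader = loaded.with("name", "alice").newReader();
        try {
          while (reader.hasNext()) {
            GenericRecord r = reader.next();
            System.out.println(r.get("id") + "\t" + r.get("name"));
          }
        } finally {
          reader.close();
        }
      }
    }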

Testing

  1. Unit testing to ensure the correctness of utils.
  2. Integration testing to ensure data can be moved from JDBC to HDFS/Hive.

Future Work

  1. As Sqoop 2 does not allow specifying an InputFormat and OutputFormat, data reading can be inefficient because we cannot create concurrent data readers, especially for an un-partitioned dataset. This still needs some investigation with the Kite team to find a solution.
  2. HBase support (SQOOP-1744) will be an individual improvement to the original design.
  3. The current implementation uses the default IDF class (CSVIDF) for data conversion. We have recently introduced AvroIDF. As Kite uses Avro internally, it makes sense to use AvroIDF instead of CSVIDF. This will involve two things:
    1. Clean up AvroTypeUtil and KiteDataTypeUtil.
    2. AvroIDF will be responsible for converting every Sqoop data type (FixedPoint, Text, etc.) to the corresponding Avro representation (a rough sketch follows this list).
  4. (VB): Complex types (array, map, enum) are not supported in the current design/implementation.
  5. The CSV format for HDFS writes via the Kite connector only supports primitive types, since CSV is only experimentally supported in the Kite SDK.
  6. The design details of delta write for Kite-HDFS are not included in this wiki; another design wiki will be added for SQOOP-1999.
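
As a rough illustration of the AvroIDF conversion in future work item 3 (the exact type mapping is not decided here; the FixedPoint-to-long and Text-to-string choices below are assumptions made for the example, with a nullable column shown as a union with null), the converter might build Avro records like this:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class AvroMappingSketch {
      public static void main(String[] args) {
        // Assumed mapping: Sqoop FixedPoint -> Avro long, Sqoop Text -> Avro string;
        // a nullable column becomes a union of null and the base type.
        Schema schema = SchemaBuilder.record("row").fields()
            .requiredLong("id")        // FixedPoint
            .optionalString("name")    // Text, nullable -> union {null, string}
            .endRecord();

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 42L);
        record.put("name", "alice");
        System.out.println(record);
      }
    }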

3 Comments

  1. Finally we have a doc.  

    Here are some things I am hoping to get more details on. Please add these to the wiki so it is more detailed and complete, since we already have the implementation in place.

    • We can certainly add more details on how the Avro-to-Sqoop conversion is done. Why did we choose CSVIDF, and are there plans to change this connector to use AvroIDF? I think the performance impact of using CSV is very high, since every record goes through Avro-to-text conversion back and forth in both the FROM and TO parts.
    • Sqoop objects can mean text or an object array; which are we using in Kite?
    • Why do we call it hdfsport? Won't this be relevant to Hive as well?
    • The current code does not handle complex types in the Sqoop IDF, so add that as a limitation, or if there is an intention to address it, please say so explicitly and note whether it will be addressed in a ticket. If we are keeping CSVIDF for version 1 and doing the back-and-forth between Avro and Sqoop types, please create a table showing how each of them is mapped. See the AvroIDF wiki for an example of how this mapping is depicted.
    • In particular, the UNION handling seems one-off.
    • Please elaborate more on which formats the Kite connector supports; a table like this would be helpful:

                    | TEXT | AVRO | Parquet
          HDFS FROM |  ?   |  ?   |
          HDFS TO   |      |      |
          HIVE FROM |      |      |
          HIVE TO   |  ?   |      |
    • I would provide more details on when and how we are supporting partitioning in the Kite connector. Is this going to be a config in the FromJob and ToJob? I presume so, since SQOOP-1744 will be doing the same.
    • >>> The fault handling is not an obligation of the Kite connector in read mode.
      Whose obligation is it, if not the Kite connector's? Also, please explain what fault handling means here; an example describing a failure scenario during reading would be good to have.
    • I was hoping to see more concrete details on how the Hive functionality will be supported in the existing Sqoop 2 code for the Kite connector. How will the user know whether it is Hive or HDFS when using this connector?
    • There is some code in KiteDataTypeUtil and some code in AvroTypeUtil; can we clean this up and merge it into one KiteDataTypeUtil, since CSV-to-Avro conversion is custom to Kite at this point? I don't think there will be a need for this in the common connector-sdk, but even if there is, please combine it into one.
    • What is the baseline for performance testing? Is there a ticket for this if you intend to provide results?

    I would not list HBase support as a limitation: it was not even part of the requirements, so it is odd to call it a limitation.

    Regarding delta writing:

    • Again, more details in the design are welcome. What do the configs for delta read/write look like?
    • I only see delta write and no delta read; does this apply to both HDFS and Hive?
    • What does the write strategy look like? The description says to use section 2 if the dataset does not exist, so does this mean we will create a new dataset? Is this expected of a delta write, or should we fail hard saying the dataset does not exist?
    • A lot of the design details read like requirements ("should handle ...") rather than details on how you propose to handle them.
    • Schema validation is a good proposal, but I wonder why it is required. Wouldn't the matcher code take care of this? I am unclear on what counts as an invalid schema versus making a best effort at schema matching.

  2. One more question about converting the Avro UNION type to a Sqoop type: is this the best way to handle it? Shouldn't the alternative be to add such a type in Sqoop?

    How do we assume there are only a first and a second type? Can't there be more than that in an Avro record?

    private static Column avroTypeToSchemaType(Schema.Field field) {
        Schema.Type schemaType = field.schema().getType();
        if (schemaType == Schema.Type.UNION) {
          List<Schema> unionSchema = field.schema().getTypes();
          if (unionSchema.size() == 2) {
            Schema.Type first = unionSchema.get(0).getType();
            Schema.Type second = unionSchema.get(1).getType();
            if ((first == Schema.Type.NULL && second != Schema.Type.NULL) ||
                (first != Schema.Type.NULL && second == Schema.Type.NULL)) {
              return avroPrimitiveTypeToSchemaType(field.name(),
                  first != Schema.Type.NULL ? first : second);
            }
          }
          // This is an unsupported complex data type
          return new Unknown(field.name());
        }
  3. NOTE: Kite SDK support for formats

    Qian, please fill in the blanks when the Hive implementation is ready. We want to state what the current limitations of the Kite connector are and what they are not.

    Format support | Text/CSV                                            | Avro supported | Parquet supported
    HDFS read      | Yes, all types                                      | Yes, all types | Yes, all types
    HDFS write     | Only primitive types (such as int, float, string?)  | Yes, all types | Yes, all types
    HIVE read      | ??                                                  | ??             | ??
    HIVE write     | ??                                                  | ??             | ??