Design Proposal of Kafka Connector (From side)

Background

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

Sqoop 2 will support a Kafka connector (SQOOP-1851). The Kafka connector (To side) is already supported (SQOOP-1852). This design doc covers the Kafka connector (From side) (SQOOP-1853).

Requirements

  1. Ability to read from a Kafka server as a consumer.
  2. Support partitioned reading (SQOOP-2434). Because Kafka topics are partitioned, Kafka partitions can be used as Sqoop partitions.
  3. Support incremental reading (SQOOP-2435). The offset is 0 on the first run. The user can set the offset in the job's From config.
  4. Multi-topic support will be a later improvement.
  5. Support CSV format. Support for other data formats (Avro/Parquet) will be a later improvement.
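
Requirements 2 and 3 can be illustrated with a minimal sketch. All class and method names below (KafkaFromSketch, TopicPartitionSplit, split, resolveStartOffset) are hypothetical and are not part of the Sqoop or Kafka APIs. The sketch also assumes that the single configured offset applies to every partition of the topic, which this proposal does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these names come from the Sqoop or Kafka APIs. */
final class KafkaFromSketch {

    /** One unit of work: a single Kafka partition of a topic, read from startOffset. */
    static final class TopicPartitionSplit {
        final String topic;
        final int partition;
        final long startOffset;

        TopicPartitionSplit(String topic, int partition, long startOffset) {
            this.topic = topic;
            this.partition = partition;
            this.startOffset = startOffset;
        }

        @Override
        public String toString() {
            return topic + "-" + partition + "@" + startOffset;
        }
    }

    /** Requirement 3: use the configured offset if present, otherwise start at 0. */
    static long resolveStartOffset(Long configuredOffset) {
        if (configuredOffset == null) {
            return 0L;
        }
        if (configuredOffset < 0) {
            throw new IllegalArgumentException("offset must be >= 0: " + configuredOffset);
        }
        return configuredOffset;
    }

    /** Requirement 2: create one split per Kafka partition of the topic. */
    static List<TopicPartitionSplit> split(String topic, int partitionCount, Long configuredOffset) {
        if (topic == null || topic.isEmpty()) {
            throw new IllegalArgumentException("topic is mandatory");
        }
        // Assumption: the same starting offset is used for every partition.
        long start = resolveStartOffset(configuredOffset);
        List<TopicPartitionSplit> splits = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            splits.add(new TopicPartitionSplit(topic, p, start));
        }
        return splits;
    }
}
```

In a real connector the partition count would come from Kafka topic metadata rather than from a parameter; it is passed in here only to keep the sketch self-contained.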

Design

  1. Basic function:
    • FromJobConfig includes the following arguments:
      1. From topic is mandatory.
      2. Offset is optional; it is used for incremental reading.
    • KafkaPartition includes the following arguments:
      1. Partition is mandatory. Sqoop partitions are split according to Kafka partitions. With multi-topic support, topic+partition is mandatory.
  2. Write data into a new dataset:
    • The job will fail if the target dataset exists.
    • Every KiteDatasetLoader will create a temporary dataset and write data into it. The name of each temporary dataset is expected to be unique and new.
    • If the job succeeds, all temporary datasets will be merged into one.
    • If the job fails, all temporary datasets will be removed.
    • As Kite uses Avro, data records will be converted from Sqoop objects (FixedPoint, Text, etc.) to Avro objects. (See also future work #3)
  3. Read data from a dataset:
    • The job will fail if the target dataset does not exist or is not accessible.
    • Every KiteDatasetPartition should contain partition strategy information. If it is not specified, there will be only one partition.
    • Every KiteDatasetExtractor will read data from its partition.
    • If an error occurs during reading, a SqoopException will be thrown.
    • As Kite uses Avro, data records will be converted from Avro to Sqoop objects (FixedPoint, Text, etc.). (See also future work #3)
  4. Partition strategy handling:
  5. Incremental Import:
    • A ToJobConfig property "bool: AppendMode" is required.
    • If the target dataset does not exist, the job will fail.
    • If the target dataset exists, the implementation will defensively check dataset metadata (e.g. schema, partition strategy).
    • It will only append records to the existing dataset. Failures caused by duplicates are not handled.
    • Most of the implementation should follow section 2.
  6. Read data from a dataset with constraints:
    • A FromJobConfig property "str: Constraint" is required.
    • Build a view query to read data.
    • Most of the implementation should follow section 3.
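
The write protocol in item 2 of the design (fail if the target exists, one uniquely named temporary dataset per loader, merge on success, remove on failure) can be sketched as follows. This is an in-memory stand-in that does not use the Kite SDK: the class TempDatasetSketch, its methods, and the "_tmp_" naming scheme are all hypothetical, and a dataset is modelled as a plain list of string records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory stand-in for a dataset store; this is not the Kite SDK API. */
final class TempDatasetSketch {
    private final Map<String, List<String>> store;   // dataset name -> records
    private final List<String> temps = new ArrayList<>();
    private final String target;

    /** Starting a job fails if the target dataset already exists. */
    TempDatasetSketch(Map<String, List<String>> store, String target) {
        if (store.containsKey(target)) {
            throw new IllegalStateException("target dataset already exists: " + target);
        }
        this.store = store;
        this.target = target;
    }

    /** Each loader gets its own temporary dataset with a unique, new name. */
    String createTemp() {
        String name = target + "_tmp_" + UUID.randomUUID();
        store.put(name, new ArrayList<>());
        temps.add(name);
        return name;
    }

    /** Append one record to a temporary dataset. */
    void write(String temp, String record) {
        store.get(temp).add(record);
    }

    /** Job succeeded: merge all temporary datasets into the target and drop them. */
    void commit() {
        List<String> merged = new ArrayList<>();
        for (String t : temps) {
            merged.addAll(store.remove(t));
        }
        store.put(target, merged);
        temps.clear();
    }

    /** Job failed: remove all temporary datasets; the target is never created. */
    void rollback() {
        for (String t : temps) {
            store.remove(t);
        }
        temps.clear();
    }
}
```

A caller would create one temporary dataset per loader, then call commit() after all loaders finish or rollback() if any of them fails. The sketch does not address concurrent loaders or partial merge failures.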

Testing

  1. Unit testing to ensure the correctness of utils.
  2. Integration testing to ensure data can be moved from JDBC to HDFS/Hive.

Future Work

  1. As Sqoop 2 does not allow specifying an InputFormat and OutputFormat, data reading can be inefficient because we cannot create concurrent data readers, especially for an unpartitioned dataset. A solution still needs to be investigated with the Kite team.
  2. HBase support (SQOOP-1744) will be an individual improvement to the original design.
  3. The current implementation uses the default IDF class (CSVIDF) for data conversion. Recently we have introduced AvroIDF. As Kite uses Avro internally, it makes sense to use AvroIDF instead of CSVIDF. This will involve two things:
    1. Clean up AvroTypeUtil and KiteDataTypeUtil.
    2. AvroIDF will be responsible for converting every Sqoop data type (FixedPoint, Text, etc.) to the corresponding Avro representation.
  4. (VB): The complex types array/map/enum are not supported in the current design/implementation.
  5. The CSV format for HDFS writes via KiteConnector supports only "primitive types", since CSV is only experimentally supported in the Kite SDK.
  6. The design details of Delta Write in Kite-HDFS are not included in this wiki; another design wiki will be added for SQOOP-1999.