Requirements

  1. Ability to read from a Kafka server as a consumer.
  2. Support partition reading (SQOOP-2434). Since Kafka topics are partitioned, Kafka partitions can be mapped onto Sqoop partitions.
  3. Support incremental reading (SQOOP-2435). The offset will be 0 on the first run; the user can set the offset in the job configuration.
  4. Multi-topic support will be a later improvement.
  5. Support the CSV format. Support for other data formats (Avro/Parquet) will be a later improvement.

Design

  1. Basic function arguments (see the config sketch after this item):
    • LinkConfig includes arguments:
      1. BrokerList is mandatory.
      2. Zookeeper is mandatory.
    • FromJobConfig includes arguments:
      1. The "from" topic is mandatory.
      2. Offset is optional; it is used for incremental support.
    • KafkaPartition includes arguments:
      1. Partition is mandatory. The Sqoop partitions will be split according to the Kafka partitions.
      2. With multi-topic support, topic + partition is mandatory.
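For reference, a minimal Java sketch of the argument groups above. The class layout and field names are illustrative assumptions only; the actual Sqoop 2 connector would declare them through its annotation-driven configuration classes.

    // Illustrative sketch of the configuration shapes above; these are not the
    // actual Sqoop 2 config classes, just the fields they are expected to carry.
    public class KafkaConfigSketch {

      /** Link-level arguments, shared by all jobs that use the connector. */
      static class LinkConfig {
        String brokerList;   // mandatory, e.g. "broker1:9092,broker2:9092"
        String zookeeper;    // mandatory, e.g. "zk1:2181"
      }

      /** Job-level arguments for the "from Kafka" direction. */
      static class FromJobConfig {
        String topic;        // mandatory: topic to read from
        Long offset;         // optional: starting offset for incremental reads
      }

      /** Per-split arguments carried by each Sqoop partition. */
      static class KafkaPartition {
        String topic;        // needed for multi-topic support
        int partition;       // Kafka partition id handled by this split
      }
    }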
  2. Write data into a new dataset (Kite):
    • The job will fail if the target dataset already exists.
    • Every KiteDatasetLoader will create a temporary dataset and write data into it. The name of the temporary dataset is expected to be unique and new.
    • If the job completes successfully, all temporary datasets will be merged into one.
    • If the job fails, all temporary datasets will be removed. (A sketch of this commit/abort flow follows this item.)
    • As Kite uses Avro, data records will be converted from Sqoop objects (FixedPoint, Text, etc.) to Avro objects. (See also future work #3)
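A hedged sketch of the commit/abort flow described above. Every helper method is a hypothetical placeholder standing in for the KiteDatasetLoader/commit logic; none of them are Kite SDK or Sqoop connector APIs.

    // Hypothetical outline of the write-into-new-dataset flow; every helper
    // below is a placeholder, not an actual Kite SDK or Sqoop API.
    abstract class NewDatasetWriteSketch {
      abstract boolean datasetExists(String uri);
      abstract String createTemporaryDataset(String targetUri);  // unique, new name
      abstract Object toAvro(Object[] sqoopRecord);              // FixedPoint/Text -> Avro
      abstract void writeAvroRecord(String tempUri, Object avroRecord);
      abstract void mergeTemporariesInto(String targetUri);
      abstract void dropTemporaries(String targetUri);

      void run(String targetUri, Iterable<Object[]> sqoopRecords) {
        if (datasetExists(targetUri)) {
          // The job fails if the target dataset already exists.
          throw new IllegalStateException("Target dataset already exists: " + targetUri);
        }
        String tempUri = createTemporaryDataset(targetUri);
        try {
          for (Object[] record : sqoopRecords) {
            writeAvroRecord(tempUri, toAvro(record));  // Kite stores Avro internally
          }
          mergeTemporariesInto(targetUri);             // success: merge temps into one dataset
        } catch (RuntimeException e) {
          dropTemporaries(targetUri);                  // failure: remove all temporary datasets
          throw e;
        }
      }
    }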
  3. Simple function to read data from the Kafka server:
    • Kafka's SimpleConsumer will be used to read data from the Kafka server, considering partition and offset support (a read sketch follows this item):
      1. It supports reading from specific partitions.
      2. It can read a message multiple times.
      3. Offsets must be tracked when consumption finishes.
      4. The lead broker for a topic and partition must be determined.
      5. Leader broker changes must be handled.
    • Several jars will be added in the initialize step.
    • Some cleanup work may be added in the destroy step.
    • Read data from a dataset (Kite):
      1. The job will fail if the target dataset does not exist or is not accessible.
      2. Every KiteDatasetPartition should contain partition strategy information. If it is not specified, there will be only one partition.
      3. Every KiteDatasetExtractor will read data from its partition.
      4. If an error occurs during reading, a SqoopException will be thrown.
      5. As Kite uses Avro, data records will be converted from Avro to Sqoop objects (FixedPoint, Text, etc.). (See also future work #3)
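Below is a sketch of a single fetch with Kafka 0.8's SimpleConsumer, the read path the Kafka bullets above describe. The broker host/port, topic, partition, offset, and client id are placeholder values, and the leader lookup, error handling, and leader-change handling mentioned above are omitted.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    public class SimpleConsumerReadSketch {
      public static void main(String[] args) {
        String topic = "sqoop-topic";   // placeholder topic
        int partition = 0;              // placeholder partition
        long offset = 0L;               // 0 on the first (ALL) run
        // Connect to the lead broker of this topic/partition (placeholder host/port).
        SimpleConsumer consumer =
            new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "sqoop-kafka-client");
        try {
          FetchRequest request = new FetchRequestBuilder()
              .clientId("sqoop-kafka-client")
              .addFetch(topic, partition, offset, 100000)  // read from the given offset
              .build();
          FetchResponse response = consumer.fetch(request);
          for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(messageAndOffset.offset() + ": "
                + new String(bytes, StandardCharsets.UTF_8));
            offset = messageAndOffset.nextOffset();      // track the offset to consume next
          }
        } finally {
          consumer.close();
        }
      }
    }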
  4. Partition strategy handling:
    • For writing data, if no partition strategy is specified, the dataset will be unpartitioned.
    • For reading data, if the given dataset has a partition strategy, it should be used.
    • In the simple (initial) function there is only one partition per topic; no multi-threading is supported.
    • For the partition feature, Sqoop partitions will be created according to the partitions of the given topic in Kafka.
    • The KafkaPartition class includes arguments:
      1. topic is mandatory.
    • The partition logic will be implemented in the getPartitions function of the KafkaPartitioner class (a discovery sketch follows this item):
      1. Get the partitions of the given topic from the Kafka server.
      2. Add them as KafkaPartitions to the Sqoop Kafka import job.
    • Reference (Kite partitioned datasets): http://kitesdk.org/docs/0.17.1/Partitioned-Datasets.html
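As a sketch of step 1 of the getPartitions logic above, the partition ids of a topic can be discovered through Kafka 0.8's metadata API. The broker address and client id are placeholders, and turning each id into a Sqoop KafkaPartition object is left out.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class KafkaPartitionDiscoverySketch {
      /** Returns the Kafka partition ids of a topic; one Sqoop partition is created per id. */
      public static List<Integer> discoverPartitions(String brokerHost, int brokerPort, String topic) {
        SimpleConsumer consumer =
            new SimpleConsumer(brokerHost, brokerPort, 100000, 64 * 1024, "sqoop-partition-lookup");
        try {
          TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
          TopicMetadataResponse response = consumer.send(request);
          List<Integer> partitionIds = new ArrayList<Integer>();
          for (TopicMetadata topicMetadata : response.topicsMetadata()) {
            for (PartitionMetadata partitionMetadata : topicMetadata.partitionsMetadata()) {
              partitionIds.add(partitionMetadata.partitionId());
            }
          }
          return partitionIds;
        } finally {
          consumer.close();
        }
      }
    }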
  5. Incremental import (Kite, append mode):
    • A ToJobConfig property "bool: AppendMode" is required.
    • If the target dataset does not exist, the job will fail.
    • If the target dataset exists, the implementation will check the dataset metadata (e.g. schema, partition strategy) defensively (see the sketch after this item).
    • It will only append records to the existing dataset. Duplicate records are not handled.
    • Most of the implementation follows item 2 (write data into a new dataset).
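A sketch of the defensive metadata check, assuming the Kite SDK Datasets API (Datasets.exists / Datasets.load). Whether the real implementation compares only the schema or also the partition strategy is left open here; only a schema comparison is shown.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;

    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;

    public class AppendModeCheckSketch {
      /**
       * Defensive checks before appending: the target dataset must already exist
       * and its schema must match the records about to be written. A similar
       * check could be added for the partition strategy.
       */
      public static Dataset<GenericRecord> openForAppend(String datasetUri, Schema expectedSchema) {
        if (!Datasets.exists(datasetUri)) {
          throw new IllegalStateException("Append mode requires an existing dataset: " + datasetUri);
        }
        Dataset<GenericRecord> dataset = Datasets.load(datasetUri, GenericRecord.class);
        Schema actualSchema = dataset.getDescriptor().getSchema();
        if (!actualSchema.equals(expectedSchema)) {
          throw new IllegalStateException("Schema mismatch; refusing to append to " + datasetUri);
        }
        return dataset;  // the caller appends records; duplicates are not detected
      }
    }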
  6. Read data from a dataset with constraints (Kite):
    • A FromJobConfig property "str: Constraint" is required.
    • A view query will be built to read the data (see the sketch after this item).
    • Most of the implementation follows the "read data from a dataset" part of item 3.
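A sketch of building a constrained (view) read with the Kite SDK. How the "Constraint" string from FromJobConfig is parsed is an open detail, so a single field/value equality constraint is assumed here for illustration.

    import org.apache.avro.generic.GenericRecord;

    import org.kitesdk.data.Dataset;
    import org.kitesdk.data.Datasets;
    import org.kitesdk.data.RefinableView;

    public class ConstrainedReadSketch {
      /** Restricts reading to records whose given field equals the given value. */
      public static RefinableView<GenericRecord> constrainedView(
          String datasetUri, String field, Object value) {
        Dataset<GenericRecord> dataset = Datasets.load(datasetUri, GenericRecord.class);
        return dataset.with(field, value);  // only matching records are read by the extractor
      }
    }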

  7. Incremental import from the Kafka server:
    • A FromJobConfig argument "incremental mode" (ALL/INCREMENTAL) is required:
      1. ALL: import all data from the beginning, i.e. get all data from the Kafka server.
      2. INCREMENTAL: get the offset from the last submission and import only the incremental data from the Kafka server.
    • The offset of the last imported data should be stored in the Sqoop server (see the offset-tracking sketch after this item):
      1. At the end of the extract function in the KafkaExtractor class, the last offset should be recorded.
      2. Offsets should be stored per topic/partition, e.g. as a Map<Partition_Topic, Long> offsetList.
      3. The Partition_Topic class has the fields Topic (String) and Partition (int). Topic is included for multi-topic support.
    • INCREMENTAL mode has no effect on the first run, since there is no stored offset yet.
    • The offset will be set on the SimpleConsumer in the extract function of KafkaExtractor:
      1. 0 will be set in ALL mode.
      2. The last stored offset will be read and set in INCREMENTAL mode.
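A sketch of the offset bookkeeping described above. The PartitionTopic key mirrors the Partition_Topic class from the bullets (topic + partition), offsets are kept as longs, and how the map is persisted on the Sqoop server is not shown.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class OffsetTrackingSketch {
      /** Key for stored offsets; the topic is included so multi-topic support can reuse it. */
      static final class PartitionTopic {
        final String topic;
        final int partition;

        PartitionTopic(String topic, int partition) {
          this.topic = topic;
          this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
          if (!(o instanceof PartitionTopic)) {
            return false;
          }
          PartitionTopic other = (PartitionTopic) o;
          return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
          return Objects.hash(topic, partition);
        }
      }

      // Last consumed offset per (topic, partition); recorded at the end of
      // KafkaExtractor.extract() and persisted on the Sqoop server.
      private final Map<PartitionTopic, Long> lastOffsets = new HashMap<PartitionTopic, Long>();

      /** ALL mode always starts at 0; INCREMENTAL resumes from the stored offset, if any. */
      public long startingOffset(boolean incremental, String topic, int partition) {
        if (!incremental) {
          return 0L;
        }
        Long stored = lastOffsets.get(new PartitionTopic(topic, partition));
        return stored == null ? 0L : stored;  // first run behaves like ALL
      }

      public void recordOffset(String topic, int partition, long nextOffset) {
        lastOffsets.put(new PartitionTopic(topic, partition), nextOffset);
      }
    }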
  8. Patches will be separated into 3 parts:
    • Initial patch
      1. The framework and the basic import function are included.
      2. No partition support.
      3. No incremental support; all data is imported from the Kafka server.
      4. SimpleConsumer will be used, so that later features can build on it.
      5. Make sure partition and incremental support can be added easily.
    • Partition support patch
    • Incremental support patch

Testing

  1. Unit testing to ensure the correctness of utils.
  2. Integration testing to ensure data can be moved from Kafka and from JDBC (MySQL) to HDFS/Hive.

Future Work

  1. Multi-topic support.
    1. The topic in FromJobConfig should be changed to a topic list, separated by ",".
    2. The Sqoop partitions should be split according to both the topic and the partition on the Kafka server.
  2. Other data format (Avro/Parquet) support.
    1. Schema should be included to support Avro/Parquet.
    2. The location of the schema file should be mandatory in FromJobConfig.
  3. As Sqoop 2 does not allow specifying InputFormat and OutputFormat, data reading can be inefficient because we cannot create concurrent data readers, especially for an unpartitioned dataset. Some investigation with the Kite team is still needed to find a solution.
  4. HBase support (SQOOP-1744) will be an individual improvement to the original design.
  5. The current implementation uses the default IDF class (CSVIDF) for data conversion. Recently we have introduced AvroIDF. As Kite uses Avro internally, it makes sense to use AvroIDF instead of CSVIDF. This will involve two things:
    1. Clean up AvroTypeUtil and KiteDataTypeUtil.
    2. AvroIDF will be responsible to convert every Sqoop data type (FixedPoint, Text, etc.) to corresponding Avro representation.
  6. (VB): The complex types array/map/enum are not supported in the current design/implementation.
  7. The CSV format for HDFS writes via the Kite connector only supports "primitive types", since CSV is only experimentally supported in the Kite SDK.
  8. The design details of Delta Write in Kite-HDFS are not included in this wiki; another design wiki will be added for SQOOP-1999.