Design Proposal of Kafka Connector (From side)


Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

 Sqoop 2 will support Kafka connector (SQOOP-1851). Currently, Kafak connector (To side) is supported (SQOOP-1852). This design doc is for Kafka connector (From side) (SQOOP-1853)


  1. Ability to read from a Kafka server as a consumer.
  2. Support partition reading. (SQOOP-2434) As partition is supported in Kafka, this could be used in Sqoop as partition.
  3. Support incremental reading. (SQOOP-2435) The offset will be 0 at the first time. User could set the offset in job from config.
  4. Multi-topic will be an improvement.
  5. Support CSV format. Other data format (Avro/Parquet) support will be an improvement.


  1. Basic arguments:
    • Linkconfig includes arguments
      1. BrokerList is mandatory.
      2. Zookeeper is mandatory.
    • FromJobConfig includes arguments.
      1. From topic is mandatory.
      2. The Sqoop partition will split according to Kafka partition.
  2. Simple function of read data from Kafka server:
    • Kafka SimpleConsumer will be used to read data from Kafka server, considering partition and offset support.
      1. It supports to partitions reading.
      2. It could read a message multiple times.
      3. Offsets should be tracked when consuming is finished.
      4. Lead Broker for a topic and partition should be figured out.
      5. Some work should be handled when leader Broker changes.
    • Several jars will be added in initialize steps.
    • Some clean work may be added in destroy steps.
  3. Partition strategy handling:
    • For simple function, there is only 1 partition for topic. No mutli-thread is supported.
    • For partition feature, the partitions will be created according to the partitions of specific topic in Kafka.
    • KafkaPartition class includes arguments:
      1. topic is mandatory.
    • Partition logic will be implemented in function getPartitions in KafkaPartitioner class.
      1. get partitions of Kafka server according to topic.
      2. add partitions to KafkaPartitions of Sqoop Kafka import job.
  4. Incremental Import:
    • A FromJobConfig argument "incremental mode (ALL/INCREMENTAL)" is required.
      1. ALL: Import all data from the beginning. Get all data from Kafka server.
      2. INCREMENTAL: Get offset from the last submission, and import incremental data from Kafka server.
    • Offset of last import data should be stored in Sqoop server.
      1. At the end of exact function in KafkaExtractor class, the logic of last offset should be handled.
      2. Data format of offset should be List<Partition_Topic, int> offsetList.
      3. Partition_Topic class has arguments of Topic (String) and Partition (int). The inclusion of Topic is for multi-topic support.
    • The INCREMENTAL will not effect in the first time running.
    • The offset will be set to SimpleConsumer in extract function in KafkaExtractor.
      1. 0 will be set in ALL mode.
      2. Last offset will be read and set in INCREMENTAL mode.
  5. Patches will be separated into 3 parts:
    • Initial patch
      1. The framework and basic import function are included.
      2. No partition support.
      3. No incremental support. Import all data from Kafka server.
      4. The SimpleConsumer will be used for further support.
      5. Make sure it is easy to support partition and incremental.
    • Partition support patch
    • Incremental support patch


  1. Integration testing to ensure data can be moved from Kafka to MySQL.

Future Work

  1. Multi-topic support.
    1. The topic in FromJobConfig should be changed to topic list, separated by ",".
    2. The Sqoop partition should be split according to partition and topic of Kafka server.
  2. Other data format (Avro/Parquet) support.
    1. Schema should be included to support Avro/Parquet.
    2. The location of schema file should be mandatory in FromJobConfig.
  • No labels