Design Proposal of Kafka Connector (From side)
Background
Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Sqoop 2 will support a Kafka connector (SQOOP-1851). Currently, the Kafka connector (To side) is supported (SQOOP-1852). This design doc covers the Kafka connector (From side) (SQOOP-1853).
Requirements
- Ability to read from a Kafka server as a consumer.
- Support partitioned reading (SQOOP-2434). Since Kafka topics are already partitioned, Kafka partitions can be mapped directly to Sqoop partitions.
- Support incremental reading (SQOOP-2435). The offset starts at 0 on the first run; the user can set the offset in the job config.
- Multi-topic support is a future improvement.
- Support CSV format. Support for other data formats (Avro/Parquet) is a future improvement.
Design
- Basic arguments:
- LinkConfig includes the following arguments:
- BrokerList is mandatory.
- Zookeeper is mandatory.
- FromJobConfig includes the following arguments (a sketch of both config classes follows this list):
- From topic is mandatory.
- Sqoop partitions will be split according to Kafka partitions.
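A minimal sketch of the two config classes is shown below. It assumes the @ConfigClass/@Input annotations from org.apache.sqoop.model that existing Sqoop 2 connectors use; the field names and sizes are illustrative, not final.

    import org.apache.sqoop.model.ConfigClass;
    import org.apache.sqoop.model.Input;

    // Sketch only: assumes the Sqoop 2 @ConfigClass/@Input annotations;
    // field names and sizes are illustrative.
    @ConfigClass
    public class LinkConfig {
      // Comma-separated broker list, e.g. "host1:9092,host2:9092" (mandatory).
      @Input(size = 1024)
      public String brokerList;

      // ZooKeeper connect string, e.g. "host1:2181/kafka" (mandatory).
      @Input(size = 255)
      public String zookeeperConnect;
    }

    @ConfigClass
    public class FromJobConfig {
      // Topic to read from (mandatory).
      @Input(size = 255)
      public String topic;
    }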
- Basic function of reading data from the Kafka server (see the fetch sketch after this list):
- Kafka SimpleConsumer will be used to read data from the Kafka server, since it gives control over partitions and offsets.
- It supports reading from specific partitions.
- It can read a message multiple times.
- Offsets must be tracked when consuming finishes.
- The lead broker for each topic and partition must be determined.
- Leader broker changes must be handled.
- Several jars will be added in the initialize step.
- Cleanup work may be added in the destroy step.
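The fetch loop below is a minimal sketch of how the extractor could read one partition with the Kafka 0.8 SimpleConsumer API. Leader lookup, error handling, and retry on leader change are omitted; the host, client id, and fetch sizes are illustrative assumptions.

    import java.nio.ByteBuffer;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndOffset;

    // Sketch only: reads one topic/partition from a given start offset and
    // returns the next offset to consume. Leader lookup and error handling
    // are omitted; host/port/client id are illustrative.
    public class KafkaReadSketch {
      public static long readPartition(String leaderHost, int port, String topic,
                                       int partition, long startOffset) {
        SimpleConsumer consumer =
            new SimpleConsumer(leaderHost, port, 100000, 64 * 1024, "sqoop-kafka-from");
        long nextOffset = startOffset;
        try {
          FetchRequest request = new FetchRequestBuilder()
              .clientId("sqoop-kafka-from")
              .addFetch(topic, partition, nextOffset, 100000)   // fetch up to ~100 KB
              .build();
          FetchResponse response = consumer.fetch(request);
          for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            // Here each message would be converted to a CSV record and written
            // through the Sqoop DataWriter.
            nextOffset = messageAndOffset.nextOffset();   // track offset for incremental import
          }
        } finally {
          consumer.close();
        }
        return nextOffset;
      }
    }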
- Partition strategy handling:
- In the basic function, only one partition per topic is handled; no multi-threading is supported.
- With the partition feature, Sqoop partitions will be created according to the Kafka partitions of the specified topic.
- The KafkaPartition class includes the following arguments:
- topic is mandatory.
- Partition logic will be implemented in the getPartitions method of the KafkaPartitioner class (see the sketch after this list):
- Get the partitions of the topic from the Kafka server.
- Add them as KafkaPartition objects to the Sqoop Kafka import job.
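A rough sketch of KafkaPartition and KafkaPartitioner.getPartitions follows. It assumes the Sqoop 2 connector SPI (org.apache.sqoop.job.etl.Partition, Partitioner, PartitionerContext) and LinkConfiguration/FromJobConfiguration wrapper classes around the configs sketched earlier; fetchPartitionIds() is a hypothetical helper that would query topic metadata from the brokers.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.sqoop.job.etl.Partition;
    import org.apache.sqoop.job.etl.Partitioner;
    import org.apache.sqoop.job.etl.PartitionerContext;

    // Sketch only: one Sqoop partition per Kafka partition of the configured topic.
    public class KafkaPartition extends Partition {
      private String topic;
      private int partitionId;

      public KafkaPartition() {}

      public KafkaPartition(String topic, int partitionId) {
        this.topic = topic;
        this.partitionId = partitionId;
      }

      @Override
      public void write(DataOutput out) throws IOException {
        out.writeUTF(topic);
        out.writeInt(partitionId);
      }

      @Override
      public void readFields(DataInput in) throws IOException {
        topic = in.readUTF();
        partitionId = in.readInt();
      }

      @Override
      public String toString() {
        return topic + "-" + partitionId;
      }
    }

    // In a separate source file. Sketch only: LinkConfiguration/FromJobConfiguration
    // are assumed to wrap the config classes sketched above.
    public class KafkaPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
      @Override
      public List<Partition> getPartitions(PartitionerContext context,
          LinkConfiguration link, FromJobConfiguration job) {
        String topic = job.fromJobConfig.topic;
        List<Partition> partitions = new LinkedList<Partition>();
        for (int id : fetchPartitionIds(link.linkConfig.brokerList, topic)) {
          partitions.add(new KafkaPartition(topic, id));
        }
        return partitions;
      }

      // Hypothetical helper: a real implementation would send a TopicMetadataRequest
      // to one of the brokers and collect the partition ids of the topic.
      private List<Integer> fetchPartitionIds(String brokerList, String topic) {
        return Arrays.asList(0, 1, 2);   // placeholder
      }
    }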
- Incremental Import:
- A FromJobConfig argument "incremental mode (ALL/INCREMENTAL)" is required.
- ALL: Import all data from the beginning of each partition.
- INCREMENTAL: Get the offsets from the last submission and import only the incremental data from the Kafka server.
- The offsets of the last imported data should be stored in the Sqoop server.
- At the end of the extract function in the KafkaExtractor class, the last offsets should be recorded.
- The offsets should be stored as a map from Partition_Topic to offset, e.g., Map<Partition_Topic, Long>.
- The Partition_Topic class holds a Topic (String) and a Partition (int); the Topic field is included for future multi-topic support.
- INCREMENTAL mode has no effect on the first run, since no offset has been stored yet; it then behaves like ALL.
- The offsets will be passed to SimpleConsumer in the extract function of KafkaExtractor (see the sketch after this list).
- 0 will be used in ALL mode.
- The last stored offsets will be read and used in INCREMENTAL mode.
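Below is a minimal sketch of the Partition_Topic key class and of how the start offset could be chosen per (topic, partition). How the offset map is persisted in the Sqoop server is left open here; the startOffset helper is illustrative.

    import java.util.Map;
    import java.util.Objects;

    // Sketch only: Partition_Topic keys an offset map so that every
    // (topic, partition) pair carries its own last-read offset.
    public class Partition_Topic {
      private final String topic;
      private final int partition;

      public Partition_Topic(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof Partition_Topic)) {
          return false;
        }
        Partition_Topic other = (Partition_Topic) o;
        return partition == other.partition && topic.equals(other.topic);
      }

      @Override
      public int hashCode() {
        return Objects.hash(topic, partition);
      }

      // Illustrative helper: pick the start offset for one (topic, partition).
      // ALL always starts at 0; INCREMENTAL resumes from the stored offset and
      // falls back to 0 on the first run, when nothing has been stored yet.
      public static long startOffset(Map<Partition_Topic, Long> lastOffsets,
                                     String topic, int partition, boolean incremental) {
        if (!incremental) {
          return 0L;
        }
        Long last = lastOffsets.get(new Partition_Topic(topic, partition));
        return last == null ? 0L : last;
      }
    }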
- Patches will be separated into 3 parts:
- Initial patch
- The framework and basic import function are included.
- No partition support.
- No incremental support. Import all data from Kafka server.
- SimpleConsumer will be used so that further features can build on it.
- Make sure it is easy to add partition and incremental support later.
- Partition support patch
- Incremental support patch
Testing
- Integration testing to ensure data can be moved from Kafka to MySQL.
Future Work
- Multi-topic support.
- The topic argument in FromJobConfig should be changed to a comma-separated topic list.
- Sqoop partitions should be split according to both topic and Kafka partition (see the sketch at the end of this section).
- Other data format (Avro/Parquet) support.
- A schema should be included to support Avro/Parquet.
- The location of the schema file should be a mandatory argument in FromJobConfig.
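For the multi-topic improvement above, the getPartitions logic in KafkaPartitioner would change roughly as sketched below. It reuses the hypothetical KafkaPartition class and fetchPartitionIds() helper from the partitioner sketch and splits the configured topic list on commas.

    // Sketch only: one Sqoop partition per (topic, Kafka partition) pair.
    // Intended to live inside KafkaPartitioner, next to fetchPartitionIds().
    private List<Partition> multiTopicPartitions(String brokerList, String topicList) {
      List<Partition> partitions = new LinkedList<Partition>();
      for (String rawTopic : topicList.split(",")) {
        String topic = rawTopic.trim();
        for (int id : fetchPartitionIds(brokerList, topic)) {
          partitions.add(new KafkaPartition(topic, id));
        }
      }
      return partitions;
    }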