This section helps you set up quick-start jobs for ingesting data from HDFS into a Kafka topic. Writing from HDFS to multiple Kafka topics is not currently supported, nor is partitioning by key when writing to Kafka. Topics with multiple partitions are supported; in that case, the data is distributed across the partitions in a round-robin manner.
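To make the round-robin behavior concrete, here is a minimal Python sketch (illustrative only, not Gobblin's actual writer code) of how records end up spread across a multi-partition topic when no key-based partitioning is in play:

```python
def assign_partitions(records, num_partitions):
    """Assign each record to a partition in round-robin order,
    mimicking how data is spread when the target topic has
    multiple partitions and no partitioning key is used."""
    return [(record, i % num_partitions) for i, record in enumerate(records)]

# Six records over a three-partition topic: each partition receives two.
assignments = assign_partitions(["r0", "r1", "r2", "r3", "r4", "r5"], 3)
print(assignments)
```

Note that with this scheme records for the same logical entity can land on different partitions, so per-key ordering is not preserved.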
## Standalone
This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).
- Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is `localhost:9092`.
- Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
- Create a job config with the following properties:
```properties
job.name=GobblinHdfsQuickStart
job.group=GobblinHdfs
job.description=Gobblin quick start job for Hdfs
job.lock.enabled=false

fs.uri=hdfs://localhost:9000

source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/Wikipedia_Sandbox
source.filebased.fs.uri=hdfs://localhost:9000

writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092

## Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081

# Use local schema registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}

data.publisher.type=org.apache.gobblin.publisher.NoopPublisher

metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt

bootstrap.with.offset=earliest
task.data.root.dir=/tmp
```
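Because the Avro schema is embedded as a single-line JSON value of `writer.kafka.producerConfig.schemaRegistry.schema.value`, it is easy to break with a stray quote or an accidental line continuation. A quick, optional sanity check before submitting the job is to parse the schema string (copied from the config above) with Python:

```python
import json

# The schema.value string from the job config, verbatim.
schema_value = '{"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}'

# json.loads raises ValueError on malformed JSON, catching copy/paste damage early.
schema = json.loads(schema_value)
fields = [f["name"] for f in schema["fields"]]
print(schema["name"], fields)
```

This only verifies that the value is well-formed JSON with the expected field list; it does not validate it against the Avro specification, so a full check still happens when the job first serializes a record.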