
This section helps you set up quick-start jobs for ingesting data from HDFS into a Kafka topic. Writing from HDFS to multiple Kafka topics is currently not supported, and neither is partitioning by key when writing to Kafka. Topics with multiple partitions are allowed, but in that case the data will be distributed across the partitions in a round-robin manner.
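If you do want the data spread across several partitions, you can pre-create the topic with Kafka's command-line tools before running the job. The sketch below assumes the local broker and the WikipediaExample topic used in the standalone example; the exact flags depend on your Kafka version (releases before 2.2 use --zookeeper, newer ones use --bootstrap-server):

    # create a two-partition topic on the local broker (pre-2.2 syntax)
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic WikipediaExample
    # on Kafka 2.2+ replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092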

We will illustrate both the standalone and MapReduce modes of operation.

Standalone

This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).

  • Suppose the Wikipedia data is written to:

    /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
  • Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.

  • Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
  • Create a job config with the following properties:

job.name=GobblinHdfsQuickStart
job.group=GobblinHdfs
job.description=Gobblin quick start job for Hdfs
job.lock.enabled=false
 
fs.uri=hdfs://localhost:9000
 
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
 
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
 
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
 
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
 
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
 
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp 
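  • Once the job has run in standalone mode, you can sanity-check both ends of the pipeline with the standard HDFS and Kafka tools. This is only a sketch: the console-consumer flags assume a reasonably recent Kafka, and because the values are Avro records encoded by LiAvroSerializer the consumer output will be mostly binary.

    # confirm the Wikipedia Avro files are where the job config expects them
    hdfs dfs -ls /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
    # confirm records reached the topic (payloads are LiAvro-encoded, so not human-readable)
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample --from-beginning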

 


MapReduce

This example runs Gobblin in MapReduce mode. It reads files from HDFS using the HadoopTextFileSource implementation in gobblin-example and writes the data to a single-partition Kafka topic called MRTest.
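The job config below reads from hdfs://localhost:9000/data/test, so stage some sample input there before running the job. A minimal sketch, assuming any local plain-text file (input.txt here is just a placeholder name):

    # create the input directory and upload a sample text file for the job to read
    hdfs dfs -mkdir -p /data/test
    hdfs dfs -put input.txt /data/test/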

  • Set up a single-node Kafka broker as in the standalone mode.
  • Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster.
  • Create a job config with the following properties:

    job.name=GobblinHdfsMRQuickStart
    job.group=GobblinHdfsMR
    job.description=Gobblin quick start job for Hdfs
    job.lock.enabled=false

    launcher.type=MAPREDUCE

    fs.uri=hdfs://localhost:9000
    source.class=org.apache.gobblin.example.hadoop.HadoopTextFileSource
    extract.namespace=org.apache.gobblin.example.hadoop
    extract.table.name=test
    extract.table.type=APPEND_ONLY

    writer.fs.uri=hdfs://localhost:9000
    state.store.fs.uri=hdfs://localhost:9000

    source.hadoop.file.input.format.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    source.hadoop.file.splits.desired=1
    source.hadoop.file.input.paths=hdfs://localhost:9000/data/test

    converter.classes=org.apache.gobblin.converter.string.ObjectToStringConverter

    writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
    writer.kafka.topic=MRTest
    writer.kafka.producerConfig.bootstrap.servers=localhost:9092
    writer.kafka.producerConfig.value.serializer=org.apache.kafka.common.serialization.StringSerializer

    data.publisher.type=org.apache.gobblin.publisher.NoopPublisher

    mr.job.max.mappers=1

    metrics.reporting.file.enabled=true
    metrics.log.dir=/tmp/suvasude/metrics
    metrics.reporting.file.suffix=txt

    mr.job.root.dir=/gobblin-kafka/working
    state.store.dir=/gobblin-kafka/state-store
    task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data

  • Run gobblin-mapreduce.sh:

    • bin/gobblin-mapreduce.sh --conf <path-to-job-config-file>
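  • Since the MRTest topic is written with Kafka's plain StringSerializer, you can check the output directly with the console consumer (a sketch; older Kafka releases use --zookeeper instead of --bootstrap-server):

    # each consumed record should be a readable line of text from the input files
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic MRTest --from-beginning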

 

 
