Standalone

This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).

  • Suppose the Wikipedia data is written to:

    /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
  • Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.

  • Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
  • Create a job config with the following properties (a launch sketch follows the config):

job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
 
fs.uri=hdfs://localhost:9000
 
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
 
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
 
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
 
## Confluent Schema Registry and serializers (uncomment to use instead of the local registry below)
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
 
# Use local schema registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
 
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp 
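
To launch the standalone job, a minimal sketch is below. It assumes the bin/gobblin-standalone.sh launcher and the GOBBLIN_JOB_CONFIG_DIR job-config directory of the Gobblin distribution; script and variable names vary across releases, so check your distribution's bin directory. The file name hdfs-to-kafka.pull is hypothetical.

# Verify the Avro input from the Wikipedia example is where the job config expects it
hdfs dfs -ls /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput

# Place the job config above (saved as the hypothetical hdfs-to-kafka.pull) in the
# directory the standalone daemon watches for job configs
cp hdfs-to-kafka.pull $GOBBLIN_JOB_CONFIG_DIR/

# Start the standalone daemon, which picks up and runs the job
bin/gobblin-standalone.sh start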

 

Finally, upon successful execution, you should see lines like the following in the Gobblin logs:

INFO  [Task-committing-pool-0] org.apache.gobblin.runtime.fork.Fork  345 - Committing data for fork 0 of task <task_id>
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  441 - Commit called, will wait for commitTimeout : 60000 ms
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  48 - All components finished successfully, checking quality tests
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  50 - All required test passed for this task passed.
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  52 - Cleanup for task publisher executed successfully.
INFO  [Task-committing-pool-1] org.apache.gobblin.runtime.fork.Fork  345 - Committing data for fork 0 of task <task_id>
INFO  [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager  441 - Commit called, will wait for commitTimeout : 60000 ms
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  482 - Successfully committed 2 records.
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  424 - Close called
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  430 - Successfully done closing
INFO  [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager  482 - Successfully committed 10 records.
  • You can also verify that the output was written to the Kafka topic WikipediaExample by consuming from the topic (see below).
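
A quick way to do that check, assuming the Kafka quick-start layout (on brokers older than 0.9, the console consumer takes --zookeeper localhost:2181 instead of --bootstrap-server):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample --from-beginning

Note that the records were written with LiAvroSerializer, so the console output is binary Avro rather than readable text; seeing records arrive is still enough to confirm the ingestion worked.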

 

MapReduce

Next, we run the same example in MapReduce mode, writing the data to a new Kafka topic, WikipediaExample1. The configuration changes are summarized below.
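
Only a handful of properties differ from the standalone config; the summary below is derived by comparing the two configs on this page. launcher.type switches to the MapReduce launcher, extract.tablename names the table explicitly, the writer targets the new topic, and mr.job.root.dir gives the MR job an HDFS working directory.

    launcher.type=MAPREDUCE
    extract.tablename=Wikipedia
    writer.kafka.topic=WikipediaExample1
    mr.job.root.dir=/gobblin-kafka/working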

  • Set up a single-node Kafka broker as in the standalone mode.
  • Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster (MapReduce mode submits the Gobblin job to YARN).
  • Create a job config with the following properties:

    job.name=GobblinHdfsToKafkaQuickStart
    job.group=GobblinHdfsToKafka
    job.description=Gobblin quick start job for Hdfs to Kafka ingestion
    job.lock.enabled=false
     
    launcher.type=MAPREDUCE
    fs.uri=hdfs://localhost:9000
     
    source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
    extract.namespace=org.apache.gobblin.source.extractor.hadoop
    extract.tablename=Wikipedia
    extract.table.type=SNAPSHOT_ONLY
     
    source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
    source.filebased.fs.uri=hdfs://localhost:9000
     
    writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
    writer.kafka.topic=WikipediaExample1
    writer.kafka.producerConfig.bootstrap.servers=localhost:9092
     
    ##Confluent Schema Registry and serializers
    #writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
     
    #Use Local Schema Registry and serializers
    writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
    writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
    writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
    writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
     
    data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
     
    metrics.reporting.file.enabled=true
    metrics.log.dir=/tmp/suvasude/metrics
    metrics.reporting.file.suffix=txt
     

    mr.job.root.dir=/gobblin-kafka/working
    task.data.root.dir=/tmp
  • Run gobblin-mapreduce.sh:

    • bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp

      Replace <GOBBLIN-VERSION> in the above command with the actual version set during the distribution build; listing the distribution's lib directory (for example, ls lib/ | grep gobblin-kafka) shows the exact jar names.

  • You should see the following in the gobblin logs:
    • INFO  [main] org.apache.hadoop.yarn.client.RMProxy  92 - Connecting to ResourceManager at /0.0.0.0:8032
      INFO  [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat  92 - Found 2 input files at <HDFS_path> 
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  396 - number of splits:2
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  479 - Submitting tokens for job: <job_id>
      INFO  [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl  166 - Submitted application <application_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1289 - The url to track the job: <URL>
      INFO  [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher  249 - Waiting for Hadoop MR job <job_id> to complete
      INFO  [main] org.apache.hadoop.mapreduce.Job  1334 - Running job: <job_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1355 - Job <job_id> running in uber mode : false
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 0% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 50% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 100% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1373 - Job <job_id> completed successfully
  • As before, the data can be verified by consuming from the output Kafka topic WikipediaExample1, as shown below.
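
The same console-consumer check as in the standalone section applies, pointed at the new topic (use --zookeeper localhost:2181 instead of --bootstrap-server on brokers older than 0.9):

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample1 --from-beginning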