...

This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).

  • Suppose the Wikipedia data is written to:

    /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
  • Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.

  • Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
  • Create a job config with the following properties:

job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
 
fs.uri=hdfs://localhost:9000
 
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
 
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
 
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
 
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
 
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
 
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp

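Before launching the job, it is worth sanity-checking that the input directory referenced in the config above exists on HDFS and contains the Avro files produced by the Wikipedia example. A minimal check, using the path assumed earlier on this page:

hdfs dfs -ls hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
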
Finally, upon successful execution, you should see the following lines in the Gobblin logs:

INFO  [Task-committing-pool-0] org.apache.gobblin.runtime.fork.Fork  345 - Committing data for fork 0 of task <task_id>
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  441 - Commit called, will wait for commitTimeout : 60000 ms
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  48 - All components finished successfully, checking quality tests
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  50 - All required test passed for this task passed.
INFO  [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher  52 - Cleanup for task publisher executed successfully.
INFO  [Task-committing-pool-1] org.apache.gobblin.runtime.fork.Fork  345 - Committing data for fork 0 of task <task_id>
INFO  [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager  441 - Commit called, will wait for commitTimeout : 60000 ms
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  482 - Successfully committed 2 records.
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  424 - Close called
INFO  [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager  430 - Successfully done closing
INFO  [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager  482 - Successfully committed 10 records.
  • You can also verify the output by consuming from the Kafka topic WikipediaExample, for example with the console consumer shown below.
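The following assumes a reasonably recent Kafka release (older releases take --zookeeper localhost:2181 in place of --bootstrap-server):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample --from-beginning

Because the records are written with LiAvroSerializer, the console output will look like raw Avro bytes rather than readable JSON; seeing the expected number of messages arrive is the main check here.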

 

MapReduce

We next run the same example as above in MapReduce mode, writing the data to a new Kafka topic, WikipediaExample1.

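One note before the setup steps below: since this run writes to a new topic, WikipediaExample1, the topic must exist before the job publishes to it, unless the broker is configured to auto-create topics (the Apache Kafka default). If you prefer to create it explicitly, something like the following works on recent Kafka releases (older releases take --zookeeper localhost:2181 instead of --bootstrap-server):

bin/kafka-topics.sh --create --topic WikipediaExample1 --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
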
  • Set up a single-node Kafka broker as in the standalone mode.
  • Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster.
  • Create a job config with the following properties:

    job.name=GobblinHdfsMRQuickStart
    job.group=GobblinHdfsMR
    job.description=Gobblin quick start job for Hdfs to Kafka ingestion
    job.lock.enabled=false
     
    launcher.type=MAPREDUCE
     
    fs.uri=hdfs://localhost:9000
     
    source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
    extract.namespace=org.apache.gobblin.source.extractor.hadoop
    extract.table.name=Wikipedia
    extract.table.type=SNAPSHOT_ONLY
     
    source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
    source.filebased.fs.uri=hdfs://localhost:9000
     
    writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
    writer.kafka.topic=WikipediaExample1
    writer.kafka.producerConfig.bootstrap.servers=localhost:9092
     
    ##Confluent Schema Registry and serializers
    #writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
     
    #Use Local Schema Registry and serializers
    writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
    writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
    writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
    writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
     
    data.publisher.type=org.apache.gobblin.publisher.NoopPublisher

     

    mr.job.max.mappers=1
     
    metrics.reporting.file.enabled=true
    metrics.log.dir=/tmp/suvasude/metrics
    metrics.reporting.file.suffix=txt
     
    mr.job.root.dir=/gobblin-kafka/working
    task.data.root.dir=/tmp
  • Run gobblin-mapreduce.sh:

    • bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp
      Replace <GOBBLIN-VERSION> in the above command with the actual version set during the distribution build.

  • You should see the following in the gobblin logs:
    • INFO  [main] org.apache.hadoop.yarn.client.RMProxy  92 - Connecting to ResourceManager at /0.0.0.0:8032
      INFO  [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat  92 - Found 2 input files at <HDFS_path> 
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  396 - number of splits:2
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  479 - Submitting tokens for job: <job_id>
      INFO  [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl  166 - Submitted application <application_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1289 - The url to track the job: <URL>
      INFO  [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher  249 - Waiting for Hadoop MR job <job_id> to complete
      INFO  [main] org.apache.hadoop.mapreduce.Job  1334 - Running job: <job_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1355 - Job <job_id> running in uber mode : false
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 0% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 50% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 100% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1373 - Job <job_id> completed successfully
  • As before, the data can be verified by consuming from the output Kafka topic WikipediaExample1; see the commands below.

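The same console-consumer check shown earlier works here, pointed at the new topic. If you only want a record count, Kafka's GetOffsetShell prints the end offset of each partition (the kafka.tools class and --broker-list flag shown here are from older Kafka releases; newer ones ship org.apache.kafka.tools.GetOffsetShell with a --bootstrap-server flag):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample1 --from-beginning
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic WikipediaExample1 --time -1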