...

  • Suppose the Wikipedia data is written to:

    /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
  • Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.

  • Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here. (A sketch of the commands for staging the Wikipedia data in HDFS follows the job config below.)
  • Create a job config with the following properties:

job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfs
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
 
fs.uri=hdfs://localhost:9000
 
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
 
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
 
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
 
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
 
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
 
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp 
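Before running the job, the Wikipedia Avro output from the first step needs to be visible in HDFS at the configured path, and the target topic should exist unless your broker auto-creates topics. The following is a minimal sketch, assuming the Avro files sit in a local directory named WikipediaOutput (a hypothetical path; adjust to wherever the Wikipedia example wrote its output) and a reasonably recent Kafka CLI; on older Kafka releases kafka-topics.sh takes --zookeeper localhost:2181 instead of --bootstrap-server:

    # Stage the Avro output in HDFS at the path referenced by source.filebased.data.directory
    hdfs dfs -mkdir -p /data/wikipedia/org/apache/gobblin/example/wikipedia
    hdfs dfs -put WikipediaOutput /data/wikipedia/org/apache/gobblin/example/wikipedia/

    # Create the target topic (skip if topic auto-creation is enabled on the broker)
    bin/kafka-topics.sh --create --topic WikipediaExample --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092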

...

  • You can also verify that the output was written to the Kafka topic WikipediaExample by consuming from the topic.
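A simple way to do this is the console consumer that ships with Kafka. Because the records are LiAvro-serialized, the payload prints as raw bytes, but seeing records arrive confirms the write path end to end. A sketch, assuming a reasonably recent Kafka CLI (older releases use --zookeeper localhost:2181 instead of --bootstrap-server):

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample --from-beginning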

 

MapReduce

We next run the same example as above in MapReduce mode, writing the data to a new Kafka topic called WikipediaExample1.
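As in the standalone case, the new topic should exist before the job writes to it unless the broker auto-creates topics. A sketch, assuming a reasonably recent Kafka CLI (older releases use --zookeeper localhost:2181 instead of --bootstrap-server):

    bin/kafka-topics.sh --create --topic WikipediaExample1 --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092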

  • Set up a single-node Kafka broker as in the standalone mode.
  • Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster.
  • Create a job config with the following properties:

    job.name=GobblinHdfsToKafkaQuickStart
    job.group=GobblinHdfs
    job.description=Gobblin quick start job for Hdfs to Kafka ingestion
    job.lock.enabled=false

    launcher.type=MAPREDUCE

    fs.uri=hdfs://localhost:9000

    source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
    extract.namespace=org.apache.gobblin.source.extractor.hadoop
    extract.table.name=Wikipedia
    extract.table.type=SNAPSHOT_ONLY

    source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
    source.filebased.fs.uri=hdfs://localhost:9000

    writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
    writer.kafka.topic=WikipediaExample1
    writer.kafka.producerConfig.bootstrap.servers=localhost:9092

    ##Confluent Schema Registry and serializers
    #writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.schema.registry.url=http://localhost:8081

    #Use Local Schema Registry and serializers
    writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
    writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
    writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
    writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}

    data.publisher.type=org.apache.gobblin.publisher.NoopPublisher

    mr.job.max.mappers=1

    metrics.reporting.file.enabled=true
    metrics.log.dir=/tmp/suvasude/metrics
    metrics.reporting.file.suffix=txt

    mr.job.root.dir=/gobblin-kafka/working
    task.data.root.dir=/tmp
  • Run gobblin-mapreduce.sh, replacing <GOBBLIN-VERSION> with the actual version set during the distribution build:

    • bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp

  • You should see the following in the Gobblin logs:
    • INFO  [main] org.apache.hadoop.yarn.client.RMProxy  92 - Connecting to ResourceManager at /0.0.0.0:8032
      INFO  [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat  92 - Found 2 input files at <HDFS_path> 
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  396 - number of splits:2
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  479 - Submitting tokens for job: <job_id>
      INFO  [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl  166 - Submitted application <application_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1289 - The url to track the job: <URL>
      INFO  [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher  249 - Waiting for Hadoop MR job <job_id> to complete
      INFO  [main] org.apache.hadoop.mapreduce.Job  1334 - Running job: <job_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1355 - Job <job_id> running in uber mode : false
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 0% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 50% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 100% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1373 - Job <job_id> completed successfully
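As in the standalone case, you can consume from WikipediaExample1 to inspect the records. Alternatively, a rough per-partition record count can be read off the latest offsets; a sketch using the GetOffsetShell tool bundled with Kafka (the class name and flags vary slightly across Kafka versions):

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic WikipediaExample1 --time -1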

       

       
