...

job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
 
fs.uri=hdfs://localhost:9000
 
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
 
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
 
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
 
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
 
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
 
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp 
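
The job expects the Wikipedia Avro files produced by the earlier example to already sit at the configured input path on HDFS. A quick sanity check, assuming the single-node HDFS setup above:

    # List the configured input directory on HDFS
    hdfs dfs -ls hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput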

...

  • Set up a single-node Kafka broker as in the standalone mode.
  • Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster.
  • Create a job config with the following properties (a note on creating the output topic follows the config):

    job.name=GobblinHdfsToKafkaQuickStart
    job.group=GobblinHdfsToKafka
    job.description=Gobblin quick start job for Hdfs to Kafka ingestion
    job.lock.enabled=false
     
    launcher.type=MAPREDUCE
    fs.uri=hdfs://localhost:9000
     
    source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
    extract.namespace=org.apache.gobblin.source.extractor.hadoop
    extract.tablename=Wikipedia
    extract.table.type=SNAPSHOT_ONLY
     
    source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
    source.filebased.fs.uri=hdfs://localhost:9000
     
    writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
    writer.kafka.topic=WikipediaExample1
    writer.kafka.producerConfig.bootstrap.servers=localhost:9092
     
    ##Confluent Schema Registry and serializers
    #writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    #writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
     
    #Use Local Schema Registry and serializers
    writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
    writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
    writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
    writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
     
    data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
     
    metrics.reporting.file.enabled=true
    metrics.log.dir=/tmp/suvasude/metrics
    metrics.reporting.file.suffix=txt
     
    mr.job.root.dir=/gobblin-kafka/working
    task.data.root.dir=/tmp
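
Depending on the broker configuration, the output topic WikipediaExample1 may be auto-created on first write; to control partitions and replication explicitly, it can also be created up front. A sketch for the single-node, 0.8/0.9-era broker implied by the gobblin-kafka-08 jars used below:

    # Create the output topic on the local broker (0.8/0.9-era CLI;
    # newer Kafka versions take --bootstrap-server localhost:9092 instead of --zookeeper)
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic WikipediaExample1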
  • Run gobblin-mapreduce.sh:

    • bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp

      Replace <GOBBLIN-VERSION> in the above command with the actual version set during the distribution build; one way to read it off the distribution is sketched below.
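
      If the exact jar names are unclear, the version baked into the distribution can be read straight off the lib directory; a quick check, assuming the distribution layout used in the command above:

        # Show the Kafka-related jars (and their version suffix) shipped in lib/
        ls lib | grep -E 'gobblin-kafka|reactive-streams'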

  • You should see the following in the Gobblin logs:
    • INFO  [main] org.apache.hadoop.yarn.client.RMProxy  92 - Connecting to ResourceManager at /0.0.0.0:8032
      INFO  [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat  92 - Found 2 input files at <HDFS_path> 
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  396 - number of splits:2
      INFO  [main] org.apache.hadoop.mapreduce.JobSubmitter  479 - Submitting tokens for job: <job_id>
      INFO  [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl  166 - Submitted application <application_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1289 - The url to track the job: <URL>
      INFO  [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher  249 - Waiting for Hadoop MR job <job_id> to complete
      INFO  [main] org.apache.hadoop.mapreduce.Job  1334 - Running job: <job_id>
      INFO  [main] org.apache.hadoop.mapreduce.Job  1355 - Job <job_id> running in uber mode : false
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 0% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 50% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1362 -  map 100% reduce 0%
      INFO  [main] org.apache.hadoop.mapreduce.Job  1373 - Job <job_id> completed successfully
  • As before, the data can be verified by consuming from the output Kafka topic WikipediaExample1, for example with the console-consumer sketch below.
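
A minimal consumer check, assuming the single-node broker above (the records were written with LiAvroSerializer, so the console consumer prints raw Avro bytes rather than readable JSON):

    # Consume everything written to the output topic (0.8/0.9-era CLI;
    # newer Kafka versions use --bootstrap-server localhost:9092 instead of --zookeeper)
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic WikipediaExample1 --from-beginning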