...
This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).
Suppose the Wikipedia data is written to:
/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
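The Wikipedia example normally writes to the local file system, so the "minor modifications" amount to pointing its job config at HDFS. A minimal sketch of one way to do this, assuming the standard Gobblin file-system properties (`fs.uri`, `writer.fs.uri`, `state.store.fs.uri`); the exact set your config needs may differ:

```
# Illustrative additions to the Wikipedia example's job config so that
# output lands on HDFS instead of the local file system.
fs.uri=hdfs://localhost:9000
writer.fs.uri=hdfs://localhost:9000
state.store.fs.uri=hdfs://localhost:9000
```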
- Set up a single-node Kafka broker by following the Kafka quick start guide; a sketch of the startup commands is shown below. Suppose your broker URI is `localhost:9092`.
- Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
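Since this job only needs a bare broker, the quick start boils down to starting ZooKeeper and a single broker. A minimal sketch, assuming a Kafka distribution unpacked locally with its default config files (the exact scripts differ on newer, KRaft-based Kafka releases):

```
# Run from the Kafka installation directory.
# 1. Start ZooKeeper (not required on KRaft-based Kafka versions).
bin/zookeeper-server-start.sh config/zookeeper.properties

# 2. In a second terminal, start the broker; by default it listens on localhost:9092.
bin/kafka-server-start.sh config/server.properties
```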
- Create a job config with the following properties:
```
job.name=GobblinHdfsQuickStart
job.group=GobblinHdfs
job.description=Gobblin quick start job for Hdfs
job.lock.enabled=false

fs.uri=hdfs://localhost:9000

source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000

writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092

## Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081

# Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}

data.publisher.type=org.apache.gobblin.publisher.NoopPublisher

metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt

task.data.root.dir=/tmp
```
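Before launching the job, it can help to confirm that the input directory referenced by `source.filebased.data.directory` actually exists on HDFS. A minimal check, assuming the Hadoop CLI is on your PATH and the paths used above:

```
# List the Avro files the AvroFileSource will pull; adjust the path if your
# Wikipedia example wrote to a different location.
hdfs dfs -ls -R hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
```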
- Run gobblin-standalone.sh:
- `bin/gobblin-standalone.sh start --conf <path-to-job-config-file>`
...
- `--workdir /tmp`
You should see the following lines in the Gobblin logs:

```
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.hadoop.AvroFileSource 58 - Running ls command with input /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 257 - Will pull the following files in this run: [hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/LinkedIn/part.task_PullFromWikipedia_1512681699997_0_0.avro, hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/Wikipedia_Sandbox/part.task_PullFromWikipedia_1512681699997_1_0.avro]
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 195 - Total number of work units for the current run: 2
```
- Finally, the output can be verified by consuming data from the Kafka topic `WikipediaExample`, as shown below.
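A minimal way to do this is with the console consumer shipped with Kafka. A sketch, assuming a broker at `localhost:9092` (flag names vary slightly across Kafka versions, and because the records are Avro encoded with LiAvroSerializer the payloads will appear as raw bytes rather than readable JSON):

```
# Read everything written to the WikipediaExample topic from the beginning.
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic WikipediaExample \
  --from-beginning
```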
MapReduce
This example runs Gobblin in MapReduce mode. It reads files from HDFS using the HadoopTextFileSource implementation in gobblin-example and writes data to a single-partition Kafka topic called `MRTest`.
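Because the example expects a single-partition topic, it can help to create `MRTest` explicitly before running the job. A sketch using the topic tool bundled with Kafka (older brokers take `--zookeeper localhost:2181` instead of `--bootstrap-server`):

```
# Create the single-partition MRTest topic on the local broker.
bin/kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic MRTest
```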
...