...
- This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).
- Suppose the Wikipedia data is written to:
/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
- Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.
- Set up a single-node Hadoop cluster in pseudo-distributed mode by following the Hadoop single-node cluster setup guide.
- Create a job config with the following properties:
job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
fs.uri=hdfs://localhost:9000
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [\
{"name": "pageid", "type": ["double", "null"]},\
{"name": "title", "type": ["string", "null"]},\
{"name": "user", "type": ["string", "null"]},\
{"name": "anon", "type": ["string", "null"]},\
{"name": "userid", "type": ["double", "null"]},\
{"name": "timestamp", "type": ["string", "null"]},\
{"name": "size", "type": ["double", "null"]},\
{"name": "contentformat", "type": ["string", "null"]},\
{"name": "contentmodel", "type": ["string", "null"]},\
{"name": "content", "type": ["string", "null"]}]}
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp
- Run gobblin-standalone.sh:
bin/gobblin-standalone.sh start --conf <path-to-job-config-file> --workdir /tmp
...
You should see the following lines in the Gobblin logs:
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.hadoop.AvroFileSource 58 - Running ls command with input /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 257 - Will pull the following files in this run: [hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/LinkedIn/part.task_PullFromWikipedia_1512681699997_0_0.avro, hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/Wikipedia_Sandbox/part.task_PullFromWikipedia_1512681699997_1_0.avro]
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 195 - Total number of work units for the current run: 2
Finally, upon successful execution, you should see the following lines in the Gobblin logs:
INFO [Task-committing-pool-0] org.apache.gobblin.runtime.fork.Fork 345 - Committing data for fork 0 of task <task_id>
INFO [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager 441 - Commit called, will wait for commitTimeout : 60000 ms
INFO [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher 48 - All components finished successfully, checking quality tests
INFO [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher 50 - All required test passed for this task passed.
INFO [Task-committing-pool-1] org.apache.gobblin.publisher.TaskPublisher 52 - Cleanup for task publisher executed successfully.
INFO [Task-committing-pool-1] org.apache.gobblin.runtime.fork.Fork 345 - Committing data for fork 0 of task <task_id>
INFO [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager 441 - Commit called, will wait for commitTimeout : 60000 ms
INFO [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager 482 - Successfully committed 2 records.
INFO [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager 424 - Close called
INFO [Task-committing-pool-0] org.apache.gobblin.writer.AsyncWriterManager 430 - Successfully done closing
INFO [Task-committing-pool-1] org.apache.gobblin.writer.AsyncWriterManager 482 - Successfully committed 10 records.
- You can also verify that the output was written to the Kafka topic WikipediaExample by consuming from the topic.
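Because the job uses LiAvroSerializer, each message value on the topic is binary Avro rather than text. The sketch below splits a raw message value into its header and Avro payload. It assumes the LinkedIn-style wire format that, as far as we understand, LiAvroSerializer writes:

- one magic byte 0x0,
- a 16-byte MD5 schema ID (as the name ConfigDrivenMd5SchemaRegistry suggests),
- then the Avro binary-encoded record.

The names split_li_avro_message and LiAvroFrame are hypothetical. Check the layout against the Gobblin version you run.

```python
from typing import NamedTuple

# Assumed LiAvroSerializer wire format (verify against your Gobblin version):
#   byte 0        magic byte 0x0
#   bytes 1..16   16-byte MD5 schema ID
#   bytes 17..    Avro binary-encoded record
MAGIC_BYTE = 0x0
SCHEMA_ID_LENGTH = 16


class LiAvroFrame(NamedTuple):
    """Hypothetical container for a decoded message header and payload."""
    schema_id_hex: str
    payload: bytes


def split_li_avro_message(value: bytes) -> LiAvroFrame:
    """Split a raw Kafka message value into its schema ID and Avro payload."""
    header_length = 1 + SCHEMA_ID_LENGTH
    if len(value) < header_length:
        raise ValueError(
            f"message has {len(value)} bytes, need at least {header_length}")
    if value[0] != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {value[0]:#04x}")
    schema_id = value[1:header_length]
    return LiAvroFrame(schema_id_hex=schema_id.hex(),
                       payload=value[header_length:])
```

You could feed this the raw bytes of each consumed message value. To turn the payload back into a record, you would then need an Avro library (for example fastavro's schemaless_reader) and the writer schema from the job config.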
MapReduce
Next, we run the same example in MapReduce mode, this time writing the data to a new Kafka topic, WikipediaExample1.
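Compared with the standalone job, the MapReduce job mainly needs three changes:

- launcher.type=MAPREDUCE,
- a working directory for MapReduce job files (mr.job.root.dir),
- the new topic.

As a rough sketch, these overrides can be expressed as a dictionary update on the standalone properties. The helper name to_mapreduce_config is hypothetical, and the job config further below remains the authoritative list.

```python
def to_mapreduce_config(standalone: dict[str, str],
                        topic: str = "WikipediaExample1",
                        mr_root_dir: str = "/gobblin-kafka/working") -> dict[str, str]:
    """Return a copy of a standalone job config adjusted for MapReduce mode.

    Hypothetical helper: it only applies the overrides discussed above and
    leaves every other property (source, serializer, schema registry) as is.
    """
    mr_config = dict(standalone)  # copy, so the caller's dict is not mutated
    mr_config.update({
        "launcher.type": "MAPREDUCE",    # run tasks through Hadoop MapReduce
        "mr.job.root.dir": mr_root_dir,  # working directory for MR job files
        "writer.kafka.topic": topic,     # write to a fresh topic
    })
    return mr_config
```

The design point is that the source, serializer and schema-registry settings carry over unchanged. Only the launcher and its working directory differ, plus the output topic.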
...
- Set up a single-node Kafka broker as in standalone mode.
- Set up a single-node Hadoop cluster in pseudo-distributed mode as explained in the Hadoop single-node cluster setup guide, and follow its instructions to set up YARN as well.
- Create a job config with the following properties:
job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
launcher.type=MAPREDUCE
fs.uri=hdfs://localhost:9000
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.name=Wikipedia
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample1
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [\
{"name": "pageid", "type": ["double", "null"]},\
{"name": "title", "type": ["string", "null"]},\
{"name": "user", "type": ["string", "null"]},\
{"name": "anon", "type": ["string", "null"]},\
{"name": "userid", "type": ["double", "null"]},\
{"name": "timestamp", "type": ["string", "null"]},\
{"name": "size", "type": ["double", "null"]},\
{"name": "contentformat", "type": ["string", "null"]},\
{"name": "contentmodel", "type": ["string", "null"]},\
{"name": "content", "type": ["string", "null"]}]}
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
mr.job.root.dir=/gobblin-kafka/working
task.data.root.dir=/tmp
- Run gobblin-mapreduce.sh:
bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp
Replace <GOBBLIN-VERSION> in the above command with the actual version set during the distribution build, and ~/gobblin/conf/ex4.pull with the path to your job config file.
- You should see the following in the Gobblin logs:
INFO [main] org.apache.hadoop.yarn.client.RMProxy 92 - Connecting to ResourceManager at /0.0.0.0:8032
INFO [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat 92 - Found 2 input files at <HDFS_path>
INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 396 - number of splits:2
INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 479 - Submitting tokens for job: <job_id>
INFO [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl 166 - Submitted application <application_id>
INFO [main] org.apache.hadoop.mapreduce.Job 1289 - The url to track the job: <URL>
INFO [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher 249 - Waiting for Hadoop MR job <job_id> to complete
INFO [main] org.apache.hadoop.mapreduce.Job 1334 - Running job: <job_id>
INFO [main] org.apache.hadoop.mapreduce.Job 1355 - Job <job_id> running in uber mode : false
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 0% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 50% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 100% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1373 - Job <job_id> completed successfully
As before, you can verify the data by consuming from the output Kafka topic, WikipediaExample1.
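In the MapReduce job config, the long schema.value property is split across several lines that end in a backslash. Under the standard .properties rules, as implemented by java.util.Properties (and assuming Gobblin's job-config loader follows them), a trailing backslash does two things:

- it joins the next line onto the current one,
- it drops that line's leading whitespace.

As a result, a break can fall even in the middle of a word, as in "ty\" followed by "pe".

The minimal sketch below checks that the joined schema parses as JSON, which catches a broken continuation before you submit a job. It applies only that joining rule and skips the other .properties escapes. The helper names load_properties and schema_field_names are hypothetical.

```python
import json

SCHEMA_KEY = "writer.kafka.producerConfig.schemaRegistry.schema.value"


def _store(props: dict[str, str], logical: str) -> None:
    """Split one logical line on its first '=' and record the pair."""
    key, sep, value = logical.partition("=")
    if sep:  # lines without '=' are ignored in this sketch
        props[key.strip()] = value.lstrip()


def load_properties(text: str) -> dict[str, str]:
    """Parse a simplified subset of the .properties format.

    Handles '#'/'!' comment lines, key=value pairs and backslash line
    continuations. Other escapes and the ':' separator are not supported.
    """
    props: dict[str, str] = {}
    logical = ""        # logical line being assembled
    continuing = False  # previous physical line ended with a continuation
    for raw in text.splitlines():
        line = raw.lstrip()  # leading whitespace is never significant
        if not continuing and (not line or line[0] in "#!"):
            continue  # blank line or comment
        # An odd number of trailing backslashes marks a continuation.
        trailing = len(line) - len(line.rstrip("\\"))
        if trailing % 2 == 1:
            logical += line[:-1]
            continuing = True
            continue
        logical += line
        continuing = False
        _store(props, logical)
        logical = ""
    if logical:  # input ended on a continuation line
        _store(props, logical)
    return props


def schema_field_names(props: dict[str, str]) -> list[str]:
    """Parse the configured Avro schema as JSON and list its field names."""
    schema = json.loads(props[SCHEMA_KEY])  # raises ValueError if malformed
    return [field["name"] for field in schema["fields"]]
```

On a correctly split schema value, schema_field_names should list the ten WikipediaArticle fields, from pageid to content. A JSONDecodeError instead points at a bad line break.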