...
...
- Set up a single-node Kafka broker as in the standalone mode.
- Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster. A startup sketch for both services follows this list.
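If Kafka and Hadoop are installed locally, a minimal startup sequence looks like the sketch below (the paths assume the standard Kafka and Hadoop distribution layouts; adjust them to your installation):

# From the Kafka installation directory: start ZooKeeper, then the broker
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# From the Hadoop installation directory: start HDFS, then YARN
sbin/start-dfs.sh
sbin/start-yarn.sh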
Create a job config with the following properties:
job.name=GobblinHdfsToKafkaQuickStart
job.group=GobblinHdfsToKafka
job.description=Gobblin quick start job for Hdfs to Kafka ingestion
job.lock.enabled=false
launcher.type=MAPREDUCE
fs.uri=hdfs://localhost:9000
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.tablename=Wikipedia
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample1
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
mr.job.root.dir=/gobblin-kafka/working
task.data.root.dir=/tmp
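Before launching the job, it can help to confirm that the Avro output of the Wikipedia example is present on HDFS and to pre-create the output topic. A sketch follows; the kafka-topics.sh flags shown are for a ZooKeeper-based 0.8-era broker, while newer Kafka CLIs take --bootstrap-server instead:

# Confirm the Avro input files exist at the configured source directory
hdfs dfs -ls /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput

# Pre-create the output topic named in writer.kafka.topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic WikipediaExample1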
Run gobblin-mapreduce.sh:
bin/gobblin-mapreduce.sh --jars lib/reactive-streams-1.0.0.jar,lib/gobblin-kafka-08-<GOBBLIN-VERSION>.jar,lib/gobblin-kafka-common-<GOBBLIN-VERSION>.jar --conf ~/gobblin/conf/ex4.pull --workdir /tmp

Replace <GOBBLIN-VERSION> in the above command with the actual version set during the distribution build.
- You should see the following in the Gobblin logs:
INFO [main] org.apache.hadoop.yarn.client.RMProxy 92 - Connecting to ResourceManager at /0.0.0.0:8032
INFO [main] org.apache.gobblin.runtime.mapreduce.GobblinWorkUnitsInputFormat 92 - Found 2 input files at <HDFS_path>
INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 396 - number of splits:2
INFO [main] org.apache.hadoop.mapreduce.JobSubmitter 479 - Submitting tokens for job: <job_id>
INFO [main] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl 166 - Submitted application <application_id>
INFO [main] org.apache.hadoop.mapreduce.Job 1289 - The url to track the job: <URL>
INFO [main] org.apache.gobblin.runtime.mapreduce.MRJobLauncher 249 - Waiting for Hadoop MR job <job_id> to complete
INFO [main] org.apache.hadoop.mapreduce.Job 1334 - Running job: <job_id>
INFO [main] org.apache.hadoop.mapreduce.Job 1355 - Job <job_id> running in uber mode : false
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 0% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 50% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1362 - map 100% reduce 0%
INFO [main] org.apache.hadoop.mapreduce.Job 1373 - Job <job_id> completed successfully
As before, the data can be verified by consuming from the output Kafka topic WikipediaExample1.
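One way to do this is with the console consumer that ships with Kafka (the --bootstrap-server flag applies to newer Kafka CLIs; 0.8-era distributions use --zookeeper localhost:2181 instead):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic WikipediaExample1 --from-beginning

Note that this job writes records with LiAvroSerializer, so the console consumer will print raw Avro bytes; a consumer configured with the matching deserializer is needed to view the records in readable form.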