This section helps you set up quick-start jobs for ingesting data from HDFS to a Kafka topic. Writing from HDFS to multiple Kafka topics is currently not supported, nor is partitioning by key when writing to Kafka. Topics with multiple partitions are allowed; in that case, the data is distributed across the partitions in a round-robin manner.
We will illustrate both the standalone and MapReduce modes of operation.
Standalone
This example assumes Wikipedia data has been written to HDFS by following the instructions in the Wikipedia example (with some minor modifications to write to HDFS).
Suppose the Wikipedia data is written to:
/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
- Set up a single-node Kafka broker by following the Kafka quick start guide. Suppose your broker URI is localhost:9092.
- Set up a single-node Hadoop cluster in pseudo-distributed mode by following the instructions here.
- Create a job config with the following properties:
job.name=GobblinHdfsQuickStart
job.group=GobblinHdfs
job.description=Gobblin quick start job for Hdfs
job.lock.enabled=false
fs.uri=hdfs://localhost:9000
source.class=org.apache.gobblin.source.extractor.hadoop.AvroFileSource
extract.namespace=org.apache.gobblin.source.extractor.hadoop
extract.table.type=SNAPSHOT_ONLY
source.filebased.data.directory=/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
source.filebased.fs.uri=hdfs://localhost:9000
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=WikipediaExample
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
##Confluent Schema Registry and serializers
#writer.kafka.producerConfig.value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.key.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
#writer.kafka.producerConfig.schema.registry.url=http://localhost:8081
#Use Local Schema Registry and serializers
writer.kafka.producerConfig.value.serializer=org.apache.gobblin.kafka.serialize.LiAvroSerializer
writer.kafka.producerConfig.kafka.schemaRegistry.class=org.apache.gobblin.kafka.schemareg.ConfigDrivenMd5SchemaRegistry
writer.kafka.producerConfig.schemaRegistry.schema.name=WikipediaExample
writer.kafka.producerConfig.schemaRegistry.schema.value={"namespace": "example.wikipedia.avro","type": "record","name": "WikipediaArticle","fields": [{"name": "pageid", "type": ["double", "null"]},{"name": "title", "type": ["string", "null"]},{"name": "user", "type": ["string", "null"]},{"name": "anon", "type": ["string", "null"]},{"name": "userid", "type": ["double", "null"]},{"name": "timestamp", "type": ["string", "null"]},{"name": "size", "type": ["double", "null"]},{"name": "contentformat", "type": ["string", "null"]},{"name": "contentmodel", "type": ["string", "null"]},{"name": "content", "type": ["string", "null"]}]}
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
task.data.root.dir=/tmp
- Run gobblin-standalone.sh:
- bin/gobblin-standalone.sh start --conf <path-to-job-config-file> --workdir /tmp
You should see lines like the following in the Gobblin logs:
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.hadoop.AvroFileSource 58 - Running ls command with input /data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 257 - Will pull the following files in this run: [hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/LinkedIn/part.task_PullFromWikipedia_1512681699997_0_0.avro, hdfs://localhost:9000/data/wikipedia/org/apache/gobblin/example/wikipedia/WikipediaOutput/20171207212140_append/Wikipedia_Sandbox/part.task_PullFromWikipedia_1512681699997_1_0.avro]
INFO [JobScheduler-2] org.apache.gobblin.source.extractor.filebased.FileBasedSource 195 - Total number of work units for the current run: 2
- Finally, the output can be verified by consuming data from the Kafka topic WikipediaExample.
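As a quick check, a minimal verification sketch is shown below; it assumes a local Kafka installation under $KAFKA_HOME (an illustrative path) and uses the standard console consumer. Because this job writes values with LiAvroSerializer, the payloads are Avro-encoded bytes, so expect binary-looking output unless you switch to an Avro-aware consumer (for the commented-out Confluent registry configuration, Confluent's kafka-avro-console-consumer can decode the records).

    # Read a few records back from the WikipediaExample topic (values are Avro-encoded bytes)
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic WikipediaExample --from-beginning --max-messages 5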
MapReduce
This example runs Gobblin in MapReduce mode. It reads files from HDFS using the HadoopTextFileSource implementation in gobblin-example and writes the data to a single-partition Kafka topic called MRTest.
- Set up a single-node Kafka broker as in the standalone mode.
- Set up a single-node Hadoop cluster in pseudo-distributed mode as explained here, and follow the instructions to set up a YARN cluster.
- Create a job config with the following properties:
job.name=GobblinHdfsMRQuickStart
job.group=GobblinHdfsMR
job.description=Gobblin quick start job for Hdfs
job.lock.enabled=false
launcher.type=MAPREDUCE
fs.uri=hdfs://localhost:9000
source.class=org.apache.gobblin.example.hadoop.HadoopTextFileSource
extract.namespace=org.apache.gobblin.example.hadoop
extract.table.name=test
extract.table.type=APPEND_ONLY
writer.fs.uri=hdfs://localhost:9000
state.store.fs.uri=hdfs://localhost:9000
source.hadoop.file.input.format.class=org.apache.hadoop.mapreduce.lib.input.TextInputFormat
source.hadoop.file.splits.desired=1
source.hadoop.file.input.paths=hdfs://localhost:9000/data/test
converter.classes=org.apache.gobblin.converter.string.ObjectToStringConverter
writer.builder.class=org.apache.gobblin.kafka.writer.KafkaDataWriterBuilder
writer.kafka.topic=MRTest
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
writer.kafka.producerConfig.value.serializer=org.apache.kafka.common.serialization.StringSerializer
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
mr.job.max.mappers=1
metrics.reporting.file.enabled=true
metrics.log.dir=/tmp/suvasude/metrics
metrics.reporting.file.suffix=txt
mr.job.root.dir=/gobblin-kafka/working
state.store.dir=/gobblin-kafka/state-store
task.data.root.dir=/jobs/kafkaetl/gobblin/gobblin-kafka/task-data
- Run gobblin-mapreduce.sh:
- bin/gobblin-mapreduce.sh --conf <path-to-job-config-file>
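As an end-to-end sketch (the sample file name and the $KAFKA_HOME location are illustrative assumptions), you can stage a small text file under the configured input path before launching the job and then read the records back from the MRTest topic. Because this job uses the plain StringSerializer, the console consumer output is human-readable:

    # Stage a sample input file under source.hadoop.file.input.paths
    echo "hello gobblin" > /tmp/sample.txt
    hdfs dfs -mkdir -p /data/test
    hdfs dfs -put /tmp/sample.txt /data/test/

    # After the MapReduce job completes, read the records back from the MRTest topic
    $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic MRTest --from-beginning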