Why use SimpleConsumer?
The main reason to use a SimpleConsumer implementation is that you want greater control over partition consumption than Consumer Groups give you.
For example, you may want to:
- Read a message multiple times
- Consume only a subset of the partitions in a topic in a process
- Manage transactions to make sure a message is processed once and only once
Downsides of using SimpleConsumer
The SimpleConsumer requires a significant amount of work that Consumer Groups handle for you:
- You must keep track of the offsets in your application to know where you left off consuming.
- You must figure out which Broker is the lead Broker for a topic and partition.
- You must handle Broker leader changes.
Steps for using a SimpleConsumer
- Find an active Broker and find out which Broker is the leader for your topic and partition
- Determine who the replica Brokers are for your topic and partition
- Build the request defining what data you are interested in
- Fetch the data
- Identify and recover from leader changes
Finding the Lead Broker for a Topic and Partition
The easiest way to do this is to pass in a set of known Brokers to your logic, either via a properties file or the command line. This need not be every Broker in the cluster, just a set from which you can start looking for a live Broker to query for leader information.
The call to topicsMetadata() asks the Broker you are connected to for all the details about the topic we are interested in.
The loop on partitionsMetadata iterates through all the partitions until we find the one we want. Once we find it, we can break out of all the loops.
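The lookup loop described above can be sketched without the Kafka client on the classpath. Here `MetadataSource` and `PartitionDetails` are hypothetical stand-ins (not Kafka classes) for "connect to a Broker and request topic metadata" and for the per-partition details that come back. Only the control flow is meant to carry over: try each seed Broker, scan its partitions, stop at the first match.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the per-partition details a Broker returns.
final class PartitionDetails {
    final int partitionId;
    final String leaderHost;
    final List<String> replicaHosts;

    PartitionDetails(int partitionId, String leaderHost, List<String> replicaHosts) {
        this.partitionId = partitionId;
        this.leaderHost = leaderHost;
        this.replicaHosts = replicaHosts;
    }
}

// Hypothetical stand-in for "connect to this Broker and ask for topic metadata".
interface MetadataSource {
    List<PartitionDetails> partitionsFor(String brokerHost, String topic) throws Exception;
}

final class LeaderLookup {
    // Try each seed Broker in turn and return the first entry for the wanted partition.
    static Optional<PartitionDetails> findLeader(MetadataSource source, List<String> seedBrokers,
                                                 String topic, int partition) {
        for (String seed : seedBrokers) {
            try {
                for (PartitionDetails part : source.partitionsFor(seed, topic)) {
                    if (part.partitionId == partition) {
                        return Optional.of(part); // found it: leave both loops at once
                    }
                }
            } catch (Exception e) {
                // This seed is down or unreachable; move on to the next one.
                System.err.println("Error communicating with Broker [" + seed + "]: " + e);
            }
        }
        return Optional.empty(); // no seed Broker knew about this topic and partition
    }
}
```

Returning from inside the inner loop plays the role of breaking out of all the loops. The replica hosts carried on the result are what a later leader-recovery step would query.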
Finding Starting Offset for Reads
Now define where to start reading data. Kafka includes two constants to help: kafka.api.OffsetRequest.EarliestTime() finds the beginning of the data in the logs and starts streaming from there, while kafka.api.OffsetRequest.LatestTime() streams only new messages. Don’t assume that offset 0 is the beginning offset, since messages age out of the log over time.
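To make the warning about offset 0 concrete, here is a small self-contained model of how the two sentinel values resolve against a partition's log. `PartitionLogRange` and `StartOffsets` are hypothetical names, not Kafka classes. The values -2 and -1 mirror what EarliestTime() and LatestTime() return in the Kafka versions that ship SimpleConsumer, and the sketch assumes the Broker answers with the oldest retained offset and the log-end offset respectively.

```java
// Hypothetical model of the offsets one partition currently holds on the Broker.
final class PartitionLogRange {
    final long oldestRetainedOffset; // first offset that has not aged out yet
    final long logEndOffset;         // offset the next produced message will receive

    PartitionLogRange(long oldestRetainedOffset, long logEndOffset) {
        this.oldestRetainedOffset = oldestRetainedOffset;
        this.logEndOffset = logEndOffset;
    }
}

final class StartOffsets {
    // Sentinel values assumed to match kafka.api.OffsetRequest.LatestTime() / EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Resolve a sentinel to a concrete starting offset for the first fetch.
    static long resolve(PartitionLogRange log, long whichTime) {
        if (whichTime == EARLIEST_TIME) {
            return log.oldestRetainedOffset; // not necessarily 0: old segments may be gone
        }
        if (whichTime == LATEST_TIME) {
            return log.logEndOffset;         // only messages produced from now on
        }
        // Timestamp-based lookups are left out of this sketch.
        throw new IllegalArgumentException("unsupported whichTime: " + whichTime);
    }
}
```

For a log that has aged out everything below offset 1500 and currently ends at 2000, the earliest sentinel resolves to 1500 and the latest to 2000.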
Error Handling
Since the SimpleConsumer doesn't handle lead Broker failures, you have to write a bit of code to handle them.
Here, once the fetch returns an error, we log the reason, close the consumer then try to figure out who the new leader is.
This method uses the findLeader() logic we defined earlier to find the new leader, except here we only try to connect to one of the replicas for the topic/partition. This way, if we can't reach any of the Brokers with the data we are interested in, we give up and exit hard.
Since it may take a short time for ZooKeeper to detect the leader loss and assign a new leader, we sleep if we don't get an answer. In practice, failover is often fast enough that the sleep is never reached.
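A minimal sketch of that retry-and-sleep logic follows. `LeaderFinder` is a hypothetical stand-in for the findLeader() lookup, and `maxTries` and `sleepMillis` are illustrative parameters. One assumption of the sketch: if the first attempt still reports the old leader, it waits once to give the cluster time to fail over, and on later attempts accepts whatever leader is reported, on the theory that the old leader recovered or the error was transient.

```java
import java.util.List;

// Hypothetical stand-in for findLeader(): returns the leader host, or null if none is known yet.
interface LeaderFinder {
    String findLeader(List<String> brokers, String topic, int partition);
}

final class LeaderRecovery {
    static String findNewLeader(LeaderFinder finder, String oldLeader, List<String> replicas,
                                String topic, int partition, int maxTries, long sleepMillis) {
        for (int attempt = 0; attempt < maxTries; attempt++) {
            // Only the replicas are queried: they are the Brokers that hold our data.
            String leader = finder.findLeader(replicas, topic, partition);
            boolean staleOnFirstTry = attempt == 0 && leader != null
                    && leader.equalsIgnoreCase(oldLeader);
            if (leader != null && !staleOnFirstTry) {
                return leader;
            }
            try {
                Thread.sleep(sleepMillis); // give the cluster time to elect a new leader
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for a new leader", e);
            }
        }
        // None of the replicas could name a leader: give up and let the caller exit.
        throw new IllegalStateException("Unable to find new leader after Broker failure");
    }
}
```

A caller would close the failed consumer, call findNewLeader() with the replica list from the last metadata lookup, and reconnect to the returned host.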
Reading the Data
Finally we read the data being streamed back and write it out.
Note that readOffset is updated by asking the last message read what the next offset would be. This way, once the block of messages has been processed, we know where to tell Kafka to start the next fetch.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again.
Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
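The offset check and the readOffset bookkeeping described above can be sketched as follows. `FetchedMessage` and `BlockReader` are hypothetical names, not Kafka classes: a fetched message is modeled as its own offset, the offset that follows it, and a payload.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one message in a fetch response.
final class FetchedMessage {
    final long offset;     // this message's offset
    final long nextOffset; // offset of the message that follows it
    final String payload;

    FetchedMessage(long offset, long nextOffset, String payload) {
        this.offset = offset;
        this.nextOffset = nextOffset;
        this.payload = payload;
    }
}

final class BlockReader {
    final List<String> delivered = new ArrayList<>();
    long readOffset; // where the next fetch should start

    BlockReader(long startOffset) {
        this.readOffset = startOffset;
    }

    // Process one fetched block and return how many messages were actually delivered.
    int process(List<FetchedMessage> block) {
        int numRead = 0;
        for (FetchedMessage message : block) {
            if (message.offset < readOffset) {
                // Older than what we asked for: the Broker returned a whole compressed
                // block that begins before the requested offset. Skip it.
                continue;
            }
            delivered.add(message.payload);
            readOffset = message.nextOffset; // remember where to resume
            numRead++;
        }
        return numRead;
    }
}
```

If the reader starts at offset 12 and the fetch returns a compressed block covering offsets 10 through 14, only the messages at 12, 13 and 14 are delivered, and readOffset ends at 15.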
We also keep track of the number of messages read. If we didn't read anything on the last request, we go to sleep for a second so we aren't hammering Kafka when there is no data.
Running the example
The example expects the following parameters:
- Maximum number of messages to read (so we don’t loop forever)
- Topic to read from
- Partition to read from
- One broker to use for Metadata lookup
- Port the brokers listen on
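The parameter list above could be parsed along these lines. `ExampleArgs` is a hypothetical helper, and the positional order is an assumption: it simply follows the order of the list. The single Broker is wrapped in a list so it can serve as the seed set for the leader lookup.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical holder for the five command-line parameters, in the order listed above.
final class ExampleArgs {
    final long maxReads;
    final String topic;
    final int partition;
    final List<String> seedBrokers;
    final int port;

    private ExampleArgs(long maxReads, String topic, int partition,
                        List<String> seedBrokers, int port) {
        this.maxReads = maxReads;
        this.topic = topic;
        this.partition = partition;
        this.seedBrokers = seedBrokers;
        this.port = port;
    }

    static ExampleArgs parse(String[] args) {
        if (args.length != 5) {
            throw new IllegalArgumentException(
                "Usage: <maxReads> <topic> <partition> <broker> <port>");
        }
        return new ExampleArgs(
            Long.parseLong(args[0]),             // maximum number of messages to read
            args[1],                             // topic to read from
            Integer.parseInt(args[2]),           // partition to read from
            Collections.singletonList(args[3]),  // one Broker for the metadata lookup
            Integer.parseInt(args[4]));          // port the Brokers listen on
    }
}
```

A non-numeric value for the message count, partition or port surfaces as a NumberFormatException from the parse calls, which is acceptable for a command-line example.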