0.8 is a huge step forward in functionality from 0.7.x. There are still some rough edges and likely some bugs, but we are feature complete and are beginning the process of rolling it out to various test clusters to see what happens. We wanted to make an early pre-release version available for the brave to try out even before all the documentation is up to date and production hardening is complete. Please let us know any problems you find.
This release includes the following major features:
- Partitions are now replicated. Previously the topic would remain available in the case of server failure, but individual partitions within that topic could disappear when the server hosting them stopped. If a broker failed permanently any unconsumed data it hosted would be lost. Starting with 0.8 all partitions have a replication factor and we get the prior behavior as the special case where replication factor = 1. Replicas have a notion of committed messages and guarantee that committed messages won't be lost as long as at least one replica survives. Replica logs are byte-for-byte identical across replicas.
- Producer and consumer are replication-aware. When running in sync mode, by default, the producer's send() request blocks until the messages sent are committed to the active replicas. As a result the sender can rely on the guarantee that a message, once sent, will not be lost. Latency-sensitive producers have the option to tune this to block only on the write to the leader broker, or to run completely async if they are willing to forgo this guarantee. The consumer will only ever see messages that have been committed.
- The consumer has been moved to a "long poll" model where fetch requests block until there is data available. This enables low latency without frequent polling. In general end-to-end message latency from producer to broker to consumer of only a few milliseconds is now possible.
- We now retain the key used in the producer for partitioning with each message, so the consumer knows the partitioning key.
- We have moved from directly addressing messages with a byte offset to using a logical offset (i.e. 0, 1, 2, 3...). The offset still works exactly the same - it is a monotonically increasing number that represents a point-in-time in the log - but now it is no longer tied to byte layout. This has several advantages: (1) it is aesthetically nice, (2) it makes it trivial to calculate the next offset or to traverse messages in reverse order, (3) it fixes a corner-case interaction between consumer commit() and compressed message batches. Data is still transferred using the same efficient zero-copy mechanism as before.
- We have removed the ZooKeeper dependency from the producer and replaced it with a simple cluster metadata API.
- We now support multiple data directories (i.e. a JBOD setup).
- We now expose both the partition and the offset for each message in the high-level consumer.
- We have substantially improved our integration testing, adding a new integration test framework and over 100 distributed regression and performance test scenarios that we run on every checkin.
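The durability/latency trade-off described above is controlled on the producer side. As a sketch, the relevant 0.8 producer properties might look like the following (the property names are taken from the 0.8 producer config; verify them against the documentation for your build):

```properties
# Block send() until the broker acknowledges the write (sync mode)
producer.type=sync
# How many acknowledgements the producer requires before considering a
# send complete: 0 = don't wait, 1 = leader only, -1 = all in-sync replicas
request.required.acks=-1
```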
A complete list of changes is available here.
This is also our first backwards-incompatible release: maintaining wire compatibility with the older code was simply too difficult given the volume of changes. We don't intend to make a habit of this. To ease the migration we have provided a migration tool that replicates from a 0.7.x cluster to a 0.8 cluster. This allows a zero-downtime migration: first create a 0.8 cluster and use the migration tool to replicate the 0.7 cluster's data to it, then move the 0.7 consumers over to 0.8, and finally move the 0.7 producers to the new cluster.
Since 0.8 is not backward compatible with 0.7.x, if you have an existing 0.7 installation you will need to wipe out all existing ZooKeeper data and Kafka log data (unless you use a different ZooKeeper namespace and a new Kafka log directory).
Step 1: Download the code
Check out Kafka 0.8 from https://git-wip-us.apache.org/repos/asf/kafka.git and then build it.
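A sketch of the checkout and build (the branch name and sbt targets are assumptions; see the README in the repo for the authoritative steps):

```shell
# Clone the repo and switch to the 0.8 branch (branch name is an assumption)
git clone https://git-wip-us.apache.org/repos/asf/kafka.git
cd kafka
git checkout -b 0.8 remotes/origin/0.8
# Build using the sbt wrapper bundled with the source
./sbt update
./sbt package
```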
Step 2: Make some config changes
Suppose you want to run 3 brokers locally. Make the following config changes.
Comment out the JMX_PORT setting in the broker start script, since we are going to run multiple instances of it locally.
Create one config file per broker by copying from config/server.properties and overriding the following values.
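The per-broker configs can be generated like this (ports and log directories are example values; the first line only creates a stand-in default config so the sketch is self-contained, and is a no-op inside a real Kafka checkout where config/server.properties already exists):

```shell
# Stand-in for the default config shipped with Kafka (no-op in a real checkout)
[ -f config/server.properties ] || { mkdir -p config && printf 'broker.id=0\nport=9092\nlog.dir=/tmp/kafka-logs\n' > config/server.properties; }

# Create one config file per broker, overriding broker.id, port, and log.dir
for i in 1 2 3; do
  cp config/server.properties "config/server-$i.properties"
  sed -i \
    -e "s/^broker.id=.*/broker.id=$i/" \
    -e "s/^port=.*/port=$((9091 + i))/" \
    -e "s|^log.dir=.*|log.dir=/tmp/kafka-logs-$i|" \
    "config/server-$i.properties"
done
```

Each broker needs a unique id, a unique port (here 9092-9094), and its own log directory so the three instances don't collide on one machine.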
Step 3: Start the server
First start the ZooKeeper server. If you want to set up a ZooKeeper cluster on multiple servers, follow the instructions here.
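A single-node ZooKeeper can be started with the helper script and sample config that ship with Kafka:

```shell
# Start a single-node ZooKeeper using the sample config shipped with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
```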
Note: This assumes you haven't run a previous version of Kafka against this ZooKeeper instance. The ZooKeeper paths are not compatible, so the new code won't work with the old paths.
Now start the Kafka brokers in separate shells:
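With one config file per broker (the file names below assume the copies made in Step 2), run each command in its own shell:

```shell
# One broker per shell, each pointed at its own config file
bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties
bin/kafka-server-start.sh config/server-3.properties
```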
Step 4: Create a topic
Create a topic with a replication factor of 3.
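For example, using the topic-creation tool in 0.8 (the script and flag names are assumptions from the 0.8 tools, and the topic name is just an example; run the script with no arguments to see its usage):

```shell
# Create a topic with 1 partition replicated across all 3 brokers
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic my-replicated-topic
```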
Step 5: Send some messages
Type some messages to send.
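The console producer shipped with 0.8 can be used for this (the topic name matches the example above; point --broker-list at any of your brokers):

```shell
# Messages typed on stdin are sent to the topic; finish with Ctrl-C
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
```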
Step 6: Start a consumer
After the messages are produced, you should see the data being replicated to the three log directories for each of the broker instances.
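To read the messages back from the beginning of the log, and to inspect the replicas on disk (the log directory paths are the example values from Step 2):

```shell
# Consume from the beginning of the topic; prints the messages typed above
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
# Each broker's log directory should now contain a replica of the partition
ls /tmp/kafka-logs-1 /tmp/kafka-logs-2 /tmp/kafka-logs-3
```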