

Status

Current state: Under Discussion

Discussion thread: here

JIRA:


Motivation

In my organisation, we have been using Kafka as the basic publish-subscribe messaging system. Our goal is to send event-based messages reliably and securely, and to perform data synchronisation based on those messages. For us, the message keys carry metadata which we use either to ignore messages (in the case of a loopback) or to log some information. We have the following use case for messaging:

1) A Database transaction event takes place

2) The event is captured and messaged across 10 data centres all around the world.

3) A group of consumers (one per data centre, each with a unique consumer-group ID) will process messages from their respective partitions, with one consumer per partition.

Under these circumstances, we only need a guarantee that the same message won't be sent to multiple partitions. Using DefaultPartitioner, we can achieve this only with NULL keys. But since we need keys for metadata, we cannot maintain round-robin selection of partitions, because the key hash determines which partition is chosen. We need non-null keys together with round-robin partition selection for KafkaProducer.


Public Interfaces

There is no requirement to change any interfaces. We simply use the existing partitioner.class producer configuration and set its value to the FQCN of our partitioner.
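As a minimal sketch of that configuration (the broker address, topic name, and the package/class name of the proposed partitioner are illustrative, not part of the proposal), producer setup might look like this:

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");  // illustrative broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Point partitioner.class at the proposed partitioner's FQCN (package assumed here)
    props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.KeylessPartitioner");

    Producer<String, String> producer = new KafkaProducer<>(props);
    // The key still carries metadata, but no longer influences partition selection
    producer.send(new ProducerRecord<>("db-events", "datacentre-7", "payload"));
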

Proposed Changes

We would therefore like to propose an extension of DefaultPartitioner, "KeylessPartitioner". We believe "Keyless" is the most appropriate name because the partitioner does not take the key into account. Its code will be almost identical to the DefaultPartitioner#partition method, except that it always executes the "null key and no partition" branch of DefaultPartitioner. The following is the content of the partition() method for our new partitioner.

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // Advance the per-topic counter regardless of the record key
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            // Round-robin over the partitions that currently have a leader
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // No partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
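For context, below is a self-contained sketch of how the full class could be assembled. It implements the Partitioner interface directly rather than subclassing DefaultPartitioner (whose per-topic counter is private), so the counter bookkeeping is reproduced here; the class name, package, and nextValue helper are placeholders used for illustration, not a final design.

    package org.apache.kafka.clients.producer.internals;

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;

    public class KeylessPartitioner implements Partitioner {

        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();

            // Ignore the key entirely and rotate through partitions instead
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // No partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        }

        // Per-topic counter, seeded randomly so concurrent producers do not all start on partition 0
        private int nextValue(String topic) {
            AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
            return counter.getAndIncrement();
        }

        @Override
        public void close() { }
    }
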


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    There is no impact to existing users. This class does not need to be used unless someone has similar requirements.
  • If we are changing behavior how will we phase out the older behavior?
    No change in any existing behaviour, since the class usage is controlled by the partitioner.class producer configuration. We are not changing the default value.
  • If we need special migration tools, describe them here. 
    Not required.
  • When will we remove the existing behavior?
    Not required.

Rejected Alternatives

We could package our own jar and deploy it with every Kafka distribution, but we would prefer to make it part of the existing trunk and open it up to other developers for extension if necessary.
