Status

Current state: Adopted (3.0)

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

Motivation

The TaskMetadata class in Kafka Streams encodes information about a particular task, such as its offsets, topic partitions, and taskId. The taskId is currently stored and exposed as a String rather than as the actual TaskId class. We should move towards returning it as a TaskId object, since that's literally what it is, and because if/when we add fields to the TaskId, parsing the string encoding to extract information about the task will become more and more unwieldy.

...

At the same time, throughout this unexpectedly long and complicated analysis, it came to our attention how awkward the existing TaskId class is for a public API. It's clearly intended to be a simple metadata class to expose the topicGroupId (subtopology) and partition number, yet it has plenty of totally irrelevant (to a user) utility methods such as de/serialization, and instead of exposing the fields via getters it just makes them public. We should take this opportunity to clean up the TaskId class and move some of those inappropriate methods to an internal utility class.

Public Interfaces

Clean up the public TaskId class by deprecating inappropriate APIs, introducing the required new ones, and replacing topicGroupId with the more common and useful term "subtopology":

...

Note that since the name taskId() was already taken, we have to choose a different name for the getter on TaskMetadata, namely getTaskId(). Since this is outside the Kafka standards (which do not use the 'get' prefix for getters), we will migrate back to the plain taskId() getter later. Once the deprecation period has elapsed, instead of simply removing the deprecated `String taskId()`, we will replace it with a new `TaskId taskId()` API and then deprecate the temporary getTaskId().

Proposed Changes

To separate the implementation from the public API, we will deprecate the various functional methods on the TaskId class and move their implementation to an internal utility class. Similarly, we will deprecate the public fields and introduce getters for those fields in their place. Those fields will be made private once the deprecation period has passed.
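The shape of this cleanup can be sketched roughly as follows. This is an illustration only, not the actual Kafka Streams source: SketchTaskId and SketchTaskIdSerde are hypothetical names, the byte layout (two ints) is an assumption, and the `<subtopology>_<partition>` string form is assumed for the example.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the public TaskId class (not the real Kafka source).
final class SketchTaskId {
    /** @deprecated use {@link #subtopology()} instead. */
    @Deprecated
    public final int topicGroupId;

    /** @deprecated use {@link #partition()} instead. */
    @Deprecated
    public final int partition;

    SketchTaskId(final int subtopology, final int partition) {
        this.topicGroupId = subtopology;
        this.partition = partition;
    }

    // New getters that replace direct access to the public fields.
    public int subtopology() { return topicGroupId; }
    public int partition() { return partition; }

    /** @deprecated serialization now lives in the internal utility class. */
    @Deprecated
    public void writeTo(final ByteBuffer buf) {
        SketchTaskIdSerde.writeTo(this, buf); // delegate so behavior is unchanged
    }

    @Override
    public String toString() { return topicGroupId + "_" + partition; }

    @Override
    public boolean equals(final Object o) {
        if (!(o instanceof SketchTaskId)) return false;
        final SketchTaskId other = (SketchTaskId) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override
    public int hashCode() { return 31 * topicGroupId + partition; }
}

// Hypothetical internal utility class holding the de/serialization logic.
final class SketchTaskIdSerde {
    private SketchTaskIdSerde() { }

    static void writeTo(final SketchTaskId id, final ByteBuffer buf) {
        buf.putInt(id.subtopology()); // assumed layout: subtopology, then partition
        buf.putInt(id.partition());
    }

    static SketchTaskId readFrom(final ByteBuffer buf) {
        return new SketchTaskId(buf.getInt(), buf.getInt());
    }
}
```

Keeping the deprecated method as a thin delegate means existing callers keep working during the deprecation period while new internal code calls the utility class directly.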

To migrate from the String to TaskId in TaskMetadata, we'll need to deprecate the existing taskId() getter method and add a TaskId getter in its place. Unfortunately, the appropriate name for this getter under the Kafka standard is just `taskId()`, which is already taken. We plan to introduce a temporary getter called `getTaskId()` for the deprecation cycle of the `String taskId()` API. Once that cycle is over, we can migrate back to a `TaskId taskId()` getter and then deprecate the temporary getter. It's a bit awkward, but we feel it's better to leave things in the ideal state than to go out of the way to avoid deprecations, especially when it's just one relatively uncommon method.
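As a rough sketch of the first phase of that plan, the two getters might coexist like this during the deprecation cycle. DemoTaskId and DemoTaskMetadata are hypothetical stand-ins for illustration, not the real TaskId and TaskMetadata classes, and the `<subtopology>_<partition>` string form is assumed.

```java
// Hypothetical minimal id type standing in for TaskId.
final class DemoTaskId {
    private final int subtopology;
    private final int partition;

    DemoTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    public int subtopology() { return subtopology; }
    public int partition() { return partition; }

    @Override
    public String toString() { return subtopology + "_" + partition; }
}

// Hypothetical stand-in for TaskMetadata during the deprecation cycle.
final class DemoTaskMetadata {
    private final DemoTaskId taskId;

    DemoTaskMetadata(final DemoTaskId taskId) {
        this.taskId = taskId;
    }

    /** @deprecated use {@link #getTaskId()} instead. */
    @Deprecated
    public String taskId() {
        return taskId.toString(); // existing String-based API keeps working
    }

    // Temporary getter; per the plan above it would itself be deprecated
    // once a TaskId-returning taskId() can replace the String version.
    public DemoTaskId getTaskId() {
        return taskId;
    }
}
```

A caller that previously split the string on the underscore could instead read `metadata.getTaskId().partition()` directly, which is the kind of parsing the Motivation section aims to make unnecessary.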

Compatibility, Deprecation, and Migration Plan

The TaskId readFrom/writeTo methods and public topicGroupId fields will all be deprecated and removed in a future release. The TaskMetadata.taskId() method will also be deprecated. However, once enough time has passed for it to be removed, we will instead replace it with a new method signature that returns a TaskId instead of a String, and then deprecate the temporary TaskMetadata.getTaskId() API introduced in this KIP.

Rejected Alternatives

Quite a few alternatives were discussed during this seemingly simple KIP. At a high level, they involved either restructuring the existing TaskId class into a hierarchy, or removing any form of TaskId from the API altogether and decomposing any APIs into separate methods for each of its fields. The former was rejected because users could no longer rely on fundamental public contracts like the equals(), toString(), and compareTo() APIs: these would become implementation details of the internal subclass or implementing class, and would therefore have no public contract. The latter was rejected because the TaskId is and has been a fundamental concept in Kafka Streams, and something we regularly explain to users as a first-class citizen with a specific string representation that may help them make sense of anything from the logs to the local state and directory structure. Also, we feel that the TaskId is a natural key on the task space of Kafka Streams and a natural way to think about the tasks themselves, more than just a simple data container class.

...