Status

Current state: Vote

Discussion thread: https://lists.apache.org/thread/zl9f72wwmdc67cv5x295w8r6p547tzdl

Vote thread: https://lists.apache.org/thread/4jmq4w0871tvd0gv9cppb1jl6d7lfryo

JIRA: KAFKA-19976

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The streams-scala module was introduced in Apache Kafka 2.0 (KIP-270) to provide a more idiomatic Scala API for Kafka Streams, but its maintenance now presents more challenges than benefits. This proposal advocates for its removal based on several factors:

  • Low Adoption: The module has very low usage within the Kafka Streams community. Download data based on unique IPs shows that only 1.33% of developers use the Scala 2.12 artifact and 4.79% the Scala 2.13 artifact, compared with 93.84% for the plain Java kafka-streams artifact:


| Month  | kafka-streams | kafka-streams-scala_2.11 | kafka-streams-scala_2.12 | kafka-streams-scala_2.13 |
| ------ | ------------- | ------------------------ | ------------------------ | ------------------------ |
| 202501 | 232056        | 127                      | 3930                     | 12983                    |
| 202502 | 229653        | 79                       | 3772                     | 14392                    |
| 202503 | 260768        | 117                      | 3477                     | 14921                    |
| 202504 | 265975        | 66                       | 4954                     | 16602                    |
| 202505 | 353164        | 160                      | 5893                     | 17017                    |
| 202506 | 449788        | 211                      | 6311                     | 16867                    |
| 202507 | 418444        | 53                       | 4975                     | 14482                    |
| 202508 | 250837        | 75                       | 2942                     | 14201                    |
| 202509 | 257662        | 73                       | 2893                     | 15201                    |
| 202510 | 283977        | 73                       | 3514                     | 16682                    |
| 202511 | 258194        | 69                       | 3590                     | 13109                    |
| Total  | 3260518       | 1103                     | 46251                    | 166457                   |
| Ratio  | 93.84%        | 0.03%                    | 1.33%                    | 4.79%                    |


  • Lack of Support for Latest Scala Versions: The module does not support Scala 3, which was released in 2021. Community discussions around a potential migration have been inactive for a significant time, indicating a lack of momentum to keep the module current with the evolution of the Scala language.

  • Lack of Active Maintainers: Analysis of git history shows that while the module has had contributions from various developers, there are no dedicated maintainers. Most recent commits (2023-2025) have been mechanical updates to keep pace with Streams API changes (deprecation removals, API updates) rather than proactive improvements or bug fixes.

Public Interfaces

This proposal impacts the public interfaces contained within the org.apache.kafka.streams.scala package. All classes, objects, and implicit conversions within this package will be deprecated and subsequently removed. The primary Java interfaces in org.apache.kafka.streams will remain unaffected and can still be used from Scala projects.
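To illustrate the last point, here is a minimal sketch (not part of the KIP; topic names are hypothetical placeholders) of building a topology against the plain Java DSL directly from Scala code:

```scala
// Minimal sketch: the Java DSL in org.apache.kafka.streams used from Scala.
// Topic names ("input-topic", "output-topic") are hypothetical placeholders.
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{StreamsBuilder, Topology}
import org.apache.kafka.streams.kstream.{Consumed, KStream, Produced}

object JavaDslFromScala {
  def buildUppercaseTopology(): Topology = {
    val builder = new StreamsBuilder
    // Java KStream type, consumed with explicit Java serdes
    val lines: KStream[String, String] =
      builder.stream("input-topic", Consumed.`with`(Serdes.String(), Serdes.String()))
    // Scala lambdas convert to the Java functional interfaces (Scala 2.12+)
    lines
      .mapValues((v: String) => v.toUpperCase)
      .to("output-topic", Produced.`with`(Serdes.String(), Serdes.String()))
    builder.build()
  }
}
```

The main cost of dropping the Scala wrapper is verbosity (explicit serdes via Consumed/Produced instead of implicits), not loss of functionality.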

Proposed Changes

The proposal is to deprecate the streams-scala module in Kafka version 4.3 and completely remove it in version 5.0.

  1. In version 4.3:

    • Annotate all public classes and methods within the org.apache.kafka.streams.scala package with @deprecated.

    • Update the module's documentation to clearly state its deprecated status and guide users toward migration.

  2. In version 5.0:

    • Delete the streams-scala module and all associated source code from the Apache Kafka repository.

    • Remove any references to the module from the official documentation and build files.
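The 4.3 deprecation step could look like the following sketch; the annotation message and since-version string are illustrative assumptions, not final wording:

```scala
// Illustrative sketch only: the shape of the @deprecated annotation that
// would be applied to every public type in org.apache.kafka.streams.scala.
// The message text and "since" version are assumptions, not final wording.
@deprecated(
  "The streams-scala module is deprecated and will be removed in Apache Kafka 5.0. " +
    "Use the Java API in org.apache.kafka.streams instead.",
  since = "4.3"
)
class StreamsBuilder {
  // ... existing methods delegating to the Java StreamsBuilder ...
}
```

Any code compiling against the annotated types would then emit a deprecation warning pointing users at the Java API.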

Compatibility, Deprecation, and Migration Plan

  • Encourage volunteers to maintain this module outside of Apache Kafka. This KIP covers only deprecation and removal; any volunteer who wants to continue the module outside of Apache Kafka should propose that in a separate KIP.
  • Provide a mapping guide from streams-scala API to the Java Streams API.

  • Add examples in the Kafka documentation for how to recreate common Scala-DSL patterns in Java.

Word count mapping example: testShouldCountWords uses streams-scala, while testShouldCountWordsJava uses the Java library. Both produce the same result.

WordCountTest
package org.apache.kafka.streams.integration

import java.util.Properties
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestUtils

import java.io.File

/**
 * Test suite that does a classic word count example.
 * <p>
 * The suite contains the test case using Scala APIs `testShouldCountWords` and the same test case using the
 * Java APIs `testShouldCountWordsJava`. The idea is to demonstrate that both generate the same result.
 */
@Tag("integration")
class WordCountTest extends WordCountTestData {

  private val cluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)

  final private val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
  private val mockTime: MockTime = cluster.time
  mockTime.setCurrentTimeMs(alignedTime)

  private val testFolder: File = TestUtils.tempDirectory()

  @BeforeEach
  def startKafkaCluster(): Unit = {
    cluster.start()
    cluster.createTopic(inputTopic)
    cluster.createTopic(outputTopic)
    cluster.createTopic(inputTopicJ)
    cluster.createTopic(outputTopicJ)
  }

  @AfterEach
  def stopKafkaCluster(): Unit = {
    cluster.stop()
    Utils.delete(testFolder)
  }

  @Test
  def testShouldCountWords(): Unit = {
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val streamsConfiguration = getStreamsConfiguration()

    val streamBuilder = new StreamsBuilder
    val textLines = streamBuilder.stream[String, String](inputTopic)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    // generate word counts
    val wordCounts: KTable[String, Long] =
      textLines
        .flatMapValues(v => pattern.split(v.toLowerCase))
        .groupBy((_, v) => v)
        .count()

    // write to output topic
    wordCounts.toStream.to(outputTopic)

    val streams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
    streams.start()

    // produce and consume synchronously
    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopic, outputTopic)

    streams.close()

    import scala.jdk.CollectionConverters._
    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
  }

  @Test
  def testShouldCountWordsJava(): Unit = {

    import org.apache.kafka.common.serialization.{Serdes => JSerdes}
    import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
    import org.apache.kafka.streams.kstream.{
      KTable => KTableJ,
      KStream => KStreamJ,
      _
    }
    import scala.jdk.CollectionConverters._

    val streamsConfiguration = getStreamsConfiguration()
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getName)
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getName)

    val streamBuilder = new StreamsBuilderJ
    val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    val wordCounts: KTableJ[String, java.lang.Long] = textLines
      .flatMapValues(v => pattern.split(v.toLowerCase).toBuffer.asJava)
      .groupBy((k, v) => v)
      .count()

    wordCounts.toStream.to(outputTopicJ, Produced.`with`(JSerdes.String(), JSerdes.Long()))

    val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
    streams.start()

    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopicJ, outputTopicJ)

    streams.close()

    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
  }

  private def getStreamsConfiguration(): Properties = {
    val streamsConfiguration: Properties = new Properties()

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath)
    streamsConfiguration
  }

  private def getProducerConfig(): Properties = {
    val p = new Properties()
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ProducerConfig.ACKS_CONFIG, "all")
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p
  }

  private def getConsumerConfig(): Properties = {
    val p = new Properties()
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
    p
  }

  private def produceNConsume(inputTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {

    val linesProducerConfig: Properties = getProducerConfig()

    import scala.jdk.CollectionConverters._
    IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues.asJava, linesProducerConfig, mockTime)

    val consumerConfig = getConsumerConfig()

    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size)
  }
}

trait WordCountTestData {
  val inputTopic = "inputTopic"
  val outputTopic = "outputTopic"
  val inputTopicJ = "inputTopicJ"
  val outputTopicJ = "outputTopicJ"

  val inputValues = List(
    "Hello Kafka Streams",
    "All streams lead to Kafka",
    "Join Kafka Summit",
    "И теперь пошли русские слова"
  )

  val expectedWordCounts: List[KeyValue[String, Long]] = List(
    new KeyValue("hello", 1L),
    new KeyValue("all", 1L),
    new KeyValue("streams", 2L),
    new KeyValue("lead", 1L),
    new KeyValue("to", 1L),
    new KeyValue("join", 1L),
    new KeyValue("kafka", 3L),
    new KeyValue("summit", 1L),
    new KeyValue("и", 1L),
    new KeyValue("теперь", 1L),
    new KeyValue("пошли", 1L),
    new KeyValue("русские", 1L),
    new KeyValue("слова", 1L)
  )
}


Test Plan

The existing tests for the streams-scala module will be removed along with the module itself in version 5.0. No new tests are required, as this KIP proposes the removal of functionality, not the addition of it.

Rejected Alternatives

N/A
