Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagescala
titleWordCountTest
collapsetrue
package org.apache.kafka.streams.integration

import java.util.Properties
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Tag

import java.io.File

/**
 * Test suite that does a classic word count example.
 * <p>
 * The suite contains the test case using Scala APIs `testShouldCountWords` and the same test case using the
 * Java APIs `testShouldCountWordsJava`. The idea is to demonstrate that both generate the same result.
 */
@Tag("integration")
class WordCountTest extends WordCountTestData {

  private val cluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)

  final private val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
  private val mockTime: MockTime = cluster.time
  mockTime.setCurrentTimeMs(alignedTime)

  private val testFolder: File = TestUtils.tempDirectory()

  @BeforeEach
  def startKafkaCluster(): Unit = {
    cluster.start()
    cluster.createTopic(inputTopic)
    cluster.createTopic(outputTopic)
    cluster.createTopic(inputTopicJ)
    cluster.createTopic(outputTopicJ)
  }

  @AfterEach
  def stopKafkaCluster(): Unit = {
    cluster.stop()
    Utils.delete(testFolder)
  }

  @Test
  def testShouldCountWords(): Unit = {
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val streamsConfiguration = getStreamsConfiguration()

    val streamBuilder = new StreamsBuilder
    val textLines = streamBuilder.stream[String, String](inputTopic)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    // generate word counts
    val wordCounts: KTable[String, Long] =
      textLines
        .flatMapValues(v => pattern.split(v.toLowerCase))
        .groupBy((_, v) => v)
        .count()

    // write to output topic
    wordCounts.toStream.to(outputTopic)

    val streams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
    streams.start()

    // produce and consume synchronously
    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopic, outputTopic)

    streams.close()

    import scala.jdk.CollectionConverters._
    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
  }

  @Test
  def testShouldCountWordsJava(): Unit = {

    import org.apache.kafka.common.serialization.{Serdes => JSerdes}
    import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
    import org.apache.kafka.streams.kstream.{
      KTable => KTableJ,
      KStream => KStreamJ,
      _
    }
    import scala.jdk.CollectionConverters._

    val streamsConfiguration = getStreamsConfiguration()
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getNameclassOf[Serdes.StringSerde])
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JSerdes.String().getClass.getNameclassOf[Serdes.StringSerde])

    val streamBuilder = new StreamsBuilderJ
    val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    val wordCounts: KTableJ[String, java.lang.Long] = textLines
      .flatMapValues(v => pattern.split(v.toLowerCase).toBuffer.asJava)
      .groupBy((k, v) => v)
      .count()

    wordCounts.toStream.to(outputTopicJ, Produced.`with`(JSerdes.String(), JSerdes.Long()))

    val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
    streams.start()

    val actualWordCounts: java.util.List[KeyValue[String, Long]] = produceNConsume(inputTopicJ, outputTopicJ)

    streams.close()

    assertEquals(actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key), expectedWordCounts.sortBy(_.key))
  }

  private def getStreamsConfiguration(): Properties = {
    val streamsConfiguration: Properties = new Properties()

    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath)
    streamsConfiguration
  }

  private def getProducerConfig(): Properties = {
    val p = new Properties()
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ProducerConfig.ACKS_CONFIG, "all")
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p
  }

  private def getConsumerConfig(): Properties = {
    val p = new Properties()
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
    p
  }

  private def produceNConsume(inputTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {

    val linesProducerConfig: Properties = getProducerConfig()

    import scala.jdk.CollectionConverters._
    IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues.asJava, linesProducerConfig, mockTime)

    val consumerConfig = getConsumerConfig()

    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size)
  }
}

trait WordCountTestData {
  val inputTopic = s"inputTopic"
  val outputTopic = s"outputTopic"
  val inputTopicJ = s"inputTopicJ"
  val outputTopicJ = s"outputTopicJ"

  val inputValues = List(
    "Hello Kafka Streams",
    "All streams lead to Kafka",
    "Join Kafka Summit",
    "И теперь пошли русские слова"
  )

  val expectedWordCounts: List[KeyValue[String, Long]] = List(
    new KeyValue("hello", 1L),
    new KeyValue("all", 1L),
    new KeyValue("streams", 2L),
    new KeyValue("lead", 1L),
    new KeyValue("to", 1L),
    new KeyValue("join", 1L),
    new KeyValue("kafka", 3L),
    new KeyValue("summit", 1L),
    new KeyValue("и", 1L),
    new KeyValue("теперь", 1L),
    new KeyValue("пошли", 1L),
    new KeyValue("русские", 1L),
    new KeyValue("слова", 1L)
  )
}

...