DUE TO SPAM, SIGN-UP IS DISABLED. Go to the Selfserve wiki signup page and request an account.
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
package org.apache.kafka.streams.integration
import java.util.Properties
import java.util.regex.Pattern
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.{MockTime, Utils}
import org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer, StringSerializer}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Tag
import java.io.File
/**
 * Integration test suite for the classic word count example.
 * <p>
 * The suite builds the same word count topology twice: once with the Kafka Streams Scala API
 * (`testShouldCountWords`) and once with the Java API (`testShouldCountWordsJava`). Both read
 * [[inputValues]] and are checked against [[expectedWordCounts]], demonstrating that the two
 * APIs generate the same result.
 */
@Tag("integration")
class WordCountTest extends WordCountTestData {
  // Embedded Kafka cluster shared by both tests; started in startKafkaCluster and stopped in stopKafkaCluster.
  private val cluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1)

  // Current wall-clock time rounded up to the next whole second (in milliseconds).
  final private val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000
  // The cluster's MockTime, pinned to alignedTime; also handed to the producer helper in produceNConsume.
  private val mockTime: MockTime = cluster.time
  mockTime.setCurrentTimeMs(alignedTime)

  // State directory for the Streams instance; deleted after each test.
  private val testFolder: File = TestUtils.tempDirectory()

  /** Starts the embedded cluster and creates the input/output topics used by both tests. */
  @BeforeEach
  def startKafkaCluster(): Unit = {
    cluster.start()
    cluster.createTopic(inputTopic)
    cluster.createTopic(outputTopic)
    cluster.createTopic(inputTopicJ)
    cluster.createTopic(outputTopicJ)
  }

  /** Stops the embedded cluster and deletes the Streams state directory. */
  @AfterEach
  def stopKafkaCluster(): Unit = {
    cluster.stop()
    Utils.delete(testFolder)
  }

  /** Word count built with the Kafka Streams Scala API, relying on implicitly imported serdes. */
  @Test
  def testShouldCountWords(): Unit = {
    import org.apache.kafka.streams.scala.serialization.Serdes._

    val streamsConfiguration = getStreamsConfiguration()

    val streamBuilder = new StreamsBuilder
    val textLines = streamBuilder.stream[String, String](inputTopic)

    // Split on runs of non-word characters; UNICODE_CHARACTER_CLASS keeps Cyrillic letters as word characters.
    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    // generate word counts; Locale.ROOT makes lower-casing independent of the JVM default locale
    val wordCounts: KTable[String, Long] =
      textLines
        .flatMapValues(v => pattern.split(v.toLowerCase(java.util.Locale.ROOT)))
        .groupBy((_, v) => v)
        .count()

    // write to output topic
    wordCounts.toStream.to(outputTopic)

    val streams = new KafkaStreams(streamBuilder.build(), streamsConfiguration)
    streams.start()

    // produce and consume synchronously; close the Streams instance even if this throws
    val actualWordCounts: java.util.List[KeyValue[String, Long]] =
      try produceNConsume(inputTopic, outputTopic)
      finally streams.close()

    assertWordCounts(actualWordCounts)
  }

  /** The same word count built directly with the Kafka Streams Java API. */
  @Test
  def testShouldCountWordsJava(): Unit = {
    import org.apache.kafka.common.serialization.{Serdes => JSerdes}
    import org.apache.kafka.streams.{KafkaStreams => KafkaStreamsJ, StreamsBuilder => StreamsBuilderJ}
    import org.apache.kafka.streams.kstream.{KTable => KTableJ, KStream => KStreamJ, _}
    import scala.jdk.CollectionConverters._

    val streamsConfiguration = getStreamsConfiguration()
    // The Java calls below pass no serdes for stream/groupBy, so configure String defaults explicitly.
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde])
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[JSerdes.StringSerde])

    val streamBuilder = new StreamsBuilderJ
    val textLines: KStreamJ[String, String] = streamBuilder.stream[String, String](inputTopicJ)

    val pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS)

    // The Java flatMapValues expects a java.lang.Iterable, hence toBuffer.asJava.
    val wordCounts: KTableJ[String, java.lang.Long] = textLines
      .flatMapValues(v => pattern.split(v.toLowerCase(java.util.Locale.ROOT)).toBuffer.asJava)
      .groupBy((_, v) => v)
      .count()

    // Counts are java.lang.Long, so the output serdes are given explicitly.
    wordCounts.toStream.to(outputTopicJ, Produced.`with`(JSerdes.String(), JSerdes.Long()))

    val streams: KafkaStreamsJ = new KafkaStreamsJ(streamBuilder.build(), streamsConfiguration)
    streams.start()

    // close the Streams instance even if producing/consuming throws
    val actualWordCounts: java.util.List[KeyValue[String, Long]] =
      try produceNConsume(inputTopicJ, outputTopicJ)
      finally streams.close()

    assertWordCounts(actualWordCounts)
  }

  /**
   * Asserts that the first `expectedWordCounts.size` consumed records, ordered by key, equal
   * [[expectedWordCounts]] ordered by key.
   *
   * @param actualWordCounts records read from an output topic
   */
  private def assertWordCounts(actualWordCounts: java.util.List[KeyValue[String, Long]]): Unit = {
    import scala.jdk.CollectionConverters._
    // JUnit's contract is assertEquals(expected, actual); keeping that order yields correct failure messages.
    assertEquals(
      expectedWordCounts.sortBy(_.key),
      actualWordCounts.asScala.take(expectedWordCounts.size).sortBy(_.key)
    )
  }

  /**
   * Streams configuration shared by both tests: application id, bootstrap servers of the embedded
   * cluster, a 10000 ms commit interval, `earliest` offset reset and the temporary state directory.
   */
  private def getStreamsConfiguration(): Properties = {
    val streamsConfiguration: Properties = new Properties()
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-test")
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000")
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getPath)
    streamsConfiguration
  }

  /** Producer configuration for writing String key/value input lines with `acks=all`. */
  private def getProducerConfig(): Properties = {
    val p = new Properties()
    p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ProducerConfig.ACKS_CONFIG, "all")
    p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    p
  }

  /** Consumer configuration for reading (String word, Long count) records from the beginning of a topic. */
  private def getConsumerConfig(): Properties = {
    val p = new Properties()
    p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers())
    p.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-scala-integration-test-standard-consumer")
    p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer])
    p
  }

  /**
   * Synchronously writes [[inputValues]] to `inputTopic`, then waits (via IntegrationTestUtils) until
   * at least `expectedWordCounts.size` records have been received from `outputTopic`.
   *
   * @param inputTopic  topic the word count topology reads from
   * @param outputTopic topic the word count topology writes counts to
   * @return the records received from `outputTopic`
   */
  private def produceNConsume(inputTopic: String, outputTopic: String): java.util.List[KeyValue[String, Long]] = {
    import scala.jdk.CollectionConverters._
    val linesProducerConfig: Properties = getProducerConfig()
    IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues.asJava, linesProducerConfig, mockTime)
    val consumerConfig = getConsumerConfig()
    IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedWordCounts.size)
  }
}
/**
 * Topic names, input lines and expected results shared by the word count tests.
 * The `J`-suffixed topics are used by the Java API variant of the test.
 */
trait WordCountTestData {
  val inputTopic: String = "inputTopic"
  val outputTopic: String = "outputTopic"
  val inputTopicJ: String = "inputTopicJ"
  val outputTopicJ: String = "outputTopicJ"

  // Input lines; the last one is Cyrillic text, which exercises Unicode-aware splitting and lower-casing.
  val inputValues: List[String] = List(
    "Hello Kafka Streams",
    "All streams lead to Kafka",
    "Join Kafka Summit",
    "И теперь пошли русские слова"
  )

  // Expected final count for each lower-cased word occurring in inputValues.
  val expectedWordCounts: List[KeyValue[String, Long]] = List(
    new KeyValue("hello", 1L),
    new KeyValue("all", 1L),
    new KeyValue("streams", 2L),
    new KeyValue("lead", 1L),
    new KeyValue("to", 1L),
    new KeyValue("join", 1L),
    new KeyValue("kafka", 3L),
    new KeyValue("summit", 1L),
    new KeyValue("и", 1L),
    new KeyValue("теперь", 1L),
    new KeyValue("пошли", 1L),
    new KeyValue("русские", 1L),
    new KeyValue("слова", 1L)
  )
}
...