Current state: "Under Discussion"
Discussion thread: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
This document outlines a proposal for developing a Scala library as a wrapper over the existing Java APIs for Kafka Streams.
Kafka Streams currently offers Java APIs based on the Builder design pattern, which allows users to incrementally build the target functionality using lower-level compositional fluent APIs. Using these APIs from Scala code raises three problems:
Additional type annotations - The Java APIs use Java generics in a way that is not fully compatible with the type inference of the Scala compiler. Hence the user has to add type annotations to the Scala code, which is rather non-idiomatic in Scala.
Verbosity - In some cases the Java APIs appear too verbose compared to idiomatic Scala.
Type Unsafety - The Java APIs offer some options where the compile time type safety is sometimes subverted and can result in runtime errors. This stems from the fact that the serdes defined as part of config are not type checked during compile time. Hence any missing serdes can result in runtime errors.
The suggested Scala library is a wrapper over the existing Java APIs for the Kafka Streams DSL and addresses the above 3 concerns. It does not attempt to provide the idiomatic Scala APIs that one would implement in a Scala library developed from scratch. The intention is to make the Java APIs more usable in Scala through better type inference, enhanced expressiveness, and less boilerplate.
The library wraps Java Stream DSL APIs in Scala thereby providing:
Better type inference in Scala
Less boilerplate in application code
The usual builder-style composition that developers get with the original Java API
Implicit serializers and de-serializers leading to better abstraction and less verbosity
Better type safety during compile time
The above points result in an overall improved productivity for development.
This document introduces the Kafka Streams Scala library.
In addition, we received a proposal for an alternate implementation of the same functionality using a type class based approach in Scala. This is the PR currently open in our repository and is based on a fork of our implementation. There has been a lot of discussion on the pros and cons of both approaches.
kafka-streams-scala only depends on the Scala standard library and Kafka Streams.
The library works by wrapping the original Java abstractions of Kafka Streams within a Scala wrapper object and then using implicit conversions between them. All the Scala abstractions are named identically to the corresponding Java abstractions, but they reside in a different package of the library. For example, the Scala class org.apache.kafka.streams.scala.StreamsBuilder is a wrapper around org.apache.kafka.streams.StreamsBuilder, and org.apache.kafka.streams.scala.kstream.KStream is a wrapper around org.apache.kafka.streams.kstream.KStream.
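The wrapping approach described above can be illustrated with a minimal, self-contained sketch. The names JavaStyleStream, ScalaStream, and WrapperConversions are hypothetical stand-ins, not the library's actual classes; the sketch only shows how a same-named Scala method and a pair of implicit conversions might fit together.

```scala
import scala.language.implicitConversions

// Hypothetical stand-in for a Java DSL class; not the real Kafka Streams KStream.
final class JavaStyleStream[V](val values: List[V]) {
  def mapValues[R](f: java.util.function.Function[V, R]): JavaStyleStream[R] =
    new JavaStyleStream(values.map(v => f.apply(v)))
}

// Scala wrapper exposing the same method name, but taking a Scala function.
final class ScalaStream[V](val inner: JavaStyleStream[V]) {
  def mapValues[R](f: V => R): ScalaStream[R] =
    new ScalaStream(inner.mapValues(new java.util.function.Function[V, R] {
      def apply(v: V): R = f(v)
    }))
}

// Implicit conversions in both directions between the wrapper and the wrapped type.
object WrapperConversions {
  implicit def wrapStream[V](s: JavaStyleStream[V]): ScalaStream[V] = new ScalaStream(s)
  implicit def unwrapStream[V](s: ScalaStream[V]): JavaStyleStream[V] = s.inner
}

object WrapperDemo {
  import WrapperConversions._

  val javaStream = new JavaStyleStream(List("a", "bb", "ccc"))
  // The expected type triggers the implicit conversion into the wrapper.
  val wrapped: ScalaStream[String] = javaStream
  // No type annotation is needed on the lambda: V is known to be String.
  val lengths: ScalaStream[Int] = wrapped.mapValues(_.length)
  // Converting back to the stand-in Java type is implicit as well.
  val unwrapped: JavaStyleStream[Int] = lengths
}
```

Keeping the method names identical means code written against the wrapper reads like code written against the wrapped class, while the lambda parameter types are inferred by the Scala compiler.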
Here's an example of the classic Word Count program that uses the Scala builder StreamsBuilder and then builds an instance of KStream using the wrapped API builder.stream. Then we reify to a table and get a KTable, which, again, is a wrapper around the Java KTable.
The net result is that the following code is structured just like using the Java API, but from Scala and with far fewer type annotations compared to using the Java API directly from Scala. The difference in type annotation usage will be more obvious when we use a more complicated example. The library comes with a test suite of a few examples that demonstrate these capabilities.
In the above code snippet, we don't have to provide any serdes or Joined instances explicitly. Nor does the code depend on any serdes specified in the config; in fact, all serdes specified in the config will be ignored by the Scala APIs. All serdes and Joined instances are handled through implicit serdes, as discussed later in the document. This complete independence from configuration-based serdes is what makes the library type-safe: any missing serde or Joined instance is flagged as a compile-time error.
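To make the difference between config-based and implicit serdes concrete, here is a small self-contained sketch. The Serde trait and all other names in it are hypothetical stand-ins rather than the Kafka or library types. It contrasts a registry lookup, where a missing serde only shows up at runtime, with an implicit parameter, where the compiler rejects the call.

```scala
object SerdeLookupSketch {
  // Hypothetical stand-in for a serde; the real Kafka Serde interface is richer.
  trait Serde[T] { def name: String }

  object RuntimeLookup {
    // Config-style registry keyed by class: a missing entry is only seen at runtime.
    val configured: Map[Class[_], Serde[_]] =
      Map(classOf[String] -> new Serde[String] { val name = "string" })

    def serdeFor(c: Class[_]): Option[Serde[_]] = configured.get(c)
  }

  object CompileTimeLookup {
    implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }

    // The serde is an implicit parameter, so the compiler must find one for T.
    def to[T](topic: String)(implicit serde: Serde[T]): String =
      s"$topic uses ${serde.name} serde"

    val ok: String = to[String]("words")
    // val bad = to[Long]("counts") // would not compile: no implicit Serde[Long] in scope
  }
}
```

In the registry version the absence of a Long serde is discovered only when the lookup runs; in the implicit version the commented-out line would be rejected by the compiler.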
Here's a sample code fragment using the Scala wrapper library. Compare this example to the Scala code for the same example using the Java API directly in Confluent's repository.
One of the common complaints of Scala users with the Java API has been the repetitive usage of the serdes in API invocations. Many of the APIs need to take the serdes through abstractions like Joined, and the user has to supply them every time through the with function of these classes.
The library uses the power of Scala implicits to alleviate this concern. As a user you can provide implicit serdes or implicit values of Produced once and make your code less verbose. In fact you can just have the implicit serdes in scope and the library will make the instances of Joined available in scope.
The library also bundles all implicit serdes of the commonly used primitive types in a Scala module - so just import the module vals and have all serdes in scope. A similar strategy of modular implicits can be adopted for any user-defined serdes as well.
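The mechanism of bundling implicit serdes in a module and deriving the higher-level instances from them can be sketched as follows. All names here (Serde, Produced, PrimitiveSerdes, DerivedInstances) are hypothetical stand-ins defined inside the sketch and are not the Kafka Streams or library definitions.

```scala
object ModularImplicitsSketch {
  // Hypothetical stand-ins; not the real Kafka Streams Serde or Produced classes.
  trait Serde[T] { def encode(t: T): String }
  final case class Produced[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // A module of implicit serdes for primitive types, imported wholesale by the user.
  object PrimitiveSerdes {
    implicit val stringSerde: Serde[String] = new Serde[String] { def encode(t: String) = t }
    implicit val longSerde: Serde[Long] = new Serde[Long] { def encode(t: Long) = t.toString }
  }

  object DerivedInstances {
    // With both serdes in implicit scope, a Produced[K, V] is derived automatically.
    implicit def producedFromSerdes[K, V](implicit k: Serde[K], v: Serde[V]): Produced[K, V] =
      Produced(k, v)
  }

  object Demo {
    import PrimitiveSerdes._
    import DerivedInstances._

    // No serde is passed by hand; the compiler assembles the Produced instance.
    def to[K, V](key: K, value: V)(implicit p: Produced[K, V]): String =
      p.keySerde.encode(key) + "=" + p.valueSerde.encode(value)

    val record: String = to("kafka", 3L)
  }
}
```

A user-defined module of implicit serdes could be imported alongside PrimitiveSerdes in the same way, which is the modular strategy the paragraph above refers to.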
Here's an example:
Quite a few things are going on in the above code snippet that may warrant a few lines of elaboration:
- The code snippet does not depend on any config defined serdes. In fact any serde defined as part of the config will be ignored
- All serdes are picked up from the implicits in scope, and import DefaultSerdes._ brings all necessary serdes into scope.
- This is an example of compile time type safety that we don't have in the Java APIs
- The code looks less verbose and more focused towards the actual transformation that it does on the data stream
When the default primitive serdes are not enough and we need to define custom serdes, the usage is exactly the same as above. Just define the implicit serdes and start building the stream transformation. Here's an example with
Scala Version Compatibility
When two versions of Scala are binary compatible, it is safe to compile your project on one Scala version and link against another Scala version at run time (http://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html).
Binary compatibility is a common concern for Scala library authors. Since Scala 2.10.x, Scala releases have always been backward and forward binary compatible between minor releases. This is automatically enforced by use of the Scala Binary Compatibility validation tool (MiMa). However, binary compatibility is typically broken across major releases.
Scala major versions 2.11 and 2.12 are not binary compatible due to compiler changes that use several new language features made available in Java 8. Scala 2.13 has not been released yet, but it’s anticipated to be binary incompatible with 2.12. The Scala 2.13 release has a central theme of core library changes which will cause incompatibility across libraries compiled using earlier versions of Scala.
If desired, MiMa could be used as part of the build and release process to manage binary compatibility for kafka-streams-scala releases in line with Apache Kafka’s version policy.
Two library versions are Source Compatible with each other if switching one for the other does not incur any compile errors or unintended behavioral changes (semantic errors) (http://docs.scala-lang.org/overviews/core/binary-compatibility-for-library-authors.html#source-compatibility).
To support multiple major versions of Scala it is necessary to cross build a source compatible project with two or more versions of Scala. This is commonly done between major versions of Scala such as 2.10/2.11 and 2.11/2.12.
Due to fundamental core library changes that will be released in 2.13 (such as the collections redesign effort), it’s anticipated source compatibility will be an issue due to the ubiquitous use of collections libraries. It’s anticipated that Lightbend will release a compatibility library that allows the library author to preserve source compatibility so that managing multiple code branches won’t be necessary. Guides from Lightbend will also be made available to make managing this transition as easy as possible for library authors.
Binary Compatibility for Library Authors
Scala Binary Compatibility validation tool (MiMa)
Scala 2.13 Roadmap
New or Changed Public Interfaces
org.apache.kafka.streams.scala.kstream. Besides the above ones, the library also has several utility abstractions and modules that the user needs to use for proper semantics. These are:
org.apache.kafka.streams.scala.ImplicitConversions: Module that brings into scope the implicit conversions between the Scala and Java classes
org.apache.kafka.streams.scala.DefaultSerdes: Module that brings into scope the implicit values of all primitive serdes
org.apache.kafka.streams.scala.ScalaSerde: Base abstraction that can be used to implement custom serdes in a type-safe way
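As a rough illustration of how a typed base abstraction can support custom serdes, the following self-contained sketch defines a serde for a user-defined case class and exposes it as an implicit. TypedSerde, User, and roundTrip are hypothetical names introduced for this example; the actual ScalaSerde signature may differ.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object CustomSerdeSketch {
  // Hypothetical base abstraction; the real ScalaSerde signature may differ.
  trait TypedSerde[T] {
    def serialize(data: T): Array[Byte]
    def deserialize(bytes: Array[Byte]): Option[T]
  }

  final case class User(name: String, age: Int)

  // A user-defined serde, exposed as an implicit so stream code can pick it up.
  // Toy encoding: assumes the name contains no comma.
  implicit val userSerde: TypedSerde[User] = new TypedSerde[User] {
    def serialize(u: User): Array[Byte] = s"${u.name},${u.age}".getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): Option[User] =
      new String(bytes, UTF_8).split(",") match {
        case Array(name, age) => Try(age.toInt).toOption.map(User(name, _))
        case _                => None
      }
  }

  // Round trip through whichever serde is in implicit scope for T.
  def roundTrip[T](value: T)(implicit serde: TypedSerde[T]): Option[T] =
    serde.deserialize(serde.serialize(value))

  val restored: Option[User] = roundTrip(User("ada", 36))
}
```

Because the serde is typed on User, handing it to an operation that expects a serde for another type would be a compile-time error rather than a runtime failure.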
Migration Plan and Compatibility
org.apache.kafka. A PR on Apache Kafka is available. The PR contains the following:
- the library implementation
- changes in build.gradle to build the library jar
- tests (1 basic test for WordCount, 2 tests demonstrating usage of implicit serdes)