Status

Current state: Accepted

Discussion thread: here

Voting thread: here (full history)

JIRA: KAFKA-14722 - Getting issue details... STATUS

Motivation

Kafka exposes serialization/deserialization classes for multiple types such as ByteArray, Double, Float etc, in 'org.apache.kafka.common.serialization' package which is part of public interface. During the task KAFKA-14491 Victoria added BooleanSerde class, It will be useful to have such class in public package.

Public Interfaces

org/apache/kafka/common/serialization package:

BooleanSerde
   static public final class BooleanSerde extends WrapperSerde<Boolean> {
        public BooleanSerde() {
            super(new BooleanSerializer(), new BooleanDeserializer());
        }
    }
method Boolean()
    /**
     * A serde for {@code Boolean} type.
     */
    static public Serde<Boolean> Boolean() {
        return new BooleanSerde();
    }
serdeFrom method
if (Boolean.class.isAssignableFrom(type)) {
return (Serde<T>) Boolean();
}

method: serdeFrom, exception message
throw new IllegalArgumentException("Unknown class for built-in serializer. Supported types are: " +
    "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID, Boolean");
BooleanSerializer class
public class BooleanSerializer implements Serializer<Boolean> {
	[...]
    public byte[] serialize(final String topic, final Boolean data) { [...]	}
}
BooleanDeserializer class
public class BooleanDeserializer implements Deserializer<Boolean> {
	[...]
    public Boolean deserialize(final String topic, final byte[] data) { [...] }
}

Proposed Changes

  1. Add Boolean Serde to a org/apache/kafka/common/serialization package
    1. Add static public final class to the Serdes.java, update serdeFrom method, add Boolean() method that returns BooleanSerde();
    2. Create BooleanSerializer class in BooleanSerializer.java file, based on what is in NullableValueAndTimestampSerde.java
    3. Create BooleanDeserializer class in BooleanDeserializer.java file, based on what is in NullableValueAndTimestampSerde.java
  2. Rewrite NullableValueAndTimestampSerde.java file, package: org.apache.kafka.streams.state.internals so that it uses BooleanSerde from common/serialization instead of having own private class.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    • New Serde for Boolean type.
  • If we are changing behavior how will we phase out the older behavior?
    • Update NullableValueAndTimestampSerde.java file, to use public class for Serde
  • If we need special migration tools, describe them here. 
    • N/A
  • When will we remove the existing behavior?
    • N/A

Test Plan

I think that in that case unit-tests are sufficient, since the proposed change is a new simple feature it doesn't need system tests.

  • No labels