...
Code Block |
---|
|
@Test
public void testByteBufferSerializer() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    final ByteBuffer buffer = ByteBuffer.allocate(7);
    buffer.put(bytes);
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals(bytes, serializer.serialize(topic, buffer));
    }
} |
Running the test case above fails with the following assertion error:
Code Block |
---|
|
array lengths differ, expected: <5> but was: <7>
Expected :5
Actual :7
org.opentest4j.AssertionFailedError: array lengths differ, expected: <5> but was: <7>
...
at org.apache.kafka.common.serialization.SerializationTest.testByteBufferSerializer(SerializationTest.java:397)
...
at java.util.ArrayList.forEach(ArrayList.java:1259)
...
at java.util.ArrayList.forEach(ArrayList.java:1259)
...
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) |
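The length 7 in the failure matches the capacity of the buffer rather than the 5 bytes written to it. The following is a minimal sketch, using only `java.nio` and none of Kafka's classes, that reproduces the buffer state the failing test hands to the serializer. The class name `BufferStateDemo` and the method `stateAfterPut` are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferStateDemo {
    // Returns {position, limit, capacity, backing-array length} after writing
    // "Hello" into a 7-byte heap buffer, mirroring the failing test's setup.
    static int[] stateAfterPut() {
        final byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8);
        final ByteBuffer buffer = ByteBuffer.allocate(7);
        buffer.put(bytes);
        return new int[] {
            buffer.position(), buffer.limit(), buffer.capacity(), buffer.array().length
        };
    }

    public static void main(String[] args) {
        final int[] s = stateAfterPut();
        // prints "position=5, limit=7, capacity=7, array.length=7"
        System.out.println("position=" + s[0] + ", limit=" + s[1]
            + ", capacity=" + s[2] + ", array.length=" + s[3]);
    }
}
```

Only the range from index 0 up to the position (5) holds written data, so any result of length 7 includes two bytes that were never written.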
...
There are no new interfaces and no existing interfaces that will be removed or changed.
Proposed Changes
Change the implementation of `ByteBufferSerializer#serialize(String, ByteBuffer)`:
Code Block |
---|
|
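public class ByteBufferSerializer implements Serializer<ByteBuffer> {
    @Override
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }
        if (data.hasArray()) {
            final byte[] arr = data.array();
            // Fast path: the buffer spans its entire backing array, so return it without copying.
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        }
        // Otherwise switch from write mode to read mode (limit = position, position = 0)
        // and copy only the bytes that were written.
        data.flip();
        return Utils.toArray(data);
    }
} |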
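The effect of the proposed logic on different buffers can be tried outside Kafka with a sketch along the following lines. It is an approximation, not the actual class: `FlipSerializeSketch` and `copyRemaining` are hypothetical names, the `topic` parameter is dropped, and `copyRemaining` is an assumed stand-in for `Utils.toArray(data)` that copies the bytes between position and limit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FlipSerializeSketch {
    // Mirrors the proposed serialize() logic without any Kafka dependency.
    static byte[] serialize(ByteBuffer data) {
        if (data == null) {
            return null;
        }
        if (data.hasArray()) {
            final byte[] arr = data.array();
            // Fast path: the buffer covers its whole backing array.
            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                return arr;
            }
        }
        // Treat the buffer as being in write mode: limit = position, position = 0.
        data.flip();
        return copyRemaining(data);
    }

    // Assumed stand-in for Utils.toArray: copies bytes in [position, limit)
    // through a duplicate so the caller's position is left unchanged.
    static byte[] copyRemaining(ByteBuffer buffer) {
        final byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        final byte[] bytes = "Hello".getBytes(StandardCharsets.UTF_8);

        final ByteBuffer oversized = ByteBuffer.allocate(7);
        oversized.put(bytes);
        System.out.println(Arrays.equals(bytes, serialize(oversized))); // prints "true"

        final ByteBuffer wrapped = ByteBuffer.wrap(bytes);
        System.out.println(serialize(wrapped) == bytes); // prints "true" (same array, no copy)

        final ByteBuffer direct = ByteBuffer.allocateDirect(6);
        direct.put(bytes);
        System.out.println(Arrays.equals(bytes, serialize(direct))); // prints "true"
    }
}
```

In this sketch the fast path returns the caller's backing array itself rather than a copy. The `flip()` branch assumes the caller passes a buffer that is still in write mode: a buffer with spare capacity that has already been flipped by the caller would be flipped a second time and yield an empty array.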
Compatibility, Deprecation, and Migration Plan
- If the old behavior is correct, then the new behavior is also correct.
Test Plan
Code Block |
---|
|
@Test
public void testByteBufferSerializer() {
    final byte[] bytes = "Hello".getBytes(UTF_8);
    final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
    final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
    final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
    final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
    final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
    try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
        assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
        assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
        assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
        assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
        assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
    }
} |
...