...
Change the implementation of `ByteBufferSerializer#serialize(String, ByteBuffer)` as follows:
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.common.serialization;
import org.apache.kafka.common.utils.Utils;
import java.nio.ByteBuffer;
/**
 * {@link Serializer} that converts a {@link ByteBuffer} into a {@code byte[]}.
 */
public class ByteBufferSerializer implements Serializer<ByteBuffer> {

    /**
     * Serializes the given buffer to a byte array.
     *
     * <p>When the buffer is array-backed, starts at array offset zero, and its
     * remaining bytes span the whole backing array, that backing array is
     * returned directly (no copy is made, so the result shares storage with
     * the buffer). In every other case the buffer is flipped and handed to
     * {@code Utils.toArray}.
     *
     * <p>Note that {@link ByteBuffer#flip()} mutates the caller's buffer: the
     * limit becomes the position the buffer had on entry and the position
     * becomes zero. NOTE(review): this appears to expect a buffer that has
     * been written to but not yet flipped by the caller -- confirm against
     * the intended contract.
     *
     * @param topic topic associated with the data; not used by this implementation
     * @param data  buffer to serialize; may be {@code null}
     * @return the serialized bytes, or {@code null} if {@code data} is {@code null}
     */
    @Override
    public byte[] serialize(String topic, ByteBuffer data) {
        if (data == null) {
            return null;
        }

        // Zero-copy path: the backing array already is exactly the payload.
        if (coversWholeBackingArray(data)) {
            return data.array();
        }

        // Switch the buffer from "writing" to "reading" before extracting bytes.
        data.flip();
        // Presumably copies the buffer's remaining bytes into a new array;
        // verify against Utils.toArray.
        return Utils.toArray(data);
    }

    /**
     * Tells whether the buffer exposes a backing array whose full length is
     * exactly the buffer's remaining content, starting at array offset zero.
     */
    private static boolean coversWholeBackingArray(ByteBuffer buffer) {
        return buffer.hasArray()
            && buffer.arrayOffset() == 0
            && buffer.array().length == buffer.remaining();
    }
} |
...