A Complete Confluent-Compliant Avro Deserializer That Supports Schema Evolution
2 min read · Sep 25, 2019
I was using Spring Cloud Stream to consume a Kafka topic that holds Avro bytes. Even today, Spring Cloud Stream cannot decode messages that follow the Confluent Avro standard, so I had to come up with a workaround.
Here is what I did:
1. Use ByteArrayDeserializer as the value deserializer.
// Keys are read as strings; values are handed over as raw byte arrays so that
// the Avro decoding can be done by hand in the listener.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
2. In your listener, use the following class to deserialize the byte arrays.
/**
 * Receives the raw message bytes from the input channel and decodes them
 * into a {@code MyDto} with {@code AvroDeserializer}.
 *
 * @param myDto the undecoded message payload as delivered by the binder
 */
@StreamListener(ChannelName.INPUT_REAL_TIME_PUSH_NOTIFICATION_CHANNEL)
public void onMessage(byte[] myDto) {
    // The diamond operator is required: with a raw AvroDeserializer the call
    // would return SpecificRecordBase and the assignment would not compile.
    MyDto result = new AvroDeserializer<>(MyDto.class, schemaRegistry).deserialize("your topic", myDto);
    //Do something later
}
Below is my "secret" deserializer code; I hope you enjoy it. It also supports schema evolution.
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
/**
 * Kafka {@link Deserializer} for Avro payloads framed as: one magic byte
 * ({@code 0}), a 4-byte schema id, and then the Avro binary body.
 *
 * <p>The schema id is looked up in the schema registry to obtain the schema the
 * message was written with; the schema of {@code targetType} is used as the
 * reader schema, so Avro resolves differences between the two while decoding.
 *
 * @param <T> the generated Avro class that messages are decoded into
 */
@Slf4j
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    /** First byte every supported payload must start with. */
    private static final byte MAGIC_BYTE = 0x0;

    protected final Class<T> targetType;
    private final CachedSchemaRegistryClient registryClient;

    /**
     * @param targetType     class of the record to produce; its schema is the reader schema
     * @param registryClient client used to fetch the writer schema by the id found in each message
     */
    public AvroDeserializer(Class<T> targetType, CachedSchemaRegistryClient registryClient) {
        this.targetType = targetType;
        this.registryClient = registryClient;
    }

    @Override
    public void close() {
        // No-op
    }

    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        // No-op
    }

    /**
     * Decodes one message.
     *
     * @param topic topic the message came from; only used in the error message
     * @param data  framed payload, may be {@code null}
     * @return the decoded record, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the payload is malformed, the schema cannot be
     *                                fetched, or Avro decoding fails (the cause is attached)
     */
    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Hex-encoding the whole payload is expensive; only do it when it will be logged.
            if (log.isDebugEnabled()) {
                log.debug("data='{}'", DatatypeConverter.printHexBinary(data));
            }

            ByteBuffer buffer = this.getByteBuffer(data);
            int schemaId = buffer.getInt();

            // The registry holds the schema the producer wrote with; the target class
            // carries the schema this consumer wants to read into.
            Schema writerSchema = this.registryClient.getById(schemaId);
            Schema readerSchema = targetType.getDeclaredConstructor().newInstance().getSchema();

            // SpecificDatumReader takes (writer, reader) in that order.
            DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);

            // Decode straight from the original array, starting after the 5-byte header.
            Decoder decoder = DecoderFactory.get()
                    .binaryDecoder(data, buffer.position(), buffer.remaining(), null);

            T result = datumReader.read(null, decoder);
            log.debug("deserialized data='{}'", result);
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }

    /**
     * Wraps the payload and consumes the leading magic byte.
     *
     * @param payload the framed message bytes
     * @return a buffer positioned just after the magic byte
     * @throws SerializationException if the first byte is not the expected magic byte
     */
    private ByteBuffer getByteBuffer(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != MAGIC_BYTE) {
            throw new SerializationException("Unknown magic byte!");
        }
        return buffer;
    }
}