Complete Confluent-Compliant Avro Deserializer that Supports Schema Evolution

Wu Jiaojiao & Li Yuansan
2 min read · Sep 25, 2019

I was using Spring Cloud Stream to consume a Kafka topic that holds Avro bytes. Even now, Spring Cloud Stream does not understand the Confluent Avro wire format out of the box, so I had to work around it.

Here is what I did:

1. Use ByteArrayDeserializer as the value deserializer:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

2. In your listener, use the class below to deserialize the byte array (a note on creating the schemaRegistry client follows this snippet).

@StreamListener(ChannelName.INPUT_REAL_TIME_PUSH_NOTIFICATION_CHANNEL)
public void onMessage(byte[] myDto) {
    MyDto result = new AvroDeserializer<>(MyDto.class, schemaRegistry).deserialize("your topic", myDto);
    // do something with result
}
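In case you are wondering where schemaRegistry comes from: it is a CachedSchemaRegistryClient. A minimal way to wire one up as a Spring bean looks like this; the registry URL and cache capacity below are just placeholders for illustration:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class SchemaRegistryConfig {

    // Placeholder URL; point this at your own Schema Registry instance
    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    // How many schemas the client may cache per subject
    private static final int IDENTITY_MAP_CAPACITY = 100;

    @Bean
    public CachedSchemaRegistryClient schemaRegistry() {
        return new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, IDENTITY_MAP_CAPACITY);
    }
}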

Now here is my deserializer code; I hope you enjoy it. It also supports schema evolution: the schema the message was written with is fetched from the Schema Registry by the id embedded in the message, and Avro resolves it against the schema of your generated class.


import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import javax.xml.bind.DatatypeConverter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

@Slf4j
public class AvroDeserializer<T extends SpecificRecordBase> implements Deserializer<T> {

    protected final Class<T> targetType;
    private final CachedSchemaRegistryClient registryClient;

    public AvroDeserializer(Class<T> targetType, CachedSchemaRegistryClient registryClient) {
        this.targetType = targetType;
        this.registryClient = registryClient;
    }

    @Override
    public void close() {
        // No-op
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No-op
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            T result = null;

            if (data != null) {
                log.debug("data='{}'", DatatypeConverter.printHexBinary(data));
                ByteBuffer buffer = this.getByteBuffer(data);
                int id = buffer.getInt();

                // Confluent wire format: 1 magic byte + 4-byte schema id, then the Avro payload
                int length = buffer.limit() - 1 - 4;
                byte[] bytes = new byte[length];
                buffer.get(bytes, 0, length);

                // The schema the producer wrote with, looked up by the id embedded in the message
                Schema writerSchema = this.registryClient.getById(id);
                // The schema of the generated class we want to read into
                Schema readerSchema = targetType.newInstance().getSchema();

                // Avro resolves the writer schema against the reader schema;
                // this resolution is what makes schema evolution work
                DatumReader<GenericRecord> datumReader =
                        new SpecificDatumReader<>(writerSchema, readerSchema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);

                result = (T) datumReader.read(null, decoder);
                log.debug("deserialized data='{}'", result);
            }
            return result;
        } catch (Exception ex) {
            throw new SerializationException(
                    "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
        }
    }

    private ByteBuffer getByteBuffer(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        // A Confluent-framed message always starts with the magic byte 0x0
        if (buffer.get() != 0) {
            throw new SerializationException("Unknown magic byte!");
        }
        return buffer;
    }
}
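If you are curious why the writer schema matters, here is a small self-contained sketch of the resolution Avro performs under the hood. The schemas and field names are made up for the example: a record written with an old schema is read back with a newer one that adds a field with a default.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class SchemaEvolutionDemo {

    // v1: the schema the producer wrote with
    private static final Schema WRITER_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyDto\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"string\"}]}");

    // v2: the consumer's schema, with a new field that has a default
    private static final Schema READER_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyDto\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"string\"},"
                    + "{\"name\":\"status\",\"type\":\"string\",\"default\":\"NEW\"}]}");

    public static void main(String[] args) throws IOException {
        // Write a record with the old (writer) schema
        GenericRecord record = new GenericData.Record(WRITER_SCHEMA);
        record.put("id", "42");

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER_SCHEMA).write(record, encoder);
        encoder.flush();

        // Read it back with the new (reader) schema; Avro fills in the default
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord evolved =
                new GenericDatumReader<GenericRecord>(WRITER_SCHEMA, READER_SCHEMA).read(null, decoder);

        System.out.println(evolved); // {"id": "42", "status": "NEW"}
    }
}

This is the same resolution the deserializer above performs; the only difference is that there the writer schema comes from the Schema Registry, looked up by the id embedded in each message.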
