KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord
Posted: 2017-01-29 01:59:00

Question:

My KafkaProducer is able to serialize objects to my topic using KafkaAvroSerializer. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of my serialized class.
My KafkaProducer:
KafkaProducer<CharSequence, MyBean> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
    Properties properties = new Properties();
    properties.load(props);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            io.confluent.kafka.serializers.KafkaAvroSerializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");

    MyBean bean = new MyBean();
    producer = new KafkaProducer<>(properties);
    producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
}
My KafkaConsumer:
KafkaConsumer<CharSequence, MyBean> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
    Properties properties = new Properties();
    properties.load(props);
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
    properties.put("schema.registry.url", "http://localhost:8081");

    consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList(topic));

    try {
        while (true) {
            ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
            if (records.isEmpty()) {
                continue;
            }
            for (ConsumerRecord<CharSequence, MyBean> record : records) {
                MyBean bean = record.value(); // <-------- This throws a cast exception because it cannot cast GenericRecord to MyBean
                System.out.println("consumer received: " + bean);
            }
        }
    } finally {
        consumer.close();
    }
}
The line MyBean bean = record.value(); throws a ClassCastException because a GenericRecord cannot be cast to MyBean.

I am using kafka-client-0.9.0.1 and kafka-avro-serializer-3.0.0.
Answer 1:

KafkaAvroDeserializer supports SpecificData

It is not enabled by default. To enable it:
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
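As a rough sketch of how this fits into the consumer from the question (the broker and group settings loaded from consumer.props are omitted here, and MyBean is assumed to be the Avro-generated SpecificRecord class), poll() then returns values that can be cast directly:

Properties properties = new Properties();
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
for (ConsumerRecord<CharSequence, MyBean> record : consumer.poll(100)) {
    MyBean bean = record.value(); // now a SpecificRecord instance, no ClassCastException
}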
KafkaAvroDeserializer does not support ReflectData

Confluent's KafkaAvroDeserializer does not know how to deserialize with Avro ReflectData. I had to extend it to support Avro ReflectData:
/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *            value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);

      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
Define a custom deserializer class that deserializes to MyBean:
public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
  public MyBeanDeserializer() {
    super(MyBean.class);
  }
}
Configure the KafkaConsumer to use the custom deserializer class:
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
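Two wiring notes, sketched here rather than taken from the original answer: Kafka instantiates the class named in value.deserializer through its no-arg constructor, which is why MyBeanDeserializer provides one, and the parent KafkaAvroDeserializer still needs schema.registry.url from the same properties in order to fetch the writer schema:

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");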
Comments:
Thanks! I thought the generated bean extends SpecificRecordBase and implements SpecificRecord, so what does that have to do with Avro ReflectData? I'm new to Avro and just want to understand this better.
I tried your code and got the following exception: Caused by: org.apache.avro.AvroTypeException: Found string, expecting com.MyBean at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:223) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) at org.apache.avro.generic.GenericDatumReader.read(Gene...
I kept everything else unchanged except switching to the new deserializer you provided.
I didn't know you were using SpecificData. I updated my answer.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true) worked for me. By the way, is this documented anywhere? I have basically been working it out by trial and error.

Answer 2:
EDIT: ReflectData support has since been merged (see below).

To add to Chin Huang's answer, for minimal code and better performance, you should implement it this way:
/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *            value type
 */
public abstract class SpecificKafkaAvroDeserializer<V extends SpecificRecordBase> extends AbstractKafkaAvroDeserializer implements Deserializer<V> {

  private final Schema schema;
  private Class<T> type;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
    this.type = type;
    this.schema = ReflectData.get().getSchema(type);
    this.configure(this.deserializerConfig(props));
  }

  public void configure(Map<String, ?> configs) {
    this.configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  protected T deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnore) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);

      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema schema = schemaRegistry.getByID(schemaId);
      Schema readerSchema = ReflectData.get().getSchema(type);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      SpecificDatumReader<T> reader = new SpecificDatumReader(schema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
Comments:
Feels like V needs to be renamed to T.

ReflectData support has been merged: github.com/confluentinc/schema-registry/pull/1111
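Following up on that PR, and purely as an assumption to verify against the Confluent release you run: the merged reflection support appears to be switched on through a serde config flag rather than a custom subclass, roughly along these lines:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
properties.put("schema.reflection", true); // assumed property name from the PR; check your version's docs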