KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord

Posted: 2017-01-29 01:59:00

【Question】:

My KafkaProducer serializes objects to my topic using KafkaAvroSerializer without problems. However, KafkaConsumer.poll() returns a deserialized GenericRecord instead of my serialized class.

My KafkaProducer:

    KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
    }

My KafkaConsumer:

    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }

    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
          MyBean bean = record.value(); // <-------- This throws a ClassCastException because it cannot cast GenericRecord to MyBean
          System.out.println("consumer received: " + bean);
        }
      }
    } finally {
      consumer.close();
    }

The line MyBean bean = record.value(); throws a ClassCastException because a GenericRecord cannot be cast to MyBean.

I am using kafka-client-0.9.0.1 and kafka-avro-serializer-3.0.0.

【Comments】:

【Answer 1】:

KafkaAvroDeserializer supports SpecificData

It is not enabled by default. To enable it:

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
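For context, a minimal sketch of the consumer properties with the specific reader enabled (using the string forms of the config keys; the registry URL is a placeholder) might look like this:

```java
import java.util.Properties;

public class SpecificReaderProps {
    // Builds consumer properties that make KafkaAvroDeserializer return
    // generated SpecificRecord classes instead of GenericRecord.
    public static Properties build() {
        Properties props = new Properties();
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder URL
        // "specific.avro.reader" is the string form of SPECIFIC_AVRO_READER_CONFIG
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("specific.avro.reader")); // prints true
    }
}
```

With this flag set, poll() can hand back the generated class directly, so the cast in the question no longer fails.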

KafkaAvroDeserializer does not support ReflectData

Confluent's KafkaAvroDeserializer does not know how to deserialize with Avro ReflectData. I had to extend it to support Avro ReflectData:

/**
 * Extends the deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}
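The deserializer above relies on Confluent's wire format: one magic byte (0), a 4-byte big-endian schema id, then the Avro-encoded payload. A stdlib-only sketch of reading that header (the example bytes below are made up for illustration):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    static final byte MAGIC_BYTE = 0x0; // same constant the deserializer checks
    static final int ID_SIZE = 4;       // corresponds to idSize above

    // Validates the magic byte and returns the schema id; afterwards the
    // buffer's position would mark the start of the Avro binary payload.
    public static int readSchemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        byte[] message = {0, 0, 0, 0, 42, 10, 20, 30}; // magic, id=42, payload
        System.out.println(readSchemaId(message)); // prints 42
    }
}
```

This is why the code computes the payload length as buffer.limit() - 1 - idSize: one byte of magic plus four bytes of schema id precede the Avro data.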

Define a custom deserializer class that deserializes to MyBean:

public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
  public MyBeanDeserializer() {
    super(MyBean.class);
  }
}

Configure the KafkaConsumer to use the custom deserializer class:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);

【Comments】:

Thanks! I thought the generated bean extends SpecificRecordBase and implements SpecificRecord, so what does it have to do with Avro ReflectData? I'm new to Avro and just want to understand this better.

I tried your code and got the following exception: Caused by: org.apache.avro.AvroTypeException: Found string, expecting com.MyBean at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:223) at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) at org.apache.avro.generic.GenericDatumReader.read(Gene... I kept everything the same except switching to the new deserializer you provided.

I didn't know you were using SpecificData. I updated my answer.

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true) worked for me. By the way, is there documentation for this somewhere? I've mostly been fixing these problems by trial and error.

【Answer 2】:

Edit: ReflectData support has been merged (see below)

To add to Chin Huang's answer: for minimal code and better performance, you should implement it this way:

/**
 * Extends the deserializer to support SpecificRecord subclasses.
 *
 * @param <T>
 *     value type
 */
public abstract class SpecificKafkaAvroDeserializer<T extends SpecificRecordBase> extends AbstractKafkaAvroDeserializer implements Deserializer<T> {
  private final Schema schema;
  private Class<T> type;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected SpecificKafkaAvroDeserializer(Class<T> type, Map<String, ?> props) {
    this.type = type;
    this.schema = ReflectData.get().getSchema(type);
    this.configure(this.deserializerConfig(props));
  }

  public void configure(Map<String, ?> configs) {
    this.configure(new KafkaAvroDeserializerConfig(configs));
  }

  @Override
  protected T deserialize(
          boolean includeSchemaAndVersion,
          String topic,
          Boolean isKey,
          byte[] payload,
          Schema readerSchemaIgnore) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      Schema readerSchema = ReflectData.get().getSchema(type);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize;
      SpecificDatumReader<T> reader = new SpecificDatumReader<>(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}

【Comments】:

Feels like V needs to be renamed to T.

ReflectData support has been merged: github.com/confluentinc/schema-registry/pull/1111
