flink kafka consumer with avro schema. handling null

Posted connie313

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flink kafka consumer with avro schema. handling null相关的知识,希望对你有一定的参考价值。

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
 } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}




https://developer.aliyun.com/ask/131116?spm=a2c6h.13159736


https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html

以上是关于flink kafka consumer with avro schema. handling null的主要内容,如果未能解决你的问题,请参考以下文章

Flink实战系列Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/

Kafka-Consumer高CPU问题分析

flink源码分析之kafka consumer的执行流程

如何修复:java.lang.OutOfMemoryError: Direct buffer memory in flink kafka consumer

Kafka Streams“Consumed.with()”与KafkaAvroDeserializer

Flink kafka producer with transaction support