Flink Kafka consumer with Avro schema: handling null
Posted by connie313
```java
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    // Avro readers/decoders are not serializable, so they are created
    // lazily on the task manager rather than shipped with the schema.
    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    @Override
    public T deserialize(byte[] message) {
        // Kafka tombstone records carry a null payload; return null here
        // instead of letting the decoder fail with a NullPointerException.
        if (message == null) {
            return null;
        }
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<>(avroType);
            } else {
                reader = new ReflectDatumReader<>(avroType);
            }
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
```
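The null-handling concern in the title reduces to one pattern: map a tombstone (a record with a null payload) to `null` in `deserialize()`, then filter the nulls out downstream. A minimal stand-alone sketch of that pattern, using a `String` stand-in for the Avro type (all names here are hypothetical, for illustration only):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TombstoneDemo {
    // Hypothetical stand-in for deserialize(): tombstones arrive as null payloads.
    static String deserialize(byte[] message) {
        if (message == null) {
            return null;  // map tombstone to null instead of throwing
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[][] batch = {
            "a".getBytes(StandardCharsets.UTF_8),
            null,  // tombstone
            "b".getBytes(StandardCharsets.UTF_8)
        };
        List<String> survivors = new ArrayList<>();
        for (byte[] msg : batch) {
            String value = deserialize(msg);
            if (value != null) {   // downstream filter, like DataStream.filter(Objects::nonNull)
                survivors.add(value);
            }
        }
        System.out.println(survivors);  // prints [a, b]
    }
}
```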
https://developer.aliyun.com/ask/131116?spm=a2c6h.13159736
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
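For completeness, the schema above can be wired into a Flink Kafka source roughly as follows. This is a sketch against the Flink 1.10 connector API linked above; the broker address, group id, topic name, and the `MyAvroRecord` class are placeholders. The null filter should sit directly after the source so tombstone nulls are dropped before crossing any operator boundary.

```java
import java.util.Objects;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class AvroConsumeJob {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "avro-demo");               // placeholder group id

        // MyAvroRecord is a placeholder for an Avro-generated SpecificRecord class.
        FlinkKafkaConsumer<MyAvroRecord> source = new FlinkKafkaConsumer<>(
                "events",                                         // placeholder topic
                new AvroDeserializationSchema<>(MyAvroRecord.class),
                props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source)
           .filter(Objects::nonNull)  // drop tombstones mapped to null by deserialize()
           .print();
        env.execute("avro-consume");
    }
}
```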