使用 Apache Beam 反序列化 Kafka AVRO 消息
Posted
技术标签:
【中文标题】使用 Apache Beam 反序列化 Kafka AVRO 消息【英文标题】:Deserialize Kafka AVRO messages using Apache Beam 【发布时间】:2019-07-12 07:58:35 【问题描述】:主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。
我已经能够在简单的场景中使用消息,例如 KV (Long,String),使用类似的东西:
PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long,
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
PCollection<String> output = input.apply(Values.<String>create());
但是当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个需要消耗的 KV(STRING, AVRO)。
我尝试从 AVRO 模式生成 Java 类,然后将它们包含在“应用”中,例如:
PCollection<MyClass> output = input.apply(Values.<MyClass>create());
但这似乎不是正确的方法。
是否有任何人可以指出我的文档/示例,以便我了解您将如何使用 Kafka AVRO 和 Beam?
我已经更新了我的代码:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
public class Main
public static void main(String[] args)
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);
p.run();
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
@DefaultCoder(AvroCoder.class)
public class Myclass
String name;
String age;
Myclass()
Myclass(String n, String a)
this.name= n;
this.age= a;
但我现在收到以下错误
incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >
我一定是导入了错误的序列化程序?
【问题讨论】:
【参考方案1】:我也遇到过同样的问题。在此邮件存档中找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E
在您的情况下,您需要定义自己的Deserializer<MyClass>
,它可以从AbstractKafkaAvroDeserializer
扩展,如下所示。
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass>
@Override
public void configure(Map<String, ?> configs, boolean isKey)
configure(new KafkaAvroDeserializerConfig(configs));
@Override
public MyClass deserialize(String s, byte[] bytes)
return (MyClass) this.deserialize(bytes);
@Override
public void close()
然后将您的 KafkaAvroDeserializer 指定为 ValueDeserializer。
p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );
【讨论】:
我发现从AbstractKafkaAvroDeserializer
扩展在这里并不重要,如果你只是实现接口,为KafkaAvroDeserializer
的具体实例创建一个内部字段并委托给它。在任何情况下,如果您从 KafkaAvroDeserialzer
扩展,您应该能够删除配置和关闭方法覆盖【参考方案2】:
您可以按如下方式使用 KafkaAvroDeserializer:
PCollection<KV<Long,MyClass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
其中 MyClass 是 POJO 类生成的 Avro Schema。
确保您的 POJO 类具有注释 AvroCoder,如下例所示:
@DefaultCoder(AvroCoder.class)
public class MyClass
String name;
String age;
MyClass()
MyClass(String n, String a)
this.name= n;
this.age= a;
【讨论】:
如果来自 io.confluent.kafka.serializers.KafkaAvroDeserializer 的 KafkaAvroDeserializer.class?这就是我目前正在使用的,但它给了我一个错误,因为它期望来自 org.apache.kafka.common.serialization.Deserializer 的反序列化器 是的,它来自 Confluent 包。你得到什么错误?你能粘贴错误堆栈跟踪吗? 对不起,我应该更清楚,我得到一个编译错误,即:Error:(47, 69) java: incompatible types: java.lang.ClassYohei 的回答很好,但我也发现这很管用
import io.confluent.kafka.streams.serdes.avro.SpecificAvroDeserializer;
...
public static class CustomKafkaAvroDeserializer extends SpecificAvroDeserializer<MyCustomClass>
...
.withValueDeserializerAndCoder(CustomKafkaAvroDeserializer.class, AvroCoder.of(MyCustomClass.class))
...
MyCustomClass
是使用 Avro 工具生成的代码。
【讨论】:
【参考方案4】:我今天遇到了类似的问题,并遇到了以下示例,它为我解决了这个问题。
https://github.com/andrewrjones/debezium-kafka-beam-example/blob/master/src/main/java/com/andrewjones/KafkaAvroConsumerExample.java
对我来说缺少的部分是(类)KafkaAvroDeserializer
KafkaIO.<String, MyClass>read()
.withBootstrapServers("kafka:9092")
.withTopic("dbserver1.inventory.customers")
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(MyClass.class))
【讨论】:
.class
应该已经返回了一个 Class 对象,所以似乎不需要强制转换...【参考方案5】:
将KafkaIO.<Long, String>read()
更改为KafkaIO.<Long, Object>read()
。
如果你查看KafkaAvroDeserializer
的实现,它实现了Deserializer<Object>
:
public class KafkaAvroDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object>
【讨论】:
我面临同样的错误,这也不能解决问题。我收到此错误:编译失败 [ERROR] /Users/01087872/Documents/fr-det-avro-sample/src/main/java/examples/MyClassConsumer.java:[19,17] 不兼容的类型:推理变量 T 不兼容等式约束 java.lang.Object,examples.MyClass以上是关于使用 Apache Beam 反序列化 Kafka AVRO 消息的主要内容,如果未能解决你的问题,请参考以下文章
使用 flink runner 时如何在 apache Beam 中执行检查点?
Apache-Beam 将序列号添加到 PCollection