无法从对象生成avro通用记录
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了无法从对象生成avro通用记录相关的知识,希望对你有一定的参考价值。
我正在尝试使用Kafka制作人将avro记录发送到A Kafka主题。我有一个User类,我正在发送该类的对象。如果我使用avroRecord.put();
设置每个属性,下面的代码工作正常。但我想要的是从对象创建一个通用记录而不使用avroRecord.put();对于每个属性。
用户类
public class User {
int id;
String name;
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
发件人类
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
import vo.User;
public class Sender {
public static void main(String[] args) {
User user = new User(10,"testName");
Schema schema = ReflectData.get().getSchema(user.getClass());
GenericRecord avroRecord = new GenericData.Record(schema);
//working fine
/*avroRecord.put("id", user.getId());
avroRecord.put("name", user.getName());*/
//not working
DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
try {
datumWriter.write(user, encoder);
encoder.flush();
} catch (IOException e1) {
e1.printStackTrace();
}
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avrotesttopic1",avroRecord);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://127.0.0.1:8081");
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
try {
producer.send(record);
producer.flush();
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
我如何将此对象作为Avro发布到我的Kafka主题?
我已经提到了以下链接
https://findusages.com/search/org.apache.avro.io.DatumWriter/write $ 2〜偏移量= 23
谢谢。
有可能使用ReflectDatumWriter
完成你正在尝试的东西,唯一的限制是读取你将需要的数据ReflectDatumReader
,并将空构造函数作为你的类的一部分。以下代码正在运行(没有kafka,至少是序列化/反序列化)
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class Test {
public static void main(String[] args) throws IOException {
User user = new User(10, "testName");
Schema schema = ReflectData.get().getSchema(user.getClass());
GenericRecord avroRecord = new GenericData.Record(schema);
ReflectDatumWriter<User> datumWriter = new ReflectDatumWriter<User>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
datumWriter.write(user, encoder);
encoder.flush();
ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(schema);
User after = (User)reader.read(null, DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null));
System.out.println(after.getId());
System.out.println(after.getName());
}
public static class User {
int id;
String name;
public User(){
}
public User(int id, String name) {
super();
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我建议肯定使用模式注册表和AvroSerializer / AvroDeserializer,或者在最坏的情况下使用基于模式的编译类来确保Kafka中主题级别的兼容性,并确保比反射解决方案更好。
编辑:
如果你想使用KafkaAvroSerializer
/ KafkaAvroDeserializer
,你必须提供一个支持的序列化对象(你可以找到列表here)。正如你所看到的,它期望一个原始类型或IndexedRecord
,它的意思是你需要提供一个编译的avro类或GenericRecord
来序列化/反序列化,没有办法直接使用你的POJO对象KafkaAvro SerDe实施。
另一个选项是实现你自己的serializer / deserializar来处理我的例子中序列化/序列化的字节数组。
您应首先创建avro架构,然后使用avro-tools
或avro-maven-plugin
从该架构生成Java类。工作示例可以找到here
以上是关于无法从对象生成avro通用记录的主要内容,如果未能解决你的问题,请参考以下文章
Apache Beam - 将大查询读取为通用记录(Avro 格式)
如何在不为每条记录调用发送方法的情况下在 Kafka Avro 生产者中发送对象的 ArrayList? [复制]