无法从对象生成avro通用记录

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了无法从对象生成avro通用记录相关的知识,希望对你有一定的参考价值。

我正在尝试使用Kafka制作人将avro记录发送到A Kafka主题。我有一个User类,我正在发送该类的对象。如果我使用avroRecord.put();设置每个属性,下面的代码工作正常。但我想要的是从对象创建一个通用记录而不使用avroRecord.put();对于每个属性。

用户类

public class User {
    int id;

    String name;

    public User(int id, String name) {

        super();

        this.id = id;

        this.name = name;

    }

    public int getId() {

        return id;

    }

    public void setId(int id) {

        this.id = id;

    }

    public String getName() {

        return name;

    }

    public void setName(String name) {

        this.name = name;

    }

}

发件人类

import org.apache.avro.Schema;

import org.apache.avro.generic.GenericData;

import org.apache.avro.generic.GenericDatumWriter;

import org.apache.avro.generic.GenericRecord;

import org.apache.avro.io.DatumWriter;

import org.apache.avro.io.Encoder;

import org.apache.avro.io.EncoderFactory;

import org.apache.avro.reflect.ReflectData;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.apache.kafka.clients.producer.ProducerRecord;



import java.io.ByteArrayOutputStream;

import java.io.IOException;

import java.util.Properties;



import vo.User;



public class Sender {

    public static void main(String[] args) {



        User user = new User(10,"testName");

        Schema schema = ReflectData.get().getSchema(user.getClass());

        GenericRecord avroRecord = new GenericData.Record(schema);



        //working fine

        /*avroRecord.put("id", user.getId());

        avroRecord.put("name", user.getName());*/



        //not working

        DatumWriter<Object> datumWriter = new GenericDatumWriter<Object>(schema);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {

            datumWriter.write(user, encoder);

            encoder.flush();

        } catch (IOException e1) {

            e1.printStackTrace();

        }



        ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("avrotesttopic1",avroRecord);



        Properties props = new Properties();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);

        props.put("schema.registry.url", "http://127.0.0.1:8081");



        KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);



        try {

            producer.send(record);

            producer.flush();

        } catch (Exception e) {

            e.printStackTrace();

        }



        producer.close();

    }

}

我如何将此对象作为Avro发布到我的Kafka主题?

我已经提到了以下链接

https://github.com/akmalmuqeeth/confluent-kafka-spring-demo/blob/master/src/main/java/ConfluentProducerApp.java

https://findusages.com/search/org.apache.avro.io.DatumWriter/write $ 2〜偏移量= 23

https://www.ctheu.com/2017/03/02/serializing-data-efficiently-with-apache-avro-and-dealing-with-a-schema-registry/

谢谢。

答案

有可能使用ReflectDatumWriter完成你正在尝试的东西,唯一的限制是读取你将需要的数据ReflectDatumReader,并将空构造函数作为你的类的一部分。以下代码正在运行(没有kafka,至少是序列化/反序列化)

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class Test {
    public static void main(String[] args) throws IOException {

        User user = new User(10, "testName");
        Schema schema = ReflectData.get().getSchema(user.getClass());
        GenericRecord avroRecord = new GenericData.Record(schema);

        ReflectDatumWriter<User> datumWriter = new ReflectDatumWriter<User>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
        datumWriter.write(user,  encoder);
        encoder.flush();

        ReflectDatumReader<Object> reader = new ReflectDatumReader<Object>(schema);
        User after =  (User)reader.read(null, DecoderFactory.get().binaryDecoder(outputStream.toByteArray(), null));
        System.out.println(after.getId());
        System.out.println(after.getName());
    }

    public static class User {
        int id;
        String name;

        public User(){

        }

        public User(int id, String name) {
            super();
            this.id = id;
            this.name = name;
        }

        public int getId() {
            return id;
        }

        public void setId(int id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }

}

我建议肯定使用模式注册表和AvroSerializer / AvroDeserializer,或者在最坏的情况下使用基于模式的编译类来确保Kafka中主题级别的兼容性,并确保比反射解决方案更好。

编辑:

如果你想使用KafkaAvroSerializer / KafkaAvroDeserializer,你必须提供一个支持的序列化对象(你可以找到列表here)。正如你所看到的,它期望一个原始类型或IndexedRecord,它的意思是你需要提供一个编译的avro类或GenericRecord来序列化/反序列化,没有办法直接使用你的POJO对象KafkaAvro SerDe实施。

另一个选项是实现你自己的serializer / deserializar来处理我的例子中序列化/序列化的字节数组。

另一答案

您应首先创建avro架构,然后使用avro-toolsavro-maven-plugin从该架构生成Java类。工作示例可以找到here

以上是关于无法从对象生成avro通用记录的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam - 将大查询读取为通用记录(Avro 格式)

无法从 java 中的 AVRO 文件生成 CSV

如何在不为每条记录调用发送方法的情况下在 Kafka Avro 生产者中发送对象的 ArrayList? [复制]

从Java对象创建Avro字符串

片段 TextView 无法从 parcelable 对象更新

从avro架构生成的类的Spark问题