KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro

Posted

技术标签:

【中文标题】KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro【英文标题】:KafkaAvroSerializer for serializing Avro without schema.registry.url 【发布时间】:2018-01-20 00:03:30 【问题描述】:

我是 Kafka 和 Avro 的菜鸟。所以我一直试图让生产者/消费者运行。到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串: 生产者的配置:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) 
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    
    producer.close();

现在一切都很好,当我尝试序列化 POJO 时问题就来了。 因此,我能够使用 Avro 提供的实用程序从 POJO 获取 AvroSchema。 对模式进行硬编码,然后尝试创建一个通用记录以通过 KafkaProducer 发送 生产者现在设置为:

    Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

这就是问题所在:当我使用 KafkaAvroSerializer 时,生产者没有出现,原因是: 缺少强制参数:schema.registry.url

我了解了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。 但是模式不是已经嵌入到 AvroMessage 中了吗? 如果有人可以分享一个使用 KafkaProducer 和 KafkaAvroSerializer 而无需指定 schema.registry.url 的工作示例,那就太好了

也非常感谢有关架构注册表实用程序的任何见解/资源。

谢谢!

【问题讨论】:

你试过spring-kafka avro deserializer 吗? Here's 也是一个教程。 【参考方案1】:

正如其他人所指出的,KafkaAvroSerializer 需要 Schema Registry,它是 Confluent 平台的一部分,并且使用需要许可。

使用模式注册表的主要优点是在线上的字节会更小,而不是为每条消息编写带有模式的二进制有效负载。

我写了一个blog post 详细说明优点

【讨论】:

【参考方案2】:

您可以创建自定义 Avro 序列化程序,然后即使没有 Schema 注册表,您也可以生成主题记录。查看下面的文章。

https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html

这里他们使用 Kafkatemplate 。我试过使用

KafkaProducer<String, User> UserKafkaProducer

工作正常 但是如果你想使用KafkaAvroSerialiser,你需要给Schema registryURL

【讨论】:

【参考方案3】:

您始终可以使您的值类手动实现Serialiser&lt;T&gt;Deserialiser&lt;T&gt;(以及用于Kafka Streams 的Serde&lt;T&gt;)。 Java 类通常是从 Avro 文件生成的,因此直接编辑它不是一个好主意,但是包装可能很冗长但可能的方式。

另一种方法是调整用于 Java 类生成的 Arvo 生成器模板,并自动生成所有这些接口的实现。 Avro maven 和 gradle 插件都支持自定义模板,所以应该很容易配置。

我创建了 https://github.com/artemyarulin/avro-kafka-deserializable,它更改了模板文件和可用于文件生成的简单 CLI 工具

【讨论】:

【参考方案4】:

虽然检查的答案都是正确的,但还应该提到可以禁用架构注册

只需将auto.register.schemas 设置为false

【讨论】:

spring.kafka.properties.auto.register.schemas 适用于使用 SpringBoot 的用户。 这根本没有帮助,因为它仍然使用 schemaregistry 来获取架构。【参考方案5】:

首先注意:KafkaAvroSerializer 在 vanilla apache kafka 中不提供 - 它由 Confluent Platform 提供。 (https://www.confluent.io/),作为其开源组件的一部分 (http://docs.confluent.io/current/platform.html#confluent-schema-registry)

快速回答:不,如果您使用KafkaAvroSerializer,您将需要一个模式注册表。在此处查看一些示例: http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

模式注册的基本思想是每个主题都将引用一个 avro 模式(即,您只能发送彼此一致的数据。但是一个模式可以有多个版本,因此您仍然需要识别每个记录的架构)

我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!那会浪费每次读取的时候解析它的时间,也浪费资源(网络、磁盘、cpu)

相反,模式注册表实例将执行绑定avro schema &lt;-&gt; int schemaId,然后序列化程序将在从注册表获取数据(并将其缓存以供以后使用)之后仅在数据之前写入此 ID。

所以在 kafka 中,您的记录将是 [&lt;id&gt; &lt;bytesavro&gt;](以及出于技术原因的魔术字节),这仅是 5 个字节的开销(与您的架构的大小相比) 并且在阅读时,您的消费者会找到与 id 对应的模式,以及与之相关的反序列化器 avro 字节。您可以在 confluent doc 中找到更多方法

如果你真的有一个用途,你想为每条记录编写模式,你将需要一个其他序列化程序(我认为你自己编写,但这很容易,只需重用 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java 并删除模式注册表部分用模式替换它,阅读相同)。但是,如果您使用 avro,我真的不鼓励这样做 - 一天后,您将需要实现类似 avro 注册表之类的东西来管理版本控制

【讨论】:

IMO 您可以在您的 Maven 存储库中保留向后兼容的模式,并且无需为此保留模​​式注册表。您避免处理额外的服务,因为您使用代码编译架构。但是,如果您更改架构,则需要重新部署应用程序。 IMO 这是一个公平的成本。

以上是关于KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams“Consumed.with()”与KafkaAvroDeserializer

在 Haskell 中,为啥没有 TypeClass 用于可以像列表一样的东西?

NoNodeAvailableException:没有节点可用于执行查询

Java:没有回报的三元。 (用于方法调用)

没有可用于在 Visual Studio 2017 中运行测试的源

为啥在以下情况下,用于存储选定文件名和文件路径的隐藏输入字段没有在表单上生成?