KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro
Posted
技术标签:
【中文标题】KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro【英文标题】:KafkaAvroSerializer for serializing Avro without schema.registry.url 【发布时间】:2018-01-20 00:03:30 【问题描述】:我是 Kafka 和 Avro 的菜鸟。所以我一直试图让生产者/消费者运行。到目前为止,我已经能够使用以下内容生成和使用简单的字节和字符串: 生产者的配置:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++)
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
producer.close();
现在一切都很好,当我尝试序列化 POJO 时问题就来了。 因此,我能够使用 Avro 提供的实用程序从 POJO 获取 AvroSchema。 对模式进行硬编码,然后尝试创建一个通用记录以通过 KafkaProducer 发送 生产者现在设置为:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
这就是问题所在:当我使用 KafkaAvroSerializer 时,生产者没有出现,原因是: 缺少强制参数:schema.registry.url
我了解了为什么需要这样做,以便我的消费者能够破译生产者发送给我的任何内容。 但是模式不是已经嵌入到 AvroMessage 中了吗? 如果有人可以分享一个使用 KafkaProducer 和 KafkaAvroSerializer 而无需指定 schema.registry.url 的工作示例,那就太好了
也非常感谢有关架构注册表实用程序的任何见解/资源。
谢谢!
【问题讨论】:
你试过spring-kafka avro deserializer 吗? Here's 也是一个教程。 【参考方案1】:正如其他人所指出的,KafkaAvroSerializer 需要 Schema Registry,它是 Confluent 平台的一部分,并且使用需要许可。
使用模式注册表的主要优点是在线上的字节会更小,而不是为每条消息编写带有模式的二进制有效负载。
我写了一个blog post 详细说明优点
【讨论】:
【参考方案2】:您可以创建自定义 Avro 序列化程序,然后即使没有 Schema 注册表,您也可以生成主题记录。查看下面的文章。
https://codenotfound.com/spring-kafka-apache-avro-serializer-deserializer-example.html
这里他们使用 Kafkatemplate 。我试过使用
KafkaProducer<String, User> UserKafkaProducer
工作正常 但是如果你想使用KafkaAvroSerialiser,你需要给Schema registryURL
【讨论】:
【参考方案3】:您始终可以使您的值类手动实现Serialiser<T>
、Deserialiser<T>
(以及用于Kafka Streams 的Serde<T>
)。 Java 类通常是从 Avro 文件生成的,因此直接编辑它不是一个好主意,但是包装可能很冗长但可能的方式。
另一种方法是调整用于 Java 类生成的 Arvo 生成器模板,并自动生成所有这些接口的实现。 Avro maven 和 gradle 插件都支持自定义模板,所以应该很容易配置。
我创建了 https://github.com/artemyarulin/avro-kafka-deserializable,它更改了模板文件和可用于文件生成的简单 CLI 工具
【讨论】:
【参考方案4】:虽然检查的答案都是正确的,但还应该提到可以禁用架构注册。
只需将auto.register.schemas
设置为false
。
【讨论】:
spring.kafka.properties.auto.register.schemas 适用于使用 SpringBoot 的用户。 这根本没有帮助,因为它仍然使用 schemaregistry 来获取架构。【参考方案5】:首先注意:KafkaAvroSerializer
在 vanilla apache kafka 中不提供 - 它由 Confluent Platform 提供。 (https://www.confluent.io/),作为其开源组件的一部分 (http://docs.confluent.io/current/platform.html#confluent-schema-registry)
快速回答:不,如果您使用KafkaAvroSerializer
,您将需要一个模式注册表。在此处查看一些示例:
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
模式注册的基本思想是每个主题都将引用一个 avro 模式(即,您只能发送彼此一致的数据。但是一个模式可以有多个版本,因此您仍然需要识别每个记录的架构)
我们不想像您暗示的那样为每个数据编写架构 - 通常,架构比您的数据大!那会浪费每次读取的时候解析它的时间,也浪费资源(网络、磁盘、cpu)
相反,模式注册表实例将执行绑定avro schema <-> int schemaId
,然后序列化程序将在从注册表获取数据(并将其缓存以供以后使用)之后仅在数据之前写入此 ID。
所以在 kafka 中,您的记录将是 [<id> <bytesavro>]
(以及出于技术原因的魔术字节),这仅是 5 个字节的开销(与您的架构的大小相比)
并且在阅读时,您的消费者会找到与 id 对应的模式,以及与之相关的反序列化器 avro 字节。您可以在 confluent doc 中找到更多方法
如果你真的有一个用途,你想为每条记录编写模式,你将需要一个其他序列化程序(我认为你自己编写,但这很容易,只需重用 https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java 并删除模式注册表部分用模式替换它,阅读相同)。但是,如果您使用 avro,我真的不鼓励这样做 - 一天后,您将需要实现类似 avro 注册表之类的东西来管理版本控制
【讨论】:
IMO 您可以在您的 Maven 存储库中保留向后兼容的模式,并且无需为此保留模式注册表。您避免处理额外的服务,因为您使用代码编译架构。但是,如果您更改架构,则需要重新部署应用程序。 IMO 这是一个公平的成本。以上是关于KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Streams“Consumed.with()”与KafkaAvroDeserializer
在 Haskell 中,为啥没有 TypeClass 用于可以像列表一样的东西?
NoNodeAvailableException:没有节点可用于执行查询