带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的啥位置?

Posted

技术标签:

【中文标题】带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的啥位置?【英文标题】:Apache Kafka with Avro and Schema Repo - where in the message does the schema Id go?带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的什么位置? 【发布时间】:2015-09-21 03:23:53 【问题描述】:

我想使用 Avro 为我的 Kafka 消息序列化数据,并希望将它与 Avro 架构存储库一起使用,这样我就不必在每条消息中都包含架构。

将 Avro 与 Kafka 一起使用似乎很受欢迎,许多博客/堆栈溢出问题/用户组等都参考了发送带有消息的 Schema Id,但我找不到应该去哪里的实际示例。

我认为它应该放在 Kafka 消息头的某个地方,但我找不到明显的位置。如果它在 Avro 消息中,则必须根据模式对其进行解码以获取消息内容并显示您需要解码的模式,这有明显的问题。

我正在使用 C# 客户端,但任何语言的示例都很棒。消息类具有以下字段:

public MessageMetadata Meta  get; set; 
public byte MagicNumber  get; set; 
public byte Attribute  get; set; 
public byte[] Key  get; set; 
public byte[] Value  get; set; 

但这些似乎都不正确。 MessageMetaData 只有 Offset 和 PartitionId。

那么,Avro Schema Id 应该去哪里?

【问题讨论】:

【参考方案1】:

schema id 实际上是在 avro 消息本身中编码的。看看this,看看编码器/解码器是如何实现的。

一般来说,当您向 Kafka 发送 Avro 消息时会发生什么:

    编码器从要编码的对象中获取架构。 编码器向架构注册表询问此架构的 ID。如果架构已注册,您将获得一个现有 ID,否则 - 注册表将注册架构并返回新 ID。 对象编码如下:[magic byte][schema id][actual message] 其中magic byte 只是一个0x0 字节,用于区分那种消息,schema id 是一个 4 字节的整数值其余的是实际编码的消息。

当您将消息解码回来时,会发生以下情况:

    解码器读取第一个字节并确保它是0x0。 解码器读取接下来的 4 个字节并将它们转换为整数值。这就是架构 ID 的解码方式。 现在,当解码器具有模式 id 时,它可能会向模式注册表询问该 id 的实际模式。瞧!

如果您的密钥是 Avro 编码的,那么您的密钥将采用上述格式。这同样适用于价值。这样,您的键和值可能都是 Avro 值并使用不同的架构。

编辑回答评论中的问题:

实际的模式存储在模式存储库中(实际上是模式存储库的全部点 - 存储模式:))。 Avro 对象容器文件格式与上述格式无关。 KafkaAvroEncoder/Decoder 使用略有不同的消息格式(但实际消息的编码方式肯定完全相同)。

这些格式之间的主要区别在于 Object Container Files 携带实际的 schema,并且可能包含与该 schema 对应的多条消息,而上述格式仅携带 schema id 和一个与该 schema 对应的消息。

传递 object-container-file-encoded 消息可能不太容易遵循/维护,因为一个 Kafka 消息将包含多个 Avro 消息。或者您可以确保一条 Kafka 消息仅包含一条 Avro 消息,但这会导致每条消息都携带架构。

Avro 架构可能非常大(我见过 600 KB 甚至更多的架构),并且在每条消息中携带架构会非常昂贵且浪费,因此架构存储库开始发挥作用 - 架构仅获取一次并且在本地缓存,所有其他查找只是快速的地图查找。

【讨论】:

嗨 serejja,你知道编码方案的任何地方吗? avro.apache.org/docs/1.7.7/spec.html 的规范讨论了包含完整架构的对象容器文件,但我认为这与您描述的不同。 谢谢@serejja,我想我的问题更像是 Confluent 的人是如何决定使用 [magic byte][schema id][actual message] 作为消息格式的?是他们定义的,还是在其他地方指定的? 嗨@serejja,你有没有遇到过不同的lib(哪个更流行)来处理这个问题?我对github.com/linkedin/camus/tree/master/camus-kafka-coders/src/… 做了一个快速回顾,t's 似乎是一个有趣的来源, 是的,我知道这个库,但是我知道无法将 Camus 与 Confluent Schema Registry 集成 感谢@serejja 的澄清。虽然在测试模式注册表时,我发现了一个奇怪的行为。如果将相同的消息发送到两个不同的主题,则为这两个主题分别注册模式。我期望架构在多个主题中是相同的。

以上是关于带有 Avro 和 Schema Repo 的 Apache Kafka - 模式 ID 在消息中的啥位置?的主要内容,如果未能解决你的问题,请参考以下文章

Avro Schema Evolution with GenericData.Record - Mapreduce 过程

KafkaAvroSerializer 用于在没有 schema.registry.url 的情况下序列化 Avro

如何在 Spark 中将 Avro Schema 对象转换为 StructType

序列化avro schema

json CWAAS请求的Avro Schema

AVRO schema 的使用方法