如何从 Debezium 创建的 avro 消息中获取字段?
Posted
技术标签:
【中文标题】如何从 Debezium 创建的 avro 消息中获取字段?【英文标题】:How to get field from avro message created by Debezium? 【发布时间】:2020-05-22 20:16:35 【问题描述】:我想根据 ts_ms 时间过滤我的消息。问题是我无法从 avro 消息中获取 ts_ms。这是我精简的 avro .avsc 文件:
"type": "record",
"name": "Envelope",
"namespace": "mysql.company.scores",
"fields": [
"name": "before",
"type": [
"null",
"type": "record",
"name": "Value",
"fields": [
<Some fields based on scores table>
],
"connect.name": "mysql.company.scores.Value"
],
"default": null
,
"name": "after",
"type": [
"null",
"Value"
],
"default": null
,
"name": "source",
"type":
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
"name": "version",
"type": [
"null",
"string"
],
"default": null
,
"name": "connector",
"type": [
"null",
"string"
],
"default": null
,
"name": "name",
"type": "string"
,
"name": "server_id",
"type": "long"
,
"name": "ts_sec",
"type": "long"
,
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
,
"name": "file",
"type": "string"
,
"name": "pos",
"type": "long"
,
"name": "row",
"type": "int"
,
"name": "snapshot",
"type": [
"type": "boolean",
"connect.default": false
,
"null"
],
"default": false
,
"name": "thread",
"type": [
"null",
"long"
],
"default": null
,
"name": "db",
"type": [
"null",
"string"
],
"default": null
,
"name": "table",
"type": [
"null",
"string"
],
"default": null
,
"name": "query",
"type": [
"null",
"string"
],
"default": null
],
"connect.name": "io.debezium.connector.mysql.Source"
,
"name": "op",
"type": "string"
,
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
],
"connect.name": "mysql.company.scores.Envelope"
我可以在之前或之后访问,但是当我可以使用 getTs_ms 进行以下方法时,我得到符号找不到方法:
private boolean isRecordNew(mysql.company.scores.Envelope value)
return value.getTs_ms() > 1580988600000L;
这是我的 serde 类的相关部分:
public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde()
SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
scoreSerde.configure(
Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl), false);
return scoreSerde;
我是否可以使用相同的 serde 类访问 ts_ms 字段,或者我应该更改它以使其包含在值中?
【问题讨论】:
为什么要创建自己的 serde?span> 为什么不能看一下生成的信封的Java类,看看字段或方法在哪里? @cricket_007 奇怪的是它曾经将它生成为 ts_ms 但现在它是 tsMs 【参考方案1】:正如@cricket_007 在评论中提到的,我查看了生成的类,该字段名为getTsMs()
,并通过使用此方法解决了问题。
【讨论】:
以上是关于如何从 Debezium 创建的 avro 消息中获取字段?的主要内容,如果未能解决你的问题,请参考以下文章
使用带有 Avro 序列化的 Debezium mongodb CDC 创建的模式太多