“注册”是 Ksql 中的保留关键字,如果是,我如何选择具有该名称的字段

Posted

技术标签:

【中文标题】“注册”是 Ksql 中的保留关键字,如果是,我如何选择具有该名称的字段【英文标题】:Is "register" a reserved keyword in Ksql and if so how can I select a field with that name 【发布时间】:2019-10-01 14:10:36 【问题描述】:

我正在学习 Confluent 平台(Kafka、Ksql 等)。我正在使用带有 Kafka Connect 的 Debezium 将数据流式传输到 Kafka 主题中。我的数据库表“log”中的一个字段称为“register”,它是添加记录时的时间戳

表日志结构供参考(在源mysql数据库中)如下:

CREATE TABLE `log` (
  `code` varchar(9) NOT NULL,
  `register` datetime NOT NULL,
  `entry` mediumtext NOT NULL,
  PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

我正在使用以下配置将两个数据库中的“日志”表中的数据流式传输到单个 Kafka 主题中,该配置按预期工作。

"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.topicRoute.replacement": "merged.$3",

我正在尝试建立一个 KSQL 流,该流创建一个新键,该键是源数据库(来自 Debezium 生成的元数据)和日志表中的代码字段以及来自桌子。这样做的目的是使派生密钥在发送到接收器时是完全唯一的(当前连接到另一个 MySQL 数据库,该数据库包含一个日志表,其内容应该是两个源数据库日志表的合并副本)

我尝试运行的查询是:

SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;

但是会出现以下错误:

line 1:59: mismatched input 'register' expecting 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting 'INTEGER', 'DATE',
        'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
        'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
        'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
        'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER
Caused by: org.antlr.v4.runtime.InputMismatchException

我看不到任何地方表明“注册”是某种保留术语。

有人可以帮忙吗?任何替代方案都可以建议一种在转换过程中更改字段名称的方法,请记住我无法展平 Debezium 生成的消息,因为我需要能够获取源数据库名称

【问题讨论】:

好的,我已经和知情的人谈过了,“注册”是一个关键字,所以剩下的问题是是否可以使用 SMT 将字段名称更改为进入 Kafka 的方式来自 Debezium 消息 【参考方案1】:

    是的 REGISTER 是保留字,您应该在 DDL 中避免使用它。您可能可以通过引用它来访问它,值得一试。

    有一个用于删除字段的单一消息转换,但它不适用于嵌套数据。您可以尝试将UnwrapFromEnvelope SMT 与一个组合来重命名该字段。我没有尝试过这个配置,但类似

    "transforms": "unwrap,renameField",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameField.renames": "register:notareservedword",
    

【讨论】:

一如既往地感谢罗宾。到目前为止,我一直在避免 UnwrapFromEnvelope 转换,因为它使捕获删除变得很棘手,这是我们需要做的事情(至少我最初的尝试不想工作,我认为我会重新关注那个主题)。

以上是关于“注册”是 Ksql 中的保留关键字,如果是,我如何选择具有该名称的字段的主要内容,如果未能解决你的问题,请参考以下文章

CUDA 中的“注册”关键字

Kafka KSQL入门

KSQL / KStream - 根据生成时间获取偏移量

将多列合并为一个新列,同时保留原始列

将 UDF 方法作为参数传递给 KSQL 中的其他 UDF

kafka sql入门