带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null

Posted

技术标签:

【中文标题】带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null【英文标题】:KSQL EXTRACTJSONFIELD with JSON formatted messages returns null 【发布时间】:2018-09-26 18:18:01 【问题描述】:

使用 KSQL-CLI,带有一个 kafka 主题,其消息是 JSON 对象。我希望在不声明详尽的 STRUCT 或 MAP 字段定义的情况下提取诸如 obj.updaterId 之类的字段。


  "id": "5ba8f7e6b93c7964efb00f48",
  "source": "ShareService",
  "obj": 
    "updaterId": "systems@test.com",
    "desc": "foobar",
    "name": "com.test.auto.sensor"
  

我可以通过多种方式成功创建 Stream,最简单的是:

CREATE STREAM objs1 (obj VARCHAR) WITH (kafka_topic='json-topic', value_format='JSON');

简单的select按预期工作,可以看到obj的内容...

SELECT *  FROM objs1;

1537804190394 | "5ba8f7e6b93c7964efb00f48" | name=com.test.auto.sensor, updaterId=systems@test.com, desc=foobar

这里不起作用的是使用 EXTRACTJSONFIELD 从 obj 中提取 JSON 字段的任何尝试。对***对象和嵌套对象的响应都是“null”。

SELECT EXTRACTJSONFIELD(obj, '$.updaterId') AS updater FROM objs1;

the ksql documentation 中有一条注释说,如果数据是 STRING 列中的实际对象,我可以改用 STRUCT。它并没有说我必须使用 STRUCT。

顺便说一句,使用 STRUCT 确实有效,但我对 EXTRACTJSONFIELD 很感兴趣,因为我的消息的深层结构会有所不同。换句话说,如果消息不包含深层结构,有时会出现空响应。

作品:

CREATE STREAM objs1 (obj STRUCT<updaterId VARCHAR>) WITH (kafka_topic='json-topic', value_format='JSON');
SELECT OBJ->updaterId AS updater FROM OBJS1;

我发誓我在其他人的问题中看到了其他似乎适用于类似安排的示例。我错过了什么?

注意:我已经为这篇文章简化了我的 JSON。它更大且更嵌套 IRL,但我相信这个更简单的示例是准确的。

OSX 上的 KSQL 版本 5.0.0。

【问题讨论】:

【参考方案1】:

在最新版本中,JSON 解析器 ksql issues #1562 似乎发生了重大变化,该变化从 VARCHAR 列中的 JSON 内容中去除了引号(正如您从我的示例中看到的那样),这导致 JSON 解析器失败找到该字段名称。

该问题建议使用 STRUCT 而不是 EXTRACTJSONFIELD(与我上面的工作示例一样)。

这似乎不适合我的用例,因为我的深层字段名称可能会改变。将对此进行更多研究并更新。

【讨论】:

嗨蒂姆,你是对的,我认为你遇到了这个问题。让我们在 github 问题中跟踪它。【参考方案2】:

ksql issues #1562 已解决,您应该可以像以前一样使用EXTRACTJSONFIELD 功能。请注意,您现在需要使用 master 的最新版本才能拥有此功能。

【讨论】:

以上是关于带有 JSON 格式消息的 KSQL EXTRACTJSONFIELD 返回 null的主要内容,如果未能解决你的问题,请参考以下文章

带有 json 正文消息的 node-xmpp-server

Kafka kSQL sql查询

“注册”是 Ksql 中的保留关键字,如果是,我如何选择具有该名称的字段

KSQL KTabke+KTable Join重复结果异常。

使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题

JAXB 映射到 JSON