在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”

Posted

技术标签:

【中文标题】在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”【英文标题】:Error "Invalid join condition: table-table joins require to join on the primary key of the right input table" on joining two tables on Kafka ksqlDB 【发布时间】:2021-12-04 11:58:19 【问题描述】:

我需要从其他九个主题的组合中创建一个 Kafka 主题,所有这些主题都是由 Debezium PostgreSQL 源连接器以 AVRO 格式生成的。首先,我正在尝试(到目前为止没有成功)合并来自两个主题的字段。

所以,首先根据“REQUEST”主题创建一个 ksqlDB 表:

ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');

我觉得一切都很好:

ksql> DESCRIBE TB_REQUEST;

Name                 : TB_REQUEST
 Field       | Type
-----------------------------------------------------------------------------------------------------------------------

 ID          | STRUCT<REQUEST_ID BIGINT> (primary key)

 BEFORE      | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>

 AFTER       | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
 
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>

 OP          | VARCHAR(STRING)

 TS_MS       | BIGINT

 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>

-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

然后我从“EMPLOYEE”主题创建另一个表:

ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

再次,一切似乎都很好。

ksql> DESCRIBE TB_EMPLOYEE;

Name                 : TB_EMPLOYEE
 Field       | Type                                                       
-----------------------------------------------------------------------------------------------------------------------
 ID          | STRUCT<EMPLOYEE_ID INTEGER> (primary key)

 BEFORE      | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 
 AFTER       | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
 
 SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>

 OP          | VARCHAR(STRING)

 TS_MS       | BIGINT

 TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>

-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

但是通过尝试创建我的目标表并按员工 ID 加入以前的表。

ksql> CREATE TABLE REQUEST_EMPLOYEE AS 
         SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
         FROM TB_REQUEST RQ
         JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;

我收到以下错误:

Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
  RQ.ID->REQUEST_ID REQUEST_ID,
  RQ.AFTER->REQUESTER_ID REQUESTER_ID,
  RQ.AFTER->STATUS_ID STATUS_ID,
  EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
  EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;

查看“DESCRIBE TB_EMPLOYEE”命令的输出,在我看来“EM.ID->EMPLOYEE_ID”是正确的选择。我错过了什么?

提前致谢。

PS:ksqlDB 0.21.0 版本

【问题讨论】:

【参考方案1】:

我认为你应该在你的连接语句中至少使用一个行键,在以前的 KsqlDB 版本中,连接表的唯一方法是通过行键,在你当前的版本 0.21.0 中可以使用外键。

检查以下示例:

CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;

其中 u_id 被定义为主键,因此是行键。

CREATE TABLE users (
    u_id VARCHAR PRIMARY KEY
    name VARCHAR
  ) WITH (
    kafka_topic = 'users',
    partitions = 3,
    value_format = 'json'
  );

下面的句子类似

CREATE TABLE orders_with_users AS
    SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;

另一个观察结果是,KsqlDB 将您的 TB_EMPLOYE 的键视为 STRUCT,而不仅仅是整数。 然后等待结构之间的比较。 (使用相同的架构)

然后您可以在创建表格之前执行以下步骤。

CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
         WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

CREATE STREAM STREAM_REKEY_EMPLOYEE 
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;


CREATE TABLE TB_EMPLOYEE (employee_id PRIMARY KEY)
         WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');

并使用employee_id字段加入,尽量使用你的主键作为原始类型。

【讨论】:

嘿费利佩,感谢您的回复。该错误是由于尝试使用结构加入而引起的。您关于创建流以平整主键 (CREATE STREAM STREAM_REKEY_EMPLOYEE ...) 的建议起到了作用。我创建了两个“rekeyd”流,每个源一个,基于这些流创建表,并且能够按照最初的预期创建一个连接表。 问题正是关键结构,如果您认为响应很好,请将答案列为有用:),谢谢。 我做到了。但在撰写本文时,我还没有投票权的声誉。 :-(

以上是关于在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”的主要内容,如果未能解决你的问题,请参考以下文章

将 Kafka 连接嵌入 Ksqldb-server 时挂载(卷)不起作用

KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接

如何从kafka主题为ksqldb创建主题

使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用

Kafka Connect 重新读取整个文件以进行 KSQLDB 调试或 KSQLDB 是不是可以在创建查询后插入所有事件?

如何将 OpenShift 上的 KSQLDB 集群连接到本地 Kerberized Kafka 集群