在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”
Posted
技术标签:
【中文标题】在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”【英文标题】:Error "Invalid join condition: table-table joins require to join on the primary key of the right input table" on joining two tables on Kafka ksqlDB 【发布时间】:2021-12-04 11:58:19 【问题描述】:我需要从其他九个主题的组合中创建一个 Kafka 主题,所有这些主题都是由 Debezium PostgreSQL 源连接器以 AVRO 格式生成的。首先,我正在尝试(到目前为止没有成功)合并来自两个主题的字段。
所以,首先根据“REQUEST”主题创建一个 ksqlDB 表:
ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');
我觉得一切都很好:
ksql> DESCRIBE TB_REQUEST;
Name : TB_REQUEST
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<REQUEST_ID BIGINT> (primary key)
BEFORE | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
AFTER | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
然后我从“EMPLOYEE”主题创建另一个表:
ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
再次,一切似乎都很好。
ksql> DESCRIBE TB_EMPLOYEE;
Name : TB_EMPLOYEE
Field | Type
-----------------------------------------------------------------------------------------------------------------------
ID | STRUCT<EMPLOYEE_ID INTEGER> (primary key)
BEFORE | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
AFTER | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
SOURCE | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP | VARCHAR(STRING)
TS_MS | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
但是通过尝试创建我的目标表并按员工 ID 加入以前的表。
ksql> CREATE TABLE REQUEST_EMPLOYEE AS
SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
FROM TB_REQUEST RQ
JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;
我收到以下错误:
Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
RQ.ID->REQUEST_ID REQUEST_ID,
RQ.AFTER->REQUESTER_ID REQUESTER_ID,
RQ.AFTER->STATUS_ID STATUS_ID,
EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;
查看“DESCRIBE TB_EMPLOYEE”命令的输出,在我看来“EM.ID->EMPLOYEE_ID”是正确的选择。我错过了什么?
提前致谢。
PS:ksqlDB 0.21.0 版本
【问题讨论】:
【参考方案1】:我认为你应该在你的连接语句中至少使用一个行键,在以前的 KsqlDB 版本中,连接表的唯一方法是通过行键,在你当前的版本 0.21.0 中可以使用外键。
检查以下示例:
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;
其中 u_id 被定义为主键,因此是行键。
CREATE TABLE users (
u_id VARCHAR PRIMARY KEY
name VARCHAR
) WITH (
kafka_topic = 'users',
partitions = 3,
value_format = 'json'
);
下面的句子类似
CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;
另一个观察结果是,KsqlDB 将您的 TB_EMPLOYE 的键视为 STRUCT
然后您可以在创建表格之前执行以下步骤。
CREATE STREAM STREAM_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');
CREATE STREAM STREAM_REKEY_EMPLOYEE
AS SELECT ID->EMPLOYEE_ID employee_id, * FROM STREAM_EMPLOYEE
PARTITION BY ID->EMPLOYEE_ID
EMIT CHANGES;
CREATE TABLE TB_EMPLOYEE (employee_id PRIMARY KEY)
WITH (KAFKA_TOPIC='STREAM_REKEY_EMPLOYEE', FORMAT='AVRO');
并使用employee_id字段加入,尽量使用你的主键作为原始类型。
【讨论】:
嘿费利佩,感谢您的回复。该错误是由于尝试使用结构加入而引起的。您关于创建流以平整主键 (CREATE STREAM STREAM_REKEY_EMPLOYEE ...) 的建议起到了作用。我创建了两个“rekeyd”流,每个源一个,基于这些流创建表,并且能够按照最初的预期创建一个连接表。 问题正是关键结构,如果您认为响应很好,请将答案列为有用:),谢谢。 我做到了。但在撰写本文时,我还没有投票权的声誉。 :-(以上是关于在 Kafka ksqlDB 上连接两个表时出现错误“无效的连接条件:表-表连接需要在右输入表的主键上连接”的主要内容,如果未能解决你的问题,请参考以下文章
将 Kafka 连接嵌入 Ksqldb-server 时挂载(卷)不起作用
KSQLDB - 从 debezium cdc 源连接器获取数据并将 Stream 与表连接
使用 Docker Compose 创建 Kafka-Connect 集群以供 ksqlDB 使用
Kafka Connect 重新读取整个文件以进行 KSQLDB 调试或 KSQLDB 是不是可以在创建查询后插入所有事件?