KSQL KTabke+KTable Join重复结果异常。
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了KSQL KTabke+KTable Join重复结果异常。相关的知识,希望对你有一定的参考价值。
我试着将ktable和ktable进行内部连接。
a 和 b 表。
create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');
内部连接 a 和 b 表按r键。
create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;
1) 用例:通过慢速模式插入新数据
ksql> insert into a_table values('1','1', 'timeA');
--wait 5 second;
ksql> insert into b_table values('1','1', 'timeB');
select * from ab_table emit changes;
--返回1行结果
print AB_TABLE from beginning;
--返回1行结果
2) 用例.通过快速模式插入新数据
ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');
ksql> print a from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}
ksql> print b from beginning;
Key format: KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}
select * from ab_table emit changes;
--返回1行结果
print AB_TABLE from beginning;
--返回2行结果
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
什么是地狱?为什么在第二个用例中,我的主题有两行重复?
更新主题信息
name : B_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
name : A_TABLE
Field | Type
-------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
R | VARCHAR(STRING)
TIME | VARCHAR(STRING)
Name : AB_TABLE
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
A_TABLE_ROWTIME | BIGINT
A_TABLE_ROWKEY | VARCHAR(STRING)
A_TABLE_R | VARCHAR(STRING)
A_TABLE_TIME | VARCHAR(STRING)
B_TABLE_ROWTIME | BIGINT
B_TABLE_ROWKEY | VARCHAR(STRING)
B_TABLE_R | VARCHAR(STRING)
B_TABLE_TIME | VARCHAR(STRING)
topic "a" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "b" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "AB_TABLE" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
搞清楚了这是怎么回事了。是跟缓冲有关。
默认情况下,ksqlDB对两个源表changelogs的输入进行缓冲,即topic? a
和 b
. (这种缓冲可以很有用,可以将报告同一键变化的所有几条消息压缩成一条输出)。
当同时向两个表发射更新时,缓冲意味着当缓冲刷新时,两个表都会被填充。由于表-表连接的两边都会产生输出,两个输入事件相互匹配,从而产生两个输出到topic AB_TABLE
.
PRINT AB_TABLE
正确地显示了变更日志中的两行。
但是: SELECT * FROM AB_TABLE EMIT CHANGES
也在对输入进行缓冲,这种缓冲将两个变化压缩为一个输出。
缓冲可以通过 cache.max.bytes.buffering
. 例如,你可以用关闭缓冲功能。
-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;
我又运行了一遍你的例子,在上面运行后,只有一行在 AB_TABLE
题。
有人会说,不管是否有任何缓冲,表-表连接的正确输出都只有一行。毕竟,处理的第一行不应该找到匹配,第二行应该找到匹配。 如果你对此有强烈的感觉,那么请在Github上提出一个bug。
以上是关于KSQL KTabke+KTable Join重复结果异常。的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore
Streaming SQL for Apache Kafka
当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息