KSQL KTabke+KTable Join重复结果异常。

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了KSQL KTabke+KTable Join重复结果异常。相关的知识,希望对你有一定的参考价值。

我试着将ktable和ktable进行内部连接。

ab 表。

 create table a_table(r string, time string) with (Kafka_topic='a', Key='r', Value_format='json');
 create table b_table(r string, time string) with (Kafka_topic='b', Key='r', Value_format='json');

内部连接 ab 表按r键。

create table ab_table as select * from a_table inner join b_table on a_table.r = b_table.r emit changes;

1) 用例:通过慢速模式插入新数据

 ksql> insert into a_table values('1','1', 'timeA');
 --wait 5 second;
 ksql> insert into b_table values('1','1', 'timeB');

select * from ab_table emit changes; --返回1行结果

print AB_TABLE from beginning; --返回1行结果

2) 用例.通过快速模式插入新数据

 ksql> insert into a_table values('2','2', 'timeA');insert into b_table values('2','2', 'timeB');

  ksql>  print a from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeA"}

   ksql> print b from beginning;
    Key format: KAFKA_STRING
    Value format: JSON or KAFKA_STRING
    rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"R":"2","TIME":"timeB"}

select * from ab_table emit changes; --返回1行结果

print AB_TABLE from beginning; --返回2行结果

rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246657,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246657,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}
rowtime: 5/23/20 4:44:06 PM UTC, key: 2, value: {"A_TABLE_ROWTIME":1590252246680,"A_TABLE_ROWKEY":"2","A_TABLE_R":"2","A_TABLE_TIME":"timeA","B_TABLE_ROWTIME":1590252246680,"B_TABLE_ROWKEY":"2","
B_TABLE_R":"2","B_TABLE_TIME":"timeB"}

什么是地狱?为什么在第二个用例中,我的主题有两行重复?

更新主题信息

    name                 : B_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    name                 : A_TABLE
     Field   | Type                      
    -------------------------------------
     ROWTIME | BIGINT           (system) 
     ROWKEY  | VARCHAR(STRING)  (system) 
     R       | VARCHAR(STRING)           
     TIME    | VARCHAR(STRING)  

    Name                 : AB_TABLE
     Field           | Type                      
    ---------------------------------------------
     ROWTIME         | BIGINT           (system) 
     ROWKEY          | VARCHAR(STRING)  (system) 
     A_TABLE_ROWTIME | BIGINT                    
     A_TABLE_ROWKEY  | VARCHAR(STRING)           
     A_TABLE_R       | VARCHAR(STRING)           
     A_TABLE_TIME    | VARCHAR(STRING)           
     B_TABLE_ROWTIME | BIGINT                    
     B_TABLE_ROWKEY  | VARCHAR(STRING)           
     B_TABLE_R       | VARCHAR(STRING)           
     B_TABLE_TIME    | VARCHAR(STRING)     


topic "a" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

topic "b" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

 topic "AB_TABLE" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1
答案

搞清楚了这是怎么回事了。是跟缓冲有关。

默认情况下,ksqlDB对两个源表changelogs的输入进行缓冲,即topic? ab. (这种缓冲可以很有用,可以将报告同一键变化的所有几条消息压缩成一条输出)。

当同时向两个表发射更新时,缓冲意味着当缓冲刷新时,两个表都会被填充。由于表-表连接的两边都会产生输出,两个输入事件相互匹配,从而产生两个输出到topic AB_TABLE.

PRINT AB_TABLE 正确地显示了变更日志中的两行。

但是: SELECT * FROM AB_TABLE EMIT CHANGES 也在对输入进行缓冲,这种缓冲将两个变化压缩为一个输出。

缓冲可以通过 cache.max.bytes.buffering. 例如,你可以用关闭缓冲功能。

-- turn off buffering:
SET 'cache.max.bytes.buffering' = 0;

我又运行了一遍你的例子,在上面运行后,只有一行在 AB_TABLE 题。

有人会说,不管是否有任何缓冲,表-表连接的正确输出都只有一行。毕竟,处理的第一行不应该找到匹配,第二行应该找到匹配。 如果你对此有强烈的感觉,那么请在Github上提出一个bug。

以上是关于KSQL KTabke+KTable Join重复结果异常。的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Ktable 还流式传输重复更新

Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

Streaming SQL for Apache Kafka

当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

Spark SQL 中的 where 子句与 join 子句

可以将 Kafka Streams 配置为等待 KTable 加载吗?