如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?

Posted

技术标签:

【中文标题】如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?【英文标题】:How to effectively chain groupby queries from flat api data in Kafka Streams? 【发布时间】:2021-08-12 20:20:03 【问题描述】:

我有一些来自 API 的随机数据进入 Kafka 主题,如下所示:

"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"
"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"
"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"
"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"

我能够创建 KStream 对象并观察它们,如下所示:

KStream<byte[], UsedCars> usedCarsInputStream = 
            builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));

            //k, v => year, countof cars in year
            KTable<String,Long> yearCount = usedCarsInputStream
                .filter((k,v)->v.getYear() > 2010)
                .selectKey((k,v) -> v.getVin())
                .groupBy((key, value) -> Integer.toString(value.getYear()))
                .count().toStream().print(Printed.<String, Long>toSysOut().withLabel("blah")); 

这当然给了我们按年份分组的记录计数大于 2010 年。但是,我想在下一步中做的,但无法完成的,就是简单地把那些年,如foreach,并计算每年每种颜色的汽车数量。我尝试在yearCount.toStream() 上写foreach 以进一步处理数据,但没有得到任何结果。

我正在寻找可能如下所示的输出:


  "2011": [
    
      "blue": "99",
      "green": "243,",
      "red": "33"
    
  ],
  "2012": [
    
      "blue": "74,",
      "green": "432,",
      "red": "2"
    
  ]

【问题讨论】:

【参考方案1】:

我相信我可能已经回答了我自己的问题。我欢迎任何其他人对我自己的解决方案发表评论。

我没有意识到您可以对本质上是复合对象的对象执行 GroupBy。在这种情况下,我需要与以下 SQL 语句等效的语句

SELECT   year, color, count(*) FROM use_car_colors AS years 
GROUP BY year, color

在 Kafka Streams 中,您可以通过创建一个对象来实现这一点——在这种情况下,我创建了一个名为“YearColor”的 POJO 类,其中包含成员 year 和 color——然后在 Kafka Streams 中选择它作为键:

usedCarsInputStream
            .selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
            .groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
            .count()
            .toStream()
            .peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor() 
            + " count: " + ct));

你当然必须为这个对象实现序列化器和反序列化器(我用 YearColorSerdes() 实现了)。我在运行 Kafka Streams 应用程序时的输出为我提供了修改后计数的更新,例如:

year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2

这正是我想要的。

【讨论】:

以上是关于如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams 是不是适合触发记录的批处理?

如何限制kafka-streams中的rocksdb内存使用量

Kafka Streams 容错与并行偏移管理

KSQL / Kafka Streams可以支持复杂事件处理吗?

Kafka Streams应用程序在kafka服务器上打开了太多文件

合并多个相同的 Kafka Streams 主题