如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?
Posted
技术标签:
【中文标题】如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?【英文标题】:How to effectively chain groupby queries from flat api data in Kafka Streams? 【发布时间】:2021-08-12 20:20:03 【问题描述】:我有一些来自 API 的随机数据进入 Kafka 主题,如下所示:
"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"
"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"
"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"
"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"
我能够创建 KStream
对象并观察它们,如下所示:
KStream<byte[], UsedCars> usedCarsInputStream =
builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));
//k, v => year, countof cars in year
KTable<String,Long> yearCount = usedCarsInputStream
.filter((k,v)->v.getYear() > 2010)
.selectKey((k,v) -> v.getVin())
.groupBy((key, value) -> Integer.toString(value.getYear()))
.count().toStream().print(Printed.<String, Long>toSysOut().withLabel("blah"));
这当然给了我们按年份分组的记录计数大于 2010 年。但是,我想在下一步中做的,但无法完成的,就是简单地把那些年,如foreach
,并计算每年每种颜色的汽车数量。我尝试在yearCount.toStream()
上写foreach
以进一步处理数据,但没有得到任何结果。
我正在寻找可能如下所示的输出:
"2011": [
"blue": "99",
"green": "243,",
"red": "33"
],
"2012": [
"blue": "74,",
"green": "432,",
"red": "2"
]
【问题讨论】:
【参考方案1】:我相信我可能已经回答了我自己的问题。我欢迎任何其他人对我自己的解决方案发表评论。
我没有意识到您可以对本质上是复合对象的对象执行 GroupBy。在这种情况下,我需要与以下 SQL 语句等效的语句
SELECT year, color, count(*) FROM use_car_colors AS years
GROUP BY year, color
在 Kafka Streams 中,您可以通过创建一个对象来实现这一点——在这种情况下,我创建了一个名为“YearColor”的 POJO 类,其中包含成员 year 和 color——然后在 Kafka Streams 中选择它作为键:
usedCarsInputStream
.selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
.groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
.count()
.toStream()
.peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor()
+ " count: " + ct));
你当然必须为这个对象实现序列化器和反序列化器(我用 YearColorSerdes() 实现了)。我在运行 Kafka Streams 应用程序时的输出为我提供了修改后计数的更新,例如:
year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2
这正是我想要的。
【讨论】:
以上是关于如何有效地链接来自 Kafka Streams 中平面 api 数据的 groupby 查询?的主要内容,如果未能解决你的问题,请参考以下文章
如何限制kafka-streams中的rocksdb内存使用量
KSQL / Kafka Streams可以支持复杂事件处理吗?