Apache Kafka分组两次

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Kafka分组两次相关的知识,希望对你有一定的参考价值。

我正在编写一个应用程序,我正在尝试计算每小时访问一页的用户数。我正在尝试过滤到特定事件,按userId和事件小时时间分组,然后按小时分组以获取用户数。但是,在尝试关闭流时,对KTable进行分组会导致过多的CPU烧毁和锁定。有一个更好的方法吗?

    events
   .groupBy(...)
   .aggregate(...)
   .groupBy(...);
   .count();
答案

鉴于上述问题的答案“我只是想在一小时的时间窗口内知道执行特定操作的用户数量”,我建议如下。

假设你有这样的记录:

class ActionRecord {
  String actionType;
  String user;
}

您可以定义这样的聚合类:

class ActionRecordAggregate {
  private Set<String> users = new HashSet<>();

  public void add(ActionRecord rec) {
    users.add(rec.getUser());
  }

  public int count() {
    return users.size();
  }

}

那么你的流媒体应用可以:

  • 接受这些事件
  • 根据事件类型重新设置它们(.map()
  • 按事件类型分组(.groupByKey()
  • 按时间窗口(选择1分钟,但YMMV)
  • 将它们聚合成ActionRecordAggregate
  • 将它们物化为StateStore

所以这看起来像:

stream()
.map((key, val) -> KeyValue.pair(val.actionType, val)) 
.groupByKey() 
.windowedBy(TimeWindows.of(60*1000)) 
.aggregate(
  ActionRecordAggregate::new, 
  (key, value, agg) -> agg.add(value),
  Materialized
      .<String, ActionRecordAggregate, WindowStore<Bytes, byte[]>>as("actionTypeLookup")
      .withValueSerde(getSerdeForActionRecordAggregate())
);

然后,要恢复事件,您可以查询您的状态存储:

ReadOnlyWindowStore<String, ActionRecordAggregate> store = 
  streams.store("actionTypeLookup", QueryableStoreTypes.windowStore());

WindowStoreIterator<ActionRecordAggregate> wIt = 
  store.fetch("actionTypeToGet", startTimestamp, endTimestamp);

int totalCount = 0;
while(wIt.hasNext()) {
  totalCount += wIt.next().count();
}

// totalCount is the number of distinct users in your 
// time interval that raised action type "actionTypeToGet"

希望这可以帮助!

以上是关于Apache Kafka分组两次的主要内容,如果未能解决你的问题,请参考以下文章

Android片段生命周期:onResume调用了两次

按一次返回按钮停留在同一个片段上,如果按两次,它将返回到上一个片段

android片段在方向更改时创建了两次

片段android中的菜单项调用两次

关于kafka更改消费者对应分组下的offset值

使用导航控制器创建两次的片段