flink计数不同的问题

Posted

技术标签:

【中文标题】flink计数不同的问题【英文标题】:flink count distinct issue 【发布时间】:2020-03-09 11:31:39 【问题描述】:

现在我们使用翻转窗口来计算不同的。我们遇到的问题是,如果我们将翻滚窗口从一天延长到一个月,我们就无法获得现在不同的数字。这意味着如果我们将滚动窗口设置为 1 个月,我们得到的数字是每个月的第一天。我现在如何获取当前的不同计数(现在是 3 月 9 日。)?

package flink.trigger;

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Date;

public class CustomCountDistinctTigger extends Trigger<Object, TimeWindow> 

    private final ReducingStateDescriptor<Long> timeState =
            new ReducingStateDescriptor<>("fire-interval", new DistinctCountAggregateFunction(), LongSerializer.INSTANCE);
    private long interval;


    public CustomCountDistinctTigger(long interval) 
        this.interval = interval;
    

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception 
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);

        timestamp = ctx.getCurrentProcessingTime();

        if (fireTimestamp.get() == null) 
            long start = timestamp - (timestamp % interval);
            long nextFireTimestamp = start + interval;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
            return TriggerResult.CONTINUE;
        
        return TriggerResult.CONTINUE;
    

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception 
//        System.out.println("onProcessingTime called at "+System.currentTimeMillis() );
//        return TriggerResult.FIRE_AND_PURGE;
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println(df.format(new Date()));
        //interval
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeState);

        if(window.maxTimestamp() == time) 
            return TriggerResult.FIRE_AND_PURGE;
        
        else if (fireTimestamp.get().equals(time)) 
            fireTimestamp.clear();
            fireTimestamp.add(time + interval);
            ctx.registerProcessingTimeTimer(time + interval);
            return TriggerResult.FIRE;
        
        return TriggerResult.CONTINUE;
    

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception 
        return TriggerResult.CONTINUE;
    

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception 

    




distinct count:
DataStreamSink<Tuple2<String, Integer>> finalResultStream = keyedStream
                            .flatMap(new KPIDistinctDataFlatMapFunction(inputSchema))
                            .map(new SwapMap())
                            .keyBy(new WordKeySelector())
                            .window(TumblingProcessingTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                            .trigger(new CustomCountDistinctTigger(1 * 60 * 6000))
                            .aggregate(new DistinctCountAggregateFunction())
                            .print("final print");

【问题讨论】:

【参考方案1】:

您可以定义一个自定义触发器,每天返回一次 FIRE 以触发中间结果,然后在月底执行 FIRE_AND_PURGE 以关闭窗口。

每次触发器返回 FIRE 时,都会通过调用 ProcessWindowFunctionprocess() 方法评估您的窗口,此时它可以使用提供的 Collector 生成结果。 FIRE_AND_PURGE 最后一次评估窗口,然后销毁它。

另请参阅此问题的答案 -- How to display intermediate results in a windowed streaming-etl? -- 涵盖了相关主题。

【讨论】:

您的解决方案仍有问题。你能检查一下我上面的问题吗?我们正在使用表格窗口来处理不同的计数。您解决的触发器来自可以有触发器的 StreamExecutionEnvironment。但 StreamTableEnvironment 没有触发器。对吗? 我没有看到使用 Table API 或 SQL 的解决方案。 嗨,David,我在实现自定义触发器时遇到了问题。虽然没有来自卡夫卡的事件。它不会触发distinctcount?上面的代码示例适用于窗口 5 分钟和每分钟触发不同计数。 Flink 中没有空窗口。一个窗口在第一个事件被分配给它时被实例化。所以,没有事件,没有窗口。没有窗口,没有触发器。没有触发器,没有结果表明窗口为空。 好的。所以我可以为中间结果创建状态。如果没有触发触发,我如何获得计时器的中间结果(例如每天)?

以上是关于flink计数不同的问题的主要内容,如果未能解决你的问题,请参考以下文章

flink流计算随笔

从0到1Flink的成长之路(十九)-案例:计数窗口

Flink 滑动计数窗口行为

flink 并行计数器实现

Apache Flink 两种类型的窗口,时间和“计数”窗口

学习笔记Flink—— Flink开发环境配置及运行实例(单词计数)