Apache flink 加入

Posted

技术标签:

【中文标题】Apache flink 加入【英文标题】:Apache flink Join 【发布时间】:2018-02-22 00:36:40 【问题描述】:

在 Apache flink 中,我有 2 个 Tuple8 流进出。 事件元组的 8 个字段中的 4 个(Tuple4)作为键。我想执行两个流之间存在的记录的关联,作为其中的一个步骤,我使用连接运算符加入 2 个流。根据语义,我应该得到包含内部连接记录的输出流。但是,我没有得到任何输出或匹配。 env 的时序特征设置为事件时间戳,元组的第一个元素是时间戳,我将其提取并使用 assign 将其标记为时间戳

DataStream<String> input = env.readTextFile("/tmp/logScrape/out/raw-input.out");
DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> inFiltered =
                input.flatMap(new Splitter())
                        .filter(new InFilter())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) 
                            @Override
                            public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) 
                                return record.f0;
                            
                        );
        DataStream<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>> exitFiltered =
                input.flatMap(new Splitter())
                        .filter(new ExitFilter())
                        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>>(Time.seconds(10)) 
                            @Override
                            public long extractTimestamp(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> record) 
                                return record.f0;
                            
                        );

inFiltered.join(exitFiltered)
                .where(new TupleKeySelector())
                .equalTo(new TupleKeySelector())
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000000)))
                .apply(new StreamJoinner())
                .writeAsText("/tmp/logScrape/out/output");

 public static class TupleKeySelector implements KeySelector<
          Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String>, 
          Tuple4<String, Integer, String, Integer>> 
        @Override
        public Tuple4<String, Integer, String, Integer> getKey(Tuple8<Long, Integer, String, Integer, String, Integer, Integer, String> value) 
            return new Tuple4<>(value.f2, value.f3, value.f4, value.f5);
        
    

这是我为inFiltered 流得到的输出记录

(1519254461076381189,1234,program1,11,program2,20,27,in)
(1519254462071697685,1234,program1,11,program2,20,27,in)
(1519254463067014246,1234,program1,11,program2,20,27,in)

这是我为exitFiltered 流得到的输出记录

(1519254458167640292,6789,program1,11,program2,20,27,out)
(1519254460158076301,6789,program1,11,program2,20,27,out)
(1519254461153294238,6789,program1,11,program2,20,27,out)
(1519254462148512207,6789,program1,11,program2,20,27,out)
(1519254463143730191,6789,program1,11,program2,20,27,out)

问题:

这里有什么我遗漏的东西,所以我应该开始看到合并的结果吗? 有什么方法可以在处理过程中调试代码?我不确定我的情况是它的键选择器有问题还是窗口没有正确发生。

【问题讨论】:

对于多字段的数据类型,建议使用 POJO(Plain old Java objects)而不是 TupleX。此外,POJO 可用于为大型元组类型命名。 ci.apache.org/projects/flink/flink-docs-release-1.4/dev/… 【参考方案1】:

你有一个 1,000,000 毫秒的翻转窗口,对吧?通过查看您的两个过滤流的时间戳(第一个字段,对吗?),我看不到在相同的 1M 毫秒内发生任何事情。

【讨论】:

这很好,我一直在尝试不同的窗口大小 - 我的意思是非常宽的窗口 - 即 1000000000 毫秒,它在那里工作。碰巧我在上述记录中的时间戳是 unix 纪元格式的时间戳,即具有纳秒粒度。在使用时间戳提取器分配时间戳时,我找不到 Flink 期望的粒度。 你是对的。我没有把时间戳带到毫秒粒度是我的错。而且 Flink 依赖于 JVM 纪元粒度,所以它只有毫秒(它写在文档中,我不知何故错过了) 现在可以用了吗?如果是,请将其标记为已回答,否则请提供更多详细信息,谢谢。

以上是关于Apache flink 加入的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink:如何计算 DataStream 中的事件总数

Apache Flink 在翼支付的实践应用

行业实践-Apache Flink 在移动云实时计算的实践

Apache Flink 管理大型状态之增量 Checkpoint 详解

阿里Uber谷歌苹果的大牛都来了,Apache Flink技术盛宴有何魅力?

袋鼠云 x Apache Flink 交流专场开放报名啦!