Flink 复杂事件处理

Posted

技术标签:

【中文标题】Flink 复杂事件处理【英文标题】:Flink Complex Event Processing 【发布时间】:2017-07-11 11:19:55 【问题描述】:

我有一个从套接字读取并检测模式的 flink cep 代码。可以说模式(单词)是“警报”。如果警报一词出现五次或更多次,则应创建警报。但我收到输入不匹配错误。 Flink 版本是 1.3.0。提前致谢!!

package pattern;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

    public class cep 

         public static void main(String[] args) throws Exception 


             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() 
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception 
                                return word.equals("alert");
                            
                        )
                        .times(5);


                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .flatSelect((Map<String,List<String>> in, Collector<String> out) -> 

                            String first = in.get("first").get(0);

                            for (int i = 0; i < 6; i++ ) 

                                out.collect(first);

                            


                        );

                alerts.print();

                env.execute();

            

    

【问题讨论】:

我已经让代码工作了。做了一些修改。 【参考方案1】:

只是对原始问题的一些澄清。在 1.3.0 中,有一个错误导致无法使用 lambdas 作为 select/flatSelect 的参数。

它已在 1.3.1 中修复,因此您的第一个代码版本将适用于 1.3.1。


此外,我认为您误解了 times 量词。它匹配确切的次数。因此,在您的情况下,它只会在事件完全匹配 3 次而不是 3 次或更多时返回。

【讨论】:

嘿,我的代码可以在 1.3.0 中使用。删除了 lambda 表达式。检查以下答案。不管怎么说,还是要谢谢你。欣赏它@Dawid Wysakowicz 是的,我知道,我只是想澄清一下为什么您的代码有效。 酷@Dawid。感谢您的澄清!当我发布此代码时,我忘记将时间 (3) 更改为时间 (5)。哎呀,我的错。现在我已经编辑了它。【参考方案2】:

所以我已经让代码工作了。这是可行的解决方案,

    package pattern;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.IterativeCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.List;
    import java.util.Map;

    public class cep 

         public static void main(String[] args) throws Exception 


             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStreamSource<String> dss = env.socketTextStream("localhost", 3005);

                dss.print();

                Pattern<String,String> pattern = Pattern.<String> begin("first")
                        .where(new IterativeCondition<String>() 
                            @Override
                            public boolean filter(String word, Context<String> context) throws Exception 
                                return word.equals("alert");
                            
                        )
                        .times(5);

                PatternStream<String> patternstream = CEP.pattern(dss, pattern);

                DataStream<String> alerts = patternstream
                        .select(new PatternSelectFunction<String, String>() 
                            @Override
                            public String select(Map<String, List<String>> in) throws Exception 

                                String first = in.get("first").get(0);

                                if(first.equals("alert"))

                                    return ("5 or more alerts");
                                
                                else

                                    return (" ");
                                
                            
                        );

                alerts.print();

                env.execute();

            

    

【讨论】:

以上是关于Flink 复杂事件处理的主要内容,如果未能解决你的问题,请参考以下文章

FlinkCEP - Flink的复杂事件处理

Flink 复杂事物处理

大数据计算引擎之Flink Flink CEP复杂事件编程

Flink流处理之窗口算子分析

案例简介flink CEP

Flink流式处理概念简介