flink-demo: triggering window computation by event time

Posted by 洽洽老大


1. Data

Events have the format abc,3,20180503-110301

abc: the key being counted
3: the count of occurrences of that key
20180503-110301: the event timestamp

Goal:

Store the generated events in Kafka and consume them with Flink. Using Flink's window mechanism with event-time semantics, group events into 10-second windows based on their timestamps; once an arriving event's timestamp pushes the watermark past the end of a window, the events in that window are aggregated.

Input: multiple lines of strings

Output: a word count over the input, aggregated once per 10 seconds of event time, emitted as a 3-tuple:
(eee,7,20180504-113411|20180504-113415|20180504-113412|20180504-113419)
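
Before looking at the code, it helps to see how a timestamp maps onto a 10-second window. Flink aligns tumbling windows to the epoch, so a window can be computed directly from an event's timestamp. A minimal standalone sketch (the class name is illustrative, not part of the demo):

public class WindowBoundarySketch {
    public static void main(String[] args) {
        long size = 10_000L;            // 10-second window, in milliseconds
        long ts = 1525404851000L;       // epoch millis of 20180504-113411 (CST)
        long start = ts - (ts % size);  // tumbling windows are aligned to the epoch
        long end = start + size;        // the end boundary is exclusive
        System.out.println("[" + start + ", " + end + ")");
    }
}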

APIs used

AssignerWithPunctuatedWatermarks emits a watermark for each element; by default, window computation is triggered once the watermark passes the end of the window (a simplified sketch of this firing rule follows this list)
FlatMapFunction splits each input line into a 3-tuple
ReduceFunction merges the 3-tuples of a window into a single aggregated 3-tuple
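
The firing rule can be stated precisely: a window covering [start, end) has a maximum timestamp of end - 1, and the event-time trigger fires once the current watermark reaches that value. A simplified sketch of the condition (the real logic lives in Flink's built-in EventTimeTrigger; the class and method names here are invented for illustration):

public class TriggerConditionSketch {
    // Simplified version of the event-time firing rule: a window [start, end)
    // fires once the watermark has reached end - 1, its maximum timestamp.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        System.out.println(fires(19_999L, 20_000L)); // true:  watermark reached the window's max timestamp
        System.out.println(fires(15_000L, 20_000L)); // false: the window is still open
    }
}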

Code implementation

1. Producer

import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;
import java.util.Scanner;

public class KafkaNumCountProducer {

    public static void main(String[] args) throws InterruptedException, IOException {

        // Standard Kafka producer configuration; keys and values are plain strings.
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.36.8.128:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Forward every non-blank console line to the topic; stop at end of input.
        Scanner scanner = new Scanner(System.in);
        String str;
        while (scanner.hasNext()) {
            str = scanner.nextLine();
            if (!StringUtils.isBlank(str)) {
                producer.send(new ProducerRecord<>("qiaqia_test_numcount", str), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("Failed to send message with exception " + exception);
                        }
                    }
                });
            }
        }
        producer.close();
    }
}

The producer reads lines from the console and sends them to Kafka; closing the input stream ends the loop and closes the producer.
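
If the topic does not exist yet, it can be created up front with the stock Kafka CLI. This is a Kafka 0.10-era command; the ZooKeeper address, partition count, and replication factor shown here are assumptions to adjust for your cluster:

kafka-topics.sh --create --zookeeper 10.36.8.128:2181 --replication-factor 1 --partitions 1 --topic qiaqia_test_numcount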

2. Consumer

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

@Slf4j
public class Kafka010NumCountConsumer {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(35000); // crucial: be sure to enable checkpointing!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        log.info("\n\nstarting\n\n");
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.36.8.128:9092");
        props.setProperty("group.id", "flink-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // args[0] is the Kafka topic to consume from.
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
//        consumer.setStartFromEarliest();
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple3<String, Integer, String>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())   // split "key,count,timestamp" into a 3-tuple
                .keyBy(0)                         // key by field 0 (the word)
                .timeWindow(Time.seconds(10))     // 10-second tumbling event-time window
                .reduce(new ReduceFunction<Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> reduce(Tuple3<String, Integer, String> t0, Tuple3<String, Integer, String> t1) throws Exception {
                        String time0 = t0.getField(2);
                        String time1 = t1.getField(2);
                        Integer count0 = t0.getField(1);
                        Integer count1 = t1.getField(1);
                        // Sum the counts and join the event times with "|".
                        return new Tuple3<>(t0.getField(0), count0 + count1, time0 + "|" + time1);
                    }
                });

        // args[1] is the output file path.
        keyedStream.writeAsText(args[1], FileSystem.WriteMode.OVERWRITE);
        keyedStream.print();
        env.execute("Flink-Kafka num count");
    }

    private static class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

        // HH (24-hour clock) instead of the original hh (12-hour), which would
        // mis-parse afternoon timestamps such as 20180504-173411.
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-HHmmss");

        /*
         * Called second; extractedTimestamp is the value just returned by
         * extractTimestamp. Returning a non-null Watermark advances event time.
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
            if (lastElement != null && lastElement.contains(",")) {
                String[] parts = lastElement.split(",");
                if (parts.length == 3) {
                    try {
                        log.info("lastElement={},and time={}", lastElement, sdf.parse(parts[2]));
                        return new Watermark(sdf.parse(parts[2]).getTime());
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            }
            return null;
        }

        /*
         * Called first; extracts the event timestamp from the element.
         * previousElementTimestamp is the timestamp the record already carries
         * (for Kafka 0.10, the record's produce time, hence roughly "now").
         */
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            if (element != null && element.contains(",")) {
                String[] parts = element.split(",");
                if (parts.length == 3) {
                    try {
                        log.info("extract={},previous={},curr={}", element, sdf.format(new Date(previousElementTimestamp)), sdf.parse(parts[2]));
                        return sdf.parse(parts[2]).getTime();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            }
            return 0L;
        }
    }

    private static class MessageSplitter implements FlatMapFunction<String, Tuple3<String, Integer, String>> {

        @Override
        public void flatMap(String s, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
            if (s != null && s.contains(",")) {
                String[] strs = s.split(",");
                if (strs.length == 3) {
                    // Emit (key, count, eventTimeString).
                    collector.collect(new Tuple3<>(strs[0], Integer.parseInt(strs[1]), strs[2]));
                }
            }
        }
    }
}

Test

1. Launch

Package the consumer as a jar, upload it to the server running Flink, and run:

flink run -c com.b3434.examples.Kafka010NumCountConsumer ./myjobs/flink-kafka-avro.jar qiaqia_test_numcount /home/dengyiping/myjob/numcount.txt

qiaqia_test_numcount is the Kafka topic (passed as args[0])

numcount.txt is the file the results are written to (passed as args[1])

2. Input

Enter the following lines in the console, one at a time:

eee,1,20180504-113411
eee,2,20180504-113415
eee,2,20180504-113412
eee,2,20180504-113419
eee,1,20180504-113421

3. Output

  1. Each time a line is entered, the log shows:
2018-05-04 17:07:47,838 INFO  com.b3434.examples.Kafka010NumCountConsumer - extract=eee,2,20180504-113412,previous=20180504-050903,curr=Fri May 04 11:34:12 CST 2018
2018-05-04 17:07:47,838 INFO  com.b3434.examples.Kafka010NumCountConsumer - lastElement=eee,2,20180504-113412,and time=Fri May 04 11:34:12 CST 2018

This is the log for the input eee,2,20180504-113412. It shows that the program first calls extractTimestamp and only then checkAndGetNextWatermark, as the sketch below illustrates.
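
This ordering is the per-record contract of a punctuated assigner. A self-contained simulation of what the runtime does for each record (a simplified paraphrase of Flink's TimestampsAndPunctuatedWatermarksOperator; the class, methods, and bare numeric timestamps are invented for illustration):

public class AssignerCallOrderSketch {
    static long currentWatermark = Long.MIN_VALUE;

    static long extractTimestamp(String element) {
        return Long.parseLong(element.split(",")[2]);  // 1. called first
    }

    static Long checkAndGetNextWatermark(String element, long extractedTimestamp) {
        return extractedTimestamp;                     // 2. called with the value from step 1
    }

    static void process(String record) {
        long ts = extractTimestamp(record);
        Long wm = checkAndGetNextWatermark(record, ts);
        if (wm != null && wm > currentWatermark) {     // watermarks only move forward
            currentWatermark = wm;
        }
        System.out.println(record + " -> ts=" + ts + ", watermark=" + currentWatermark);
    }

    public static void main(String[] args) {
        process("eee,2,113412");
        process("eee,1,113411");                       // late event: watermark does not regress
    }
}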

  2. Monitor the output with tail -f numcount.txt.
    When the last line is entered, its timestamp (11:34:21) pushes the watermark past the end of the first window [11:34:10, 11:34:20), so the program emits the aggregate of the first four events (the arithmetic is checked in the sketch below):
    (eee,7,20180504-113411|20180504-113415|20180504-113412|20180504-113419)
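
The arithmetic behind this firing can be verified standalone. In this sketch the test's times are reduced to milliseconds since midnight for readability; the class and helper names are invented:

public class WindowFireCheck {
    // Convert h:m:s to milliseconds since midnight, just for this check.
    static long hms(int h, int m, int s) {
        return ((h * 60L + m) * 60 + s) * 1000;
    }

    public static void main(String[] args) {
        // 11:34:11, 11:34:15, 11:34:12 and 11:34:19 all fall into the
        // tumbling window [11:34:10, 11:34:20); their counts sum to 1+2+2+2 = 7.
        long windowEnd = hms(11, 34, 20);
        // The fifth event, 11:34:21, advances the watermark past the window's
        // maximum timestamp (windowEnd - 1), so the window fires.
        long watermark = hms(11, 34, 21);
        System.out.println("window fires: " + (watermark >= windowEnd - 1)); // true
    }
}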
