flink demo: triggering window computation based on event time
Posted by 洽洽老大
1. Data:
An event has the format abc,3,20180503-110301
abc: the key being counted
3: the count for that key
20180503-110301: the event timestamp
Goal:
Store the generated events in Kafka and consume them with Flink. Using Flink's window mechanism, group events by their event timestamps into 10-second event-time windows, and aggregate each window once the event time has advanced past its end.
Input: multiple lines of strings
Output: a word count over the input strings, aggregated once per 10 seconds of event time, emitted as a 3-tuple, e.g.
(eee,7,20180504-113411|20180504-113415|20180504-113412|20180504-113419)
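To make the 10-second grouping concrete, here is a minimal, self-contained sketch (my own illustration, not part of the demo; the alignment formula mirrors how Flink assigns tumbling windows) that maps each sample timestamp to the window it falls into:

import java.text.SimpleDateFormat;
import java.util.Date;

public class WindowAssignmentSketch {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-HHmmss");
        long windowSize = 10_000L; // tumbling 10-second windows
        String[] samples = {"20180504-113411", "20180504-113415", "20180504-113412",
                            "20180504-113419", "20180504-113421"};
        for (String s : samples) {
            long ts = sdf.parse(s).getTime();
            // tumbling event-time windows are aligned to multiples of the window size
            long windowStart = ts - (ts % windowSize);
            System.out.println(s + " -> window starting at " + sdf.format(new Date(windowStart)));
        }
        // the first four timestamps fall into [11:34:10, 11:34:20); the fifth one
        // belongs to the next window, and its watermark is what fires the first window
    }
}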
APIs used
AssignerWithPunctuatedWatermarks: emits a watermark per element; by default an event-time window is computed once the watermark passes the end of the window (see the sketch after this list)
FlatMapFunction: splits each input line into a 3-tuple
ReduceFunction: merges the 3-tuples of one window into a single aggregated 3-tuple
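A minimal sketch of that firing condition (my own illustration with made-up millisecond values, not the project's code):

public class FiringConditionSketch {
    public static void main(String[] args) {
        long windowStart = 0L;                            // illustrative window [0 ms, 10 000 ms)
        long windowSize = 10_000L;
        long maxTimestamp = windowStart + windowSize - 1; // last instant that still belongs to the window

        long watermark = 11_000L;                         // watermark emitted for a later event

        // the window is computed (and its ReduceFunction result emitted) once the
        // current watermark is at or beyond the window's max timestamp
        System.out.println("fire window: " + (watermark >= maxTimestamp));
    }
}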
Code implementation
1. Producer
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;
import java.util.Scanner;

public class KafkaNumCountProducer {

    public static void main(String[] args) throws InterruptedException, IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.36.8.128:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        Scanner scanner = new Scanner(System.in);
        String str;
        while (scanner.hasNext()) {
            str = scanner.nextLine();
            if (!StringUtils.isBlank(str)) {
                // send each non-blank console line to the topic; log failures in the callback
                producer.send(new ProducerRecord<>("qiaqia_test_numcount", str), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            System.out.println("Failed to send message with exception " + exception);
                        }
                    }
                });
            }
        }
        producer.close();
    }
}
Reads lines from the console and sends them to Kafka.
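To verify that the lines actually reach the topic, you can (assuming a standard Kafka installation on the broker host) watch it with the console consumer:
bin/kafka-console-consumer.sh --bootstrap-server 10.36.8.128:9092 --topic qiaqia_test_numcount --from-beginning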
2. Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

@Slf4j
public class Kafka010NumCountConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(35000); // important: be sure to enable checkpointing!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        log.info("\n\nstarting\n\n");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.36.8.128:9092");
        props.setProperty("group.id", "flink-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
        // consumer.setStartFromEarliest();
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple3<String, Integer, String>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .reduce(new ReduceFunction<Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> reduce(Tuple3<String, Integer, String> t0,
                                                                  Tuple3<String, Integer, String> t1) throws Exception {
                        String time0 = t0.getField(2);
                        String time1 = t1.getField(2);
                        Integer count0 = t0.getField(1);
                        Integer count1 = t1.getField(1);
                        // merge two tuples of the same key: add the counts and concatenate the timestamps
                        return new Tuple3<>(t0.getField(0), count0 + count1, time0 + "|" + time1);
                    }
                });

        keyedStream.writeAsText(args[1], FileSystem.WriteMode.OVERWRITE);
        keyedStream.print();
        env.execute("Flink-Kafka num count");
    }

    private static class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

        // note: "hh" is the 12-hour clock; timestamps at 13:00 or later would need "HH"
        private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd-hhmmss");

        /*
         * Called second; extractedTimestamp is the value returned by extractTimestamp.
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
            if (lastElement != null && lastElement.contains(",")) {
                String[] parts = lastElement.split(",");
                if (parts.length == 3) {
                    try {
                        log.info("lastElement={}, and time={}", lastElement, sdf.parse(parts[2]));
                        // emit a watermark equal to this element's event time
                        return new Watermark(sdf.parse(parts[2]).getTime());
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            }
            return null;
        }

        /*
         * Called first; extracts the event timestamp from the element.
         * previousElementTimestamp is the timestamp the element already carries
         * (in this setup, roughly the current time from the Kafka record).
         */
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            if (element != null && element.contains(",")) {
                String[] parts = element.split(",");
                if (parts.length == 3) {
                    try {
                        log.info("extract={},privious={},curr={}",
                                element, sdf.format(new Date(previousElementTimestamp)), sdf.parse(parts[2]));
                        return sdf.parse(parts[2]).getTime();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
                }
            }
            return 0L;
        }
    }

    private static class MessageSplitter implements FlatMapFunction<String, Tuple3<String, Integer, String>> {

        @Override
        public void flatMap(String s, Collector<Tuple3<String, Integer, String>> collector) throws Exception {
            if (s != null && s.contains(",")) {
                String[] strs = s.split(",");
                if (strs.length == 3) {
                    // emit (key, count, eventTime) for every well-formed "key,count,timestamp" line
                    collector.collect(new Tuple3<>(strs[0], Integer.parseInt(strs[1]), strs[2]));
                }
            }
        }
    }
}
Test
1. Launch
Package the consumer as a jar, upload it to the server where Flink runs, and run on the console:
flink run -c com.b3434.examples.Kafka010NumCountConsumer ./myjobs/flink-kafka-avro.jar qiaqia_test_numcount /home/dengyiping/myjob/numcount.txt
qiaqia_test_numcount is the Kafka topic (args[0])
/home/dengyiping/myjob/numcount.txt is the output file (args[1])
2. Input
Enter the following lines on the console, one at a time:
eee,1,20180504-113411
eee,2,20180504-113415
eee,2,20180504-113412
eee,2,20180504-113419
eee,1,20180504-113421
3. Output
- Each time a line is entered, the log shows:
2018-05-04 17:07:47,838 INFO com.b3434.examples.Kafka010NumCountConsumer - extract=eee,2,20180504-113412,privious=20180504-050903,curr=Fri May 04 11:34:12 CST 2018
2018-05-04 17:07:47,838 INFO com.b3434.examples.Kafka010NumCountConsumer- lastElement=eee,2,20180504-113412,and time=Fri May 04 11:34:12 CST 2018
The above is the log produced when eee,2,20180504-113412 is entered. It shows that the program calls extractTimestamp first and then checkAndGetNextWatermark.
- Monitor the output file with tail -f numcount.txt
When the last line is entered, the aggregated result for the first four lines appears: the fifth event's timestamp 20180504-113421 produces a watermark past the end of the [11:34:10, 11:34:20) window, which fires that window:
(eee,7,20180504-113411|20180504-113415|20180504-113412|20180504-113419)