Flink: Window Functions
Window types
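All three examples below consume comma-separated records ("id,timestamp,temperature") from Kafka and map them into the SensorReading bean imported from Flink.beans, which the post never shows. Judging from the constructor and getters used, a minimal sketch of it (a reconstruction, not the author's original file) would look like this; note that keyBy("id") requires Flink POJO rules to hold, i.e. a public no-arg constructor plus getters and setters:

// Hypothetical reconstruction of Flink.beans.SensorReading, inferred from its usage below.
package Flink.beans;

public class SensorReading {
    private String id;          // sensor id, e.g. "sensor_1"
    private Long timestamp;     // event time in seconds
    private Double temperature; // temperature reading

    public SensorReading() {}   // Flink POJOs need a public no-arg constructor

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }
    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }
}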
import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class Win_Offset {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        // The common window assigners:
        // WindowedStream<SensorReading, Tuple, TimeWindow> windowedStream = mapStream.keyBy("id")
        //     .timeWindow(Time.seconds(10));                                       // tumbling time window
        //     .timeWindow(Time.seconds(10), Time.seconds(5));                      // sliding time window
        //     .countWindow(12);                                                    // tumbling count window (fixed size)
        //     .countWindow(12, 4);                                                 // sliding count window
        //     .window(EventTimeSessionWindows.withGap(Time.milliseconds(10000)));  // session window with a static gap
        //     .window(EventTimeSessionWindows.withDynamicGap(...));                // session window with a per-element gap extractor
        //     .window(TumblingEventTimeWindows.of(Time.seconds(15)));              // tumbling event-time window, equivalent to timeWindow above, but accepts an offset (e.g. for time zones)
        //     .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));         // tumbling processing-time window

        /*
        SingleOutputStreamOperator<SensorReading> resultMaxStream = windowedStream.max("temperature");
        resultMaxStream.print("max");
        */

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");

        // Sliding count window: fires every 2 elements, over the last (up to) 6 elements per key
        SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.countWindow(6, 2)
            .reduce(new ReduceFunction<SensorReading>() {
                @Override
                public SensorReading reduce(SensorReading r1, SensorReading r2) throws Exception {
                    // keep the first id and timestamp, sum the temperatures
                    return new SensorReading(
                        r1.getId(),
                        r1.getTimestamp(),
                        r1.getTemperature() + r2.getTemperature()
                    );
                }
            });
        reduceStream.print("countWindow");

        try {
            env.execute("tumbling and sliding windows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
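Since ReduceFunction is a single-method interface, the anonymous class above can also be written as a lambda; a compact sketch of the same reduce:

SingleOutputStreamOperator<SensorReading> reduceStream = keyedStream.countWindow(6, 2)
    .reduce((r1, r2) -> new SensorReading(r1.getId(), r1.getTimestamp(),
                                          r1.getTemperature() + r2.getTemperature()));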
aggregate
import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
/**
 * @Author shall潇
 * @Date 2021/6/30
 * @Description Compute the average temperature within a window using aggregate
 */
public class Win_Avg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro1");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        });

        KeyedStream<SensorReading, Tuple> keyedStream = mapStream.keyBy("id");
        SingleOutputStreamOperator<Double> avgStream = keyedStream.countWindow(6, 2)
            .aggregate(new AggregateFunction<SensorReading, Tuple2<Double, Integer>, Double>() {
                // initialize the accumulator: (temperature sum, element count)
                @Override
                public Tuple2<Double, Integer> createAccumulator() {
                    return new Tuple2<>(0.0, 0);
                }

                // for each incoming element, add its temperature to the sum and increment the count
                @Override
                public Tuple2<Double, Integer> add(SensorReading value, Tuple2<Double, Integer> accumulator) {
                    return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
                }

                // when the window fires, return the average
                @Override
                public Double getResult(Tuple2<Double, Integer> accumulator) {
                    return accumulator.f0 / accumulator.f1;
                }

                // combine two accumulators (only called for merging window assigners, e.g. session windows)
                @Override
                public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> a, Tuple2<Double, Integer> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            });
        avgStream.print("avg"); // a sink is required, otherwise execute() fails with no operators defined

        try {
            env.execute("avg-window");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
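One detail worth calling out: with countWindow and ordinary tumbling or sliding time windows, merge() is never invoked. Flink only calls it for merging window assigners, i.e. session windows, whose in-flight windows can collapse into one as events arrive. A minimal sketch of a variant that does exercise merge(), assuming event time and watermarks are configured, the imports for EventTimeSessionWindows and Time are added, and the AggregateFunction above has been extracted into a hypothetical top-level class AvgTemp:

// Assumes: event-time setup, and AvgTemp = the AggregateFunction above as a named class.
SingleOutputStreamOperator<Double> sessionAvg = keyedStream
        .window(EventTimeSessionWindows.withGap(Time.seconds(10))) // session windows can merge
        .aggregate(new AvgTemp());                                 // merge() combines their accumulators
sessionAvg.print("session-avg");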
apply
import Flink.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
public class Win_4 {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time (the default is processing time)

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.159.100:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "gro5");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
            String[] split = line.split(",");
            return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) { // tolerate events up to 2s out of order
            @Override
            public long extractTimestamp(SensorReading sensorReading) {
                return sensorReading.getTimestamp() * 1000L; // timestamps arrive in seconds; Flink expects milliseconds
            }
        }).setParallelism(3);

        SingleOutputStreamOperator<Tuple4<String, Long, Long, Integer>> appStream = mapStream.keyBy("id")
            .timeWindow(Time.seconds(15))
            // apply runs once when the window fires; unlike reduce/aggregate, which process each element as it arrives
            .apply(new WindowFunction<SensorReading, Tuple4<String, Long, Long, Integer>, Tuple, TimeWindow>() {
                // tuple: the key; timeWindow: window metadata; iterable: all elements in the current window
                @Override
                public void apply(Tuple tuple,
                                  TimeWindow timeWindow,
                                  Iterable<SensorReading> iterable,
                                  Collector<Tuple4<String, Long, Long, Integer>> collector) throws Exception {
                    String key = tuple.getField(0); // the key, e.g. "sensor_1"
                    long start = timeWindow.getStart();
                    long end = timeWindow.getEnd();
                    Iterator<SensorReading> iterator = iterable.iterator();
                    List list = IteratorUtils.toList(iterator);
                    int size = list.size(); // number of elements in this window
                    collector.collect(new Tuple4<>(key, start, end, size)); // emit (key, window start, window end, count)
                }
            });

        dss.print("input");
        appStream.print("apply");

        try {
            env.execute("flink-windows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
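Because the 15-second event-time window above silently drops anything that arrives after the watermark has passed the window end, a common extension is to keep late elements via allowed lateness plus a side output. A sketch under two assumptions: the anonymous WindowFunction has been extracted into a hypothetical class named CountWindowFunction, and org.apache.flink.util.OutputTag is imported:

// Hypothetical extension: retain late elements instead of dropping them.
OutputTag<SensorReading> lateTag = new OutputTag<SensorReading>("late-readings") {};

SingleOutputStreamOperator<Tuple4<String, Long, Long, Integer>> appStream = mapStream.keyBy("id")
    .timeWindow(Time.seconds(15))
    .allowedLateness(Time.minutes(1))   // keep window state for one extra minute and re-fire on late arrivals
    .sideOutputLateData(lateTag)        // elements later than that go to a side output instead of being dropped
    .apply(new CountWindowFunction());  // CountWindowFunction = the WindowFunction above, assumed extracted

appStream.getSideOutput(lateTag).print("late"); // inspect what would otherwise have been lost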