Apache Flink: Processing Industrial Data and Late Data with Event Time
Posted by 你是小KS
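1. Overview
This post tests event time, using custom timestamps to drive the watermark. It is purely a simulation:
- In industrial settings, sensor data from several factories may arrive at different times. Flink should process each record by the time it carries from its factory (event time), not by when it arrives.
- Some data may arrive late (because of network problems or other causes) and still has to be handled.
Main contents:
- Collect and display the late records
- Process the regular data with event-time windows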
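To make "processing by event time" concrete, here is a minimal plain-Java sketch with no Flink dependency. Each reading is assigned to a 5-second tumbling window by the timestamp it carries, not by its arrival order, and the maximum temperature is kept per computer and window. This is roughly what the job below does with keyBy, a tumbling window and MyMaxTemperatureHandler. The Reading class, the maxPerWindow helper and the sample values are hypothetical. Watermarks and lateness are deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeBuckets {
    // Hypothetical stand-in for ComputerTemperature: name, temperature, event timestamp in ms
    static final class Reading {
        final String name;
        final double temperature;
        final long timestamp;

        Reading(String name, double temperature, long timestamp) {
            this.name = name;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }
    }

    // Start of the tumbling window (size in ms, no offset) that contains ts
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Max temperature per "name@windowStart"; arrival order does not matter
    static Map<String, Double> maxPerWindow(List<Reading> arrivals, long size) {
        Map<String, Double> result = new TreeMap<>();
        for (Reading r : arrivals) {
            String key = r.name + "@" + windowStart(r.timestamp, size);
            result.merge(key, r.temperature, Double::max);
        }
        return result;
    }

    public static void main(String[] args) {
        // The third reading arrives last but carries a timestamp from the first window
        List<Reading> arrivals = Arrays.asList(
                new Reading("computer1", 40.0, 10_200),
                new Reading("computer1", 55.0, 16_100),
                new Reading("computer1", 70.0, 11_900));
        // prints {computer1@10000=70.0, computer1@15000=55.0}
        System.out.println(maxPerWindow(arrivals, 5_000));
    }
}
```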
2. Basic demo
For the pom dependencies and the ComputerTemperature entity class, see the earlier posts.
1. Create a source whose timestamps vary (a simple variation)
package com.hy.flink.test.source;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import com.hy.flink.test.pojo.ComputerTemperature;
/**
 *
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description Source that generates random temperature readings, some with timestamps shifted into the past
 *
 */
public class RandomComputerTemperatureLaterSource extends FromIteratorFunction<ComputerTemperature> {
    private static final long serialVersionUID = 1L;
    public RandomComputerTemperatureLaterSource(long sleepTime, Integer randomNumCount) {
        super(new RandomComputerTemperatureIterator(sleepTime, randomNumCount));
    }
    private static class RandomComputerTemperatureIterator implements Iterator<ComputerTemperature>, Serializable {
        private static final long serialVersionUID = 1L;
        private final long sleepTime;
        private final String[] computerNames = { "computer1", "computer2", "computer3" };
        // Number of records still to emit
        private int randomNumCount;
        private RandomComputerTemperatureIterator(long sleepTime, Integer randomNumCount) {
            this.sleepTime = sleepTime;
            this.randomNumCount = randomNumCount;
        }
        @Override
        public boolean hasNext() {
            // Only checks; the counter is decremented in next(), so exactly randomNumCount records are emitted
            return randomNumCount > 0;
        }
        @Override
        public ComputerTemperature next() {
            randomNumCount--;
            // Pause sleepTime ms between records to simulate a live sensor
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            Double temperature = Math.random() * 100 + 1;
            int index = (int) (Math.random() * computerNames.length);
            long timestamp = System.currentTimeMillis();
            // Make the stream out of order: for even timestamps (about half the records),
            // push the event time back by temperature * 3 s (between 3 s and 300 s)
            if (timestamp % 2 == 0) {
                timestamp -= temperature.intValue() * 3000;
            }
            ComputerTemperature computerTemperature = new ComputerTemperature(computerNames[index], temperature,
                    timestamp);
            System.out.println("Remaining records: " + randomNumCount);
            System.out.println("Generated ====> " + computerTemperature);
            return computerTemperature;
        }
    }
}
Every generated record is printed so that the input can be compared with the window results.
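The out-of-order rule in next() can be checked on its own. This small sketch moves it into a hypothetical pure function, skew, so the size of the shift is visible. The temperature lies in [1, 101), so an even timestamp moves back by 3 s to 300 s. That is far more than the 5 s out-of-orderness bound plus the 10 s allowed lateness used later, so many shifted records are expected to end up as late data.

```java
public class SkewRule {
    // Same rule as RandomComputerTemperatureIterator.next(): an even timestamp is
    // pushed back by (integer part of temperature) * 3000 ms; an odd one is unchanged
    static long skew(long timestamp, double temperature) {
        if (timestamp % 2 == 0) {
            return timestamp - (int) temperature * 3000L;
        }
        return timestamp;
    }

    public static void main(String[] args) {
        long even = 1_000_000L;
        long odd = 1_000_001L;
        System.out.println(even - skew(even, 50.7));   // 150000 ms = 150 s behind
        System.out.println(odd - skew(odd, 50.7));     // 0: not shifted
        System.out.println(even - skew(even, 1.0));    // 3000: smallest possible shift
        System.out.println(even - skew(even, 100.99)); // 300000: largest possible shift
    }
}
```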
2. Create a Sink for ComputerTemperature
package com.hy.flink.test.pojo;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 *
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description Logs late ComputerTemperature records
 *
 */
public class ComputerTemperatureSink implements SinkFunction<ComputerTemperature> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ComputerTemperatureSink.class);
    @Override
    public void invoke(ComputerTemperature value, Context context) {
        // Logged at INFO level: only visible when logging is configured to show INFO messages
        LOG.info("LATE===>" + value);
    }
}
This sink simply logs each record. Any other sink can be used instead.
3. Write the job class
package com.hy.flink.test.window;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import com.hy.flink.test.pojo.ComputerTemperature;
import com.hy.flink.test.pojo.ComputerTemperatureSink;
import com.hy.flink.test.source.RandomComputerTemperatureLaterSource;
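/**
 *
 * @author hy
 * @createTime 2021-06-06 09:37:41
 * @description Tests late events: 1. generate late events (timestamps must be random, not ordered);
 *              2. late events can only be handled with a suitable watermark, so a custom watermark strategy is used
 */
public class LaterWindowEventTest {
    // Side output for records that arrive after window end + allowed lateness
    // (anonymous subclass so Flink can capture the element type)
    static final OutputTag<ComputerTemperature> lateTag = new OutputTag<ComputerTemperature>("late") {
    };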
    public static void main(String[] args) {
        // Run in a local environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        // Use event time (deprecated since Flink 1.12, where EventTime is already the default)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Random readings, one every 560 ms, at most 30 of them; about half carry timestamps from the past
        DataStream<ComputerTemperature> dataStream = env.addSource(new RandomComputerTemperatureLaterSource(560, 30));
        // Event time comes from the record itself; the watermark trails the largest timestamp seen by 5 s
        WatermarkStrategy<ComputerTemperature> strategy = WatermarkStrategy
                .<ComputerTemperature>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
        dataStream = dataStream.assignTimestampsAndWatermarks(strategy);
        // Group by computer name and compute the max temperature per 5 s window with MyMaxTemperatureHandler.
        // The strategy above replaces the deprecated AssignerWithPeriodicWatermarks,
        // BoundedOutOfOrdernessTimestampExtractor and AscendingTimestampExtractor variants.
        SingleOutputStreamOperator<Tuple3<String, Long, Double>> process = dataStream
                .keyBy(x -> x.getName())
                // With the EventTime characteristic, timeWindow creates event-time tumbling windows.
                // window(TumblingProcessingTimeWindows.of(...)) assigns by wall clock, so no record is ever late.
                .timeWindow(Time.milliseconds(5000))
                // Records that miss window end + allowed lateness go to the lateTag side output
                .sideOutputLateData(lateTag)
                // Records for windows that ended less than 10 s ago (in watermark time) still re-fire the window
                .allowedLateness(Time.seconds(10))
                .process(new MyMaxTemperatureHandler());
        // Print window results through a single sink instance
        process.addSink(new SinkFunction<Tuple3<String, Long, Double>>() {
            @Override
            public void invoke(Tuple3<String, Long, Double> value, Context context) throws Exception {
                System.out.println("Window result ===> " + value);
            }
        }).setParallelism(1);
        // Late records: at first nothing appeared here even though records were being dropped,
        // so they are written to a dedicated sink (MyLateTemperatureHandler is an stdout alternative)
        DataStream<ComputerTemperature> lateStream = process.getSideOutput(lateTag);
        lateStream.addSink(new ComputerTemperatureSink());
        try {
            env.execute("Max temperature per computer per 5-second window");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Deprecated alternative, not used above: the watermark follows the wall clock minus 5 s
    // instead of the largest record timestamp seen
    private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<ComputerTemperature> {
        private final long maxTimeLag = 5000; // 5 seconds
        @Override
        public long extractTimestamp(ComputerTemperature element, long previousElementTimestamp) {
            return element.getTimestamp();
        }
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current time minus the maximum time lag
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }
    // Handles late records (stdout alternative to ComputerTemperatureSink)
    static class MyLateTemperatureHandler extends ProcessFunction<ComputerTemperature, ComputerTemperature> {
        @Override
        public void processElement(ComputerTemperature bean,
                ProcessFunction<ComputerTemperature, ComputerTemperature>.Context context,
                Collector<ComputerTemperature> out) throws Exception {
            System.out.println("=======================>");
            System.out.println(bean);
            System.out.println("<========================");
            // Late records are only printed, not forwarded downstream
            /* out.collect(bean); */
        }
    }
    // Emits (computer name, timestamp of the hottest reading, max temperature) for each window
    static class MyMaxTemperatureHandler
            extends ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow> {
        @Override
        public void process(String key,
                ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow>.Context context,
                Iterable<ComputerTemperature> events, Collector<Tuple3<String, Long, Double>> out) throws Exception {
            // Start below any possible reading so negative temperatures are handled too
            double max = Double.NEGATIVE_INFINITY;
            long time = 0L;
            System.out.println("Processing window " + context.window() + " for " + key);
            events.forEach(System.out::println);
            for (ComputerTemperature event : events) {
                double temperature = event.getTemperature();
                if (temperature > max) {
                    max = temperature;
                    time = event.getTimestamp();
                }
            }
            // For a readable time: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date(time))
            out.collect(Tuple3.of(key, time, max));
        }
    }
}
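Late records are collected through the OutputTag. allowedLateness(Time.seconds(10)) keeps each window's state for 10 s after the watermark passes the window end, so records arriving in that period re-trigger the window. Anything later is routed automatically to the lateTag side output instead of being silently dropped. The lateness is configurable.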
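A record can end in one of three ways. It can be counted in a window that has not fired yet. It can be added to an already fired window within the allowed lateness. Or it can be sent to the lateTag side output. All three can be simulated in plain Java.

This is a sketch under stated assumptions, not Flink code. It reuses the job's three settings together with my understanding of Flink's default rules:
- The bounded-out-of-orderness watermark is the largest timestamp seen, minus the bound, minus 1 ms.
- A window fires once the watermark reaches its last millisecond.
- A record is too late once the watermark has reached that last millisecond plus the allowed lateness.

The class LatenessSim and its labels are hypothetical. As a simplification, the watermark advances after every record, whereas Flink emits it periodically (every 200 ms by default).

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessSim {
    static final long WINDOW_SIZE = 5_000;       // like timeWindow(Time.milliseconds(5000))
    static final long OUT_OF_ORDERNESS = 5_000;  // like forBoundedOutOfOrderness(Duration.ofSeconds(5))
    static final long ALLOWED_LATENESS = 10_000; // like allowedLateness(Time.seconds(10))

    // Initial value chosen so that the first watermark is Long.MIN_VALUE
    private long maxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS + 1;
    final List<Long> lateSideOutput = new ArrayList<>();

    long watermark() {
        return maxTimestamp - OUT_OF_ORDERNESS - 1;
    }

    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE);
    }

    // Classifies one record against the current watermark, then advances the watermark
    String process(long ts) {
        long wm = watermark();
        long windowLastMs = windowStart(ts) + WINDOW_SIZE - 1;
        String result;
        if (windowLastMs + ALLOWED_LATENESS <= wm) {
            lateSideOutput.add(ts); // too late: would go to the lateTag side output
            result = "SIDE_OUTPUT";
        } else if (windowLastMs <= wm) {
            result = "LATE_REFIRE"; // window already fired, still within allowed lateness
        } else {
            result = "ON_TIME";     // window has not fired yet
        }
        maxTimestamp = Math.max(maxTimestamp, ts);
        return result;
    }

    public static void main(String[] args) {
        LatenessSim sim = new LatenessSim();
        for (long ts : new long[] {100_000, 97_000, 88_000, 80_000}) {
            System.out.println(ts + " -> " + sim.process(ts));
        }
        System.out.println("side output: " + sim.lateSideOutput);
    }
}
```

Under these rules, a record that trails the newest timestamp by less than about 15 s never reaches the side output. One that trails it by 20 s or more always does. In between, the outcome depends on where the window boundary falls. The watermark is also not tracked per key: a reading from one computer is judged against the newest reading from any computer, because the demo source is a single, non-parallel instance.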
3. Test
The raw output is long and messy, so it was tidied up before being checked.
The test succeeded!
4. Pitfalls
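1. What felt like a pitfall: while the job ran, no late records were ever printed, yet records were clearly missing from the results. It only worked for me with timeWindow(Time.milliseconds(5000)), not with window(TumblingProcessingTimeWindows.of(Time.seconds(5))).
The likely reason is that a processing-time window assigns each record by the wall clock at arrival, so no record is ever late and nothing reaches the side output. With the EventTime characteristic, timeWindow creates event-time windows instead. The explicit equivalent is window(TumblingEventTimeWindows.of(Time.seconds(5))).
2. Because the official documentation had no complete example, I was not sure whether env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) or something else made it work.
KeyedStream.timeWindow picks its window assigner from the time characteristic, so that setting is what turned timeWindow into an event-time window. Since Flink 1.12, EventTime is the default characteristic, and both setStreamTimeCharacteristic and timeWindow are deprecated.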
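Both points come down to how the deprecated KeyedStream.timeWindow(Time) picks its window assigner. It returns processing-time tumbling windows only when the characteristic is ProcessingTime, and event-time tumbling windows otherwise.

The plain-Java sketch below mirrors that branch with a hypothetical local enum and string labels; it is not Flink's code. It also computes the window a late record falls into under each clock. This shows why a processing-time window never yields late data: the record simply joins whichever window is open when it arrives.

```java
public class WindowChoice {
    // Hypothetical local copy of Flink's three TimeCharacteristic values
    enum Characteristic { ProcessingTime, IngestionTime, EventTime }

    // Mirrors the branch in the deprecated KeyedStream.timeWindow(Time size)
    static String assignerFor(Characteristic c) {
        return c == Characteristic.ProcessingTime
                ? "TumblingProcessingTimeWindows"
                : "TumblingEventTimeWindows";
    }

    // Start of the tumbling window of the given size (ms) that contains time
    static long windowStart(long time, long size) {
        return time - Math.floorMod(time, size);
    }

    public static void main(String[] args) {
        long eventTs = 80_000;    // timestamp carried by a late record
        long arrivalTs = 101_000; // wall-clock time when it reaches the window operator
        System.out.println(assignerFor(Characteristic.EventTime)); // TumblingEventTimeWindows
        // Event time: the record belongs to [80000, 85000), which may already be closed, so it can be late
        System.out.println(windowStart(eventTs, 5_000));   // 80000
        // Processing time: the record joins the window that is open right now, [100000, 105000)
        System.out.println(windowStart(arrivalTs, 5_000)); // 100000
    }
}
```

In Flink 1.12 and later, the explicit, non-deprecated form is .window(TumblingEventTimeWindows.of(Time.seconds(5))). It no longer depends on the time characteristic.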