Apache Flink:使用事件时间方式处理工业数据和延迟数据

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink:使用事件时间方式处理工业数据和延迟数据相关的知识,希望对你有一定的参考价值。

1.声明

当前内容主要为测试和使用事件时间,使用自定义的时间作为水印,主要为模拟之用

  1. 工业的传感器中,默认获取的数据可能在多个工厂中的时间到来不一致,Flink处理的为工厂中的传递的时间
  2. 可能存在延迟的数据,延迟的数据需要处理(可能是网络原因或者其他原因)

主要内容为:

  1. 收集当前的延迟时间,显示延迟数据
  2. 使用事件窗口处理数据,处理当前数据

2.基本demo

pom依赖和ComputerTemperature实体类参考前面的博文

1.创建时间会变化的数据源Source(就是简单的变化)


import java.io.Serializable;
import java.util.Date;
import java.util.Iterator;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import com.hy.flink.test.pojo.AccountTransation;
import com.hy.flink.test.pojo.ComputerTemperature;

/**
 * 
 * @author hy
 * @createTime 2021-05-16 09:52:36
 * @description 生成一个随机数的资源
 *
 */
public class RandomComputerTemperatureLaterSource extends FromIteratorFunction<ComputerTemperature> {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public RandomComputerTemperatureLaterSource(long sleepTime, Integer randomNumCount) {
		super(new RandomComputerTemperatureIterator(sleepTime, randomNumCount));
		// TODO Auto-generated constructor stub
	}

	private static class RandomComputerTemperatureIterator implements Iterator<ComputerTemperature>, Serializable {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;
		private final long sleepTime;
		private String[] computerNames = { "电脑1", "电脑2", "电脑3" };
		private Integer randomNumCount;

		private RandomComputerTemperatureIterator(long sleepTime, Integer randomNumCount) {
			this.sleepTime = sleepTime;
			this.randomNumCount = randomNumCount;
		}

		@Override
		public boolean hasNext() {
			randomNumCount--;
			return randomNumCount > 0;
		}

		@Override
		public ComputerTemperature next() {
			// 默认休眠时间为1秒钟
			try {
				Thread.sleep(sleepTime);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			Double temperature = Math.random() * 100 + 1;
			int index = (int) (Math.random() * computerNames.length);
			long timestamp = new Date().getTime();
			// 控制时间的产生,让时间出现乱序的操作
			if (timestamp % 2 == 0) {
				timestamp -= temperature.intValue() * 3000;
			}

			ComputerTemperature computerTemperature = new ComputerTemperature(computerNames[index], temperature,
					timestamp);
			// StreamRecord<ComputerTemperature> streamRecord = new
			// StreamRecord<ComputerTemperature>(computerTemperature,timestamp);
			System.out.println("当前的数量为:" + randomNumCount);
			System.out.println("产生数据为====》"+computerTemperature);
			return computerTemperature;
		}
	}

}

为了对比数据,所以将所有的数据全部打印出来

2.创建ComputerTemperature的Sink

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.OutputTag;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author hy
 * @createTime 2021-05-16 10:27:25
 * @description 主要为打印当前的账户的信息
 *
 */
public class ComputerTemperatureSink implements SinkFunction<ComputerTemperature> {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);

	@Override
	public void invoke(ComputerTemperature value, Context context) {
		LOG.info("LATE===>"+value.toString());
	}
}

这里就是简单的log打印,这里可以采用其他的东西取代

3.编写具体操作类

package com.hy.flink.test.window;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Date;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.walkthrough.common.sink.AlertSink;

import com.hy.flink.test.pojo.ComputerTemperature;
import com.hy.flink.test.pojo.ComputerTemperatureSink;
import com.hy.flink.test.source.RandomComputerTemperatureLaterSource;
import com.hy.flink.test.source.RandomComputerTemperatureSource;
import com.hy.flink.test.window.OfficeWindowsTest.MyMaxTemperatureHandler;

/**
 * 
 * @author hy
 * @createTime 2021-06-06 09:37:41
 * @description 当前内容主要为测试和使用当前的迟到的事件 1. 产生迟到的事件(应该以随机的时间作为目标,而不是有序的) 2.
 *              需要特定的水印才能处理,创建自己的时间水印
 */
public class LaterWindowEventTest {
	static OutputTag<ComputerTemperature> lateTag = new OutputTag<ComputerTemperature>("late") {
	};
	public static void main(String[] args) {
		// 设置存储延迟到的数据
		
		// 设置当前的环境为本地环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

		// 设置为事件时间
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
		// 设置数据来源为随机数方式的数据
		DataStream<ComputerTemperature> dataStream = env.addSource(new RandomComputerTemperatureLaterSource(560, 30));
		
		  WatermarkStrategy<ComputerTemperature> strategy = WatermarkStrategy
		  .<ComputerTemperature>forBoundedOutOfOrderness(Duration.ofSeconds(5))
		  .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
		  
		  dataStream = dataStream.assignTimestampsAndWatermarks(strategy);
		 
		// 将数据按照当前的名称进行分组操作,然后每5秒统计一次,使用MyHandler进行统计操作
		SingleOutputStreamOperator<Tuple3<String, Long, Double>> process = dataStream
				/* .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator()) */

				/*
				 * .assignTimestampsAndWatermarks( new
				 * BoundedOutOfOrdernessTimestampExtractor<ComputerTemperature>(Time.seconds(10)
				 * ) {
				 * 
				 * @Override public long extractTimestamp(ComputerTemperature element) { return
				 * System.currentTimeMillis(); } })
				 */

				/*
				 * .assignTimestampsAndWatermarks(new
				 * AscendingTimestampExtractor<ComputerTemperature>() {
				 * 
				 * @Override public long extractAscendingTimestamp(ComputerTemperature element)
				 * { return element.getTimestamp(); } })
				 */
				.keyBy(x -> x.getName())
				// 使用timeWindow才是时间窗口,使用.window(TumblingProcessingTimeWindows.of(Time.seconds(5))的延迟数据无效,无法正常显示和处理延迟数据
				.timeWindow(Time.milliseconds(5000))
				
				// 将延迟到的数据放在延迟数据集合中
				.sideOutputLateData(lateTag)
				// 允许最晚到的时间为10秒的数据,也可以处理
				.allowedLateness(Time.seconds(10))
				.process(new MyMaxTemperatureHandler());
		// print the results with a single thread, rather than in parallel
		// 打印结果并使用单个线程的方式,采用并行计算,不管当前的是否有数据,都开始统计
		process.addSink(new SinkFunction<Tuple3<String,Long,Double>>(){

			@Override
			public void invoke(Tuple3<String, Long, Double> value, Context context) throws Exception {
				System.out.println("收集到的数据===>"+value);
			}
			
		}).setParallelism(1);

		// 开始处理延迟的数据,问题1这里为什么没有数据显示????,是没有丢弃数据吗?通过查看发现有数据是被丢弃的
		DataStream<ComputerTemperature> lateStream = process.getSideOutput(lateTag);
		//SingleOutputStreamOperator<ComputerTemperature> process2 = lateStream.process(new MyLateTemperatureHandler());
		//process2.print().setParallelism(1);// 延迟的数据的显示,发现丢弃的数据是无法打印的,是否能sink
		lateStream.addSink(new ComputerTemperatureSink());

		try {
			env.execute("开始执行统计每个电脑的5次温度中的温度最大值");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	private static class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<ComputerTemperature> {

		private final long maxTimeLag = 5000; // 5 seconds

		@Override
		public long extractTimestamp(ComputerTemperature element, long previousElementTimestamp) {
			return element.getTimestamp();
		}

		@Override
		public Watermark getCurrentWatermark() {
			// return the watermark as current time minus the maximum time lag
			return new Watermark(System.currentTimeMillis() - maxTimeLag);
		}
	}

	// 专门处理延期到达的数据
	static class MyLateTemperatureHandler extends ProcessFunction<ComputerTemperature, ComputerTemperature> {

		@Override
		public void processElement(ComputerTemperature bean,
				ProcessFunction<ComputerTemperature, ComputerTemperature>.Context context,
				Collector<ComputerTemperature> out) throws Exception {
			System.out.println("=======================>");
			System.out.println(bean);
			System.out.println("<========================");
			// 不收集过时的bean
			/* out.collect(bean); */
			/* context.output(lateTag,bean); */
		}

	}

	// 主要获取当前的temperature的最大温度
	static class MyMaxTemperatureHandler
			extends ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow> {

		@Override
		public void process(String key,
				ProcessWindowFunction<ComputerTemperature, Tuple3<String, Long, Double>, String, TimeWindow>.Context context,
				Iterable<ComputerTemperature> events, Collector<Tuple3<String, Long, Double>> out) throws Exception {
			// TODO Auto-generated method stub
			Double max = 0.0;
			long time = 0L;

			System.out.println("开始处理数据.........");
			events.forEach(x -> System.out.println(x));

			for (ComputerTemperature event : events) {
				Double temperature = event.getTemperature();
				if (temperature > max) {
					max = temperature;
					time = event.getTimestamp();
				}
			}
			// 主要收集最大值的数据new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new
			// Date(time))
			out.collect(Tuple3.of(key, time, max));

		}

	}

}

这里使用OutputTag收集延迟的数据,自动收集,可以设置延迟时间

3.测试

在这里插入图片描述
由于数据量比较多乱,所以整理后:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
测试成功!

4.其中的坑

1. 本人感觉有坑,在执行的过程中一直没有出现延迟数据的打印,但是实际处理的时候是少了数据,个人感觉必须使用:timeWindow(Time.milliseconds(5000))而不是使用window

2.由于官方没有实际的例子,所以不知道是env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);还是其他的东西生效了,导致可以正常使用

以上是关于Apache Flink:使用事件时间方式处理工业数据和延迟数据的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink - 事件时间

是否可以在 apache flink CEP 中处理多个流?

apache Flink初探

「Flink」事件时间与水印

Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计

Flink第一课!使用批处理,流处理,Socket的方式实现经典词频统计