Flink---分流
Posted Shall潇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink---分流相关的知识,希望对你有一定的参考价值。
Flink中将一个流拆分成多个流的方法有两个:split(已过时),process(推荐)
split
package Flink.transform;
import Flink.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Collections;
import java.util.Properties;
/**
* @Author shall潇
* @Date 2021/6/29
* @Description 分流-合流
* 从kafka读取进行分流,按照高、低、正常进行分流
*/
public class Transform3 {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.159.100:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"gro1");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
DataStreamSource<String> dss = env.addSource(new FlinkKafkaConsumer011<String>("flinkKafka", new SimpleStringSchema(), properties));
SingleOutputStreamOperator<SensorReading> mapStream = dss.map(line -> {
String[] split = line.split(",");
SensorReading sr = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sr;
});
SplitStream<SensorReading> splitStream = mapStream.split(new OutputSelector<SensorReading>() {
@Override
public Iterable<String> select(SensorReading sensorReading) {
if (sensorReading.getTemperature() > 38.0) {
return Collections.singletonList("high");
} else if(sensorReading.getTemperature() > 36.0){
return Collections.singletonList("normal");
}else {
return Collections.singletonList("lower");
}
}
});
// 通过 split 方法进行分流
DataStream<SensorReading> high = splitStream.select("high");
DataStream<SensorReading> normal = splitStream.select("normal");
DataStream<SensorReading> lower = splitStream.select("lower");
high.print("high");
normal.print("normal");
lower.print("lower");
// union : 合流 要求所有流的数据格式完全相同
DataStream<SensorReading> unionStream = high.union(normal, lower);
unionStream.print("union");
// connect : 合流 不要求...,只能两个合
SingleOutputStreamOperator<Tuple2<String, Double>> warning = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
}
});
ConnectedStreams<SensorReading, Tuple2<String, Double>> connectStream = normal.connect(warning);
SingleOutputStreamOperator<Object> map = connectStream.map(new CoMapFunction<SensorReading, Tuple2<String, Double>, Object>() {
@Override
public Object map1(SensorReading sensorReading) throws Exception {
return new Tuple2<>(sensorReading.getId(), "健康");
}
@Override
public Object map2(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
return new Tuple3<>(stringDoubleTuple2.f0, stringDoubleTuple2.f1, "发烧了");
}
});
try {
env.execute("split-demo");
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyRichMapFunction extends RichMapFunction<SensorReading, Tuple2<String, Double>>{
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
@Override
public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
return null;
}
}
}
process
package Flink.process;
import Flink.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @Author shall潇
* @Date 2021/7/6
* @Description 将温度按照高温、正常、低温分流
*/
public class Process4_Function {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//使用侧输出流获得
OutputTag<SensorReading> highTemp = new OutputTag<SensorReading>("highTemp"){};
OutputTag<SensorReading> lowTemp = new OutputTag<SensorReading>("lowTemp"){};
DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.159.100", 7777);
SingleOutputStreamOperator<SensorReading> mapStream = dataStreamSource.map(line -> {
String[] split = line.split(",");
SensorReading sensorReading = new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));
return sensorReading;
});
SingleOutputStreamOperator<SensorReading> process = mapStream.process(new MyProcessFunction2(highTemp,lowTemp));
process.print("normal");
// 通过getSideOutPut获取指定测输出流,参数:OutputTag
DataStream<SensorReading> sideOutput1 = process.getSideOutput(highTemp);
sideOutput1.print("high-temp");
DataStream<SensorReading> sideOutput2 = process.getSideOutput(lowTemp);
sideOutput2.print("low-temp");
try {
env.execute("process-2");
} catch (Exception e) {
e.printStackTrace();
}
}
private static class MyProcessFunction2 extends ProcessFunction<SensorReading, SensorReading> {
private OutputTag<SensorReading> highTemp;
private OutputTag<SensorReading> lowTemp;
public MyProcessFunction2(OutputTag<SensorReading> highTemp, OutputTag<SensorReading> lowTemp) {
this.highTemp = highTemp;
this.lowTemp = lowTemp;
}
@Override
public void processElement(SensorReading sensorReading, Context context, Collector<SensorReading> collector) throws Exception {
/*高温、低温---选择放入测输出流*/
if(sensorReading.getTemperature()>37){
context.output(highTemp,sensorReading);
}else if(sensorReading.getTemperature()<36){
context.output(lowTemp,sensorReading);
}else {
/*正常温度---选择主流*/
collector.collect(sensorReading);
}
}
}
}
以上是关于Flink---分流的主要内容,如果未能解决你的问题,请参考以下文章