Flinkflink流批一体
Posted 星欲冷hx
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flinkflink流批一体相关的知识,希望对你有一定的参考价值。
目录
流处理的相关概念
数据的时效性
日常工作中,我们一般会先把数据存放在表中,然后对数据进行加工和分析。这就有时效性的问题。
如果以年、月、天为单位的级别的数据处理,进行统计分析,个性化推荐,这样的数据一般称之为批数据。如果以小时、分钟、秒这样的单位进行处理,这样的数据一般称之为流数据。比如对网站的实时监控、日志的处理等。在这样的场景下,如果还要收集数据存储到数据库中,再取出来统一进行处理,就无法满足高时效性的需求。
流处理和批处理
批处理
Batch Analytics,统一收集数据->存储到数据库->对数据进行批量处理,例如MP、Hive、FlinkDataSet、Spark Batch等,生成离线报表
流处理
Streaming Analytics,就是对数据流进行处理,例如Storm、Flink Stream、Spark Streaming实时处理分析数据,应用场景如实时大屏、实时报表等。
批处理和流处理的对比
- 数据的时效性不同:流处理实时、低延迟,批处理非实时、高延迟
- 数据特征不同:流处理的数据一般是动态的、没有边界,而批处理的数据一般是静态的
- 应用场景不同:流处理应用在实时场景,批处理对实时性要求不高
- 运行方式不同:流处理的任务是持续进行,批处理的任务是一次性完成
流批一体API
Flink的DataStream API既支持批处理模式,又支持流处理模式,可以认为批处理是流处理的一种特例。
流批一体的好处:
- 可复用性:作业可以在流、批两种模式之间自由切换,而无需重写代码。
- 维护简单:统一的api,维护简单
流批一体编程模型
Data Source
预定义的Source
基于集合的Source
api
- env.fromElements(可变参数)
- env.fromCollection(各种集合)
- env.generateSequence(开始,结束)
- env.fromSequence(开始,结束)
演示
package DataStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* description flink的基于集合的source演示
* env.fromElements(可变参数)
* env.fromCollection(各种集合)
* env.generateSequence(开始,结束)
* env.fromSequence(开始,结束)
* @date 2022/4/25
*/
public class DataSourceCollectionDemo
public static void main(String[] args) throws Exception
//1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2、source
//2.1 env.fromElements()
DataStream<String> ds1 = env.fromElements("spark","flink","hadoop","hive");
//2.2 env.fromCollection
String [] s1 = "java","flume","azkaban","sqoop";
DataStream<String> ds2 = env.fromCollection(Arrays.asList(s1));
//2.3 env.generateSequence
DataStream<Long> ds3 = env.generateSequence(1,10);
//2.4 env.fromSequence
DataStream<Long> ds4 = env.fromSequence(20,30);
//3、transformation
//4、sink
ds1.print();
ds2.print();
ds3.print();
ds4.print();
//5、execute
env.execute();
输出结果
基于文件的Source
api
env.readTextFile(本地/HDFS/文件夹/压缩文件)
演示
package DataStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* description flink的基于文件的source演示
* env.readTextFile(本地/HDFS/文件夹/压缩文件)
* @date 2022/4/25
*/
public class DataSourceFileDemo
public static void main(String[] args) throws Exception
//1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2、source
//2.1 读取本地文件
DataStream<String> ds1 = env.readTextFile("D:\\\\wordcount\\\\input\\\\a.txt");
//2.2 读取本地文件夹
// DataStream<String> ds2 = env.readTextFile("F:\\\\0操作地址\\\\wordcount\\\\input");
//2.3 读取hdfs文件
// DataStream<String> ds3 = env.readTextFile("hdfs://hadoop01:9000/wordcount/input/a.txt");
//2.4 读取hdfs文件夹
// DataStream<String> ds4 = env.readTextFile("hdfs://hadoop01:9000/wordcount/input");
//2.5 读取压缩文件
// DataStream<String> ds5 = env.readTextFile("F:\\\\0操作地址\\\\wordcount\\\\a.txt.gz");
//2.6
// DataStream<String> ds6 = env.readTextFile("hdfs://hadoop01:9000/wordcount/input/a.txt.gz");
//3、transformation
//4、sink
ds1.print();
// ds2.print();
// ds3.print();
// ds4.print();
// ds5.print();
// ds6.print();
//5、execute
env.execute();
准备gz格式压缩文件
https://pan.baidu.com/s/1Nv8cIICk4HwNwdsdgkoFjA?pwd=1234 提取码:1234
查看结果
截取其中的一个
基于socket的source
socket是指网络通讯,需要有一个发送端一个接送端,类似于插头和插座(socket),用于和一些智能硬件的对接。比如门禁的人脸机就是一个智能设备,每个人脸机都有一个ip地址,也有一个端口,根据ip地址和端口号就可以和这个人脸机进行通讯。
1 模拟socket通讯,安装nc(没有四级标题了,就用符号了)
nc是netcat的简称,可以利用它向某台主机的某个端口发送数据,模拟socket通讯的发送端,也就是作为source
2 启动nc,发送数据,相当于socket通讯的发送端
3 使用telnet来接收数据,测试socket是否工作正常
linux主机下,也可以安装telnet进行测试
4 编写flink代码,作为socket通讯的接收端,接收发送的数据进行处理
package DataStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataSourceSocketDemo
public static void main(String[] args) throws Exception
//1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2、source
//2.1 读取socket数据,主机位hadoop001,端口号为9999
DataStream<String> ds1 = env.socketTextStream("hadoop001",9999);
//3、transformation
//4、sink
ds1.print();
// ds2.print();
// ds3.print();
// ds4.print();
// ds5.print();
// ds6.print();
//5、execute
env.execute();
启动测试
5 案例-利用基于socket的source实时统计单词数量
package DataStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* description flink的基于socket的source实时统计单词数量
* @date 2022/4/25
*/
public class DataSourceSocketWordCount
public static void main(String[] args) throws Exception
//1、env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2、source
//2.1 读取socket数据,主机位hadoop001,端口号为9999
DataStream<String> lineDS = env.socketTextStream("hadoop001",9999);
//3、transformation
//3、处理数据-transformation
//3.1将每一行数据切分为一个个的单词组成的一个集合
DataStream<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>()
@Override
public void flatMap(String s, Collector<String> collector) throws Exception
//参数s就是一行行的数据,再将每一行切分为一个个的单词
String[] words = s.split(" ");
//将切分的单词收集起来,发到集合中
for (String word:words)
collector.collect(word);
);
//3.2对集合中的每一个单词记为1,成为一个二元组的集合
DataStream<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>()
@Override
public Tuple2<String, Integer> map(String s) throws Exception
//此处的s就是进来的一个个单词,再组成一个二元组返回
return Tuple2.of(s,1);
);
//3.3对新的集合按照key(单词名称)进行分组keyby
KeyedStream<Tuple2<String,Integer>,String> groupedDS = wordAndOneDS.keyBy(t->t.f0);//此处的f0代表二元组的第一个元素(索引为0)t0
//3.4对各个组内的数据按照value进行聚合,也就是求和sum
DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);//此处的1代表二元组的第二个元素(索引为1)t1
//4、sink
result.print();
//5、execute
env.execute();
启动测试
自定义的source
随机生成数据
api
SourceFunction:非并行的随机数据源(并行度为1)
RichSourceFunction:丰富的非并行的随机数据源(并行度为1)
ParallelSourceFunction:并行的随机数据源(并行度可以大于等于1)
RichParallelSourceFunction:丰富的并行的随机数据源(并行度可以大于等于1)
需求
每隔1秒随机生成一条订单信息(订单ID,用户ID,订单金额,时间戳)
要求:
- 随机生成订单ID(UUID):UUID 是 通用唯一识别码(Universally Unique Identifier)的缩写
- 随机生成用户ID(0-2)
- 随机生成订单金额(0-100)
- 时间戳为当前系统时间
编程实现
引入lombok依赖,使用注解@Data自动生成getter和setter
启动查看结果
mysql
从mysql中提取数据进行处理
- sqoop:数据迁移命令行工具,可以把mysql中的数据迁移到hdfs、hive
- kettle:ETL可视化的工具,也可以把mysql中的数据抽取到hdfs、hive
- spark:api方式,读取mysql中的数据,并进行处理
- flink:api方式,读取mysql中的数据,并进行处理
需求
从mysql中实时的加载数据,显示出来
启动mysql
本地mysql
远程mysql
准备数据
远程的hadoop001上的mysql
新建数据库
新建表
设置字段
添加记录
代码实现
package cn.edu.hgu.flink.DataStream.model;
import cn.edu.hgu.flink.DataStream.model.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.*;
import java.util.concurrent.TimeUnit;
public class DataSourceMySQLDemo
public static void main(String[] args) throws Exception
//1. env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//2. source
//2.1添加自定义的source,连接mysql,并设置并行度为2
DataStream<Student> ds1 = env.addSource(new StudentMysqlSource()).setParallelism(2);
// 3. transformation
// 4. sink
ds1.print();
// 5. execute
env.execute();
private static class StudentMysqlSource extends RichParallelSourceFunction<Student>
// 定义一个标识,是否需要生成数据
private Boolean flag = true;
private Connection cn = null;
private Statement st = null;
private ResultSet rs = null;
//建立连接
@Override
public void open(Configuration parameters) throws Exception
// 从mysql中读取数据,创建Student对象,添加到上下文,作为Source
// 1. 加载mysql驱动
Class.forName("com.mysql.jdbc.Driver");
// 2. 获取连接Connection
String url = "jdbc:mysql://hadoop001:3306/studentdb?useSSL=false";
String user = "root";
String password = "Root123456!";
cn = DriverManager.getConnection(url,user,password);
// 3 获取执行语句对象Statement
st = cn.createStatement();
// 4. 数据集ResultSet
// 业务处理
@Override
public void run(SourceContext<Student> sourceContext) throws Exception
while (flag)
String sql = "select * from students";
rs = st.executeQuery(sql);
// 5 读取数据集
while (rs.next())
int id = rs.getInt("id");
String name = rs.getString("name");
int age = rs.getInt("age");
sourceContext.collect(new Student(id,name,age));
TimeUnit.SECONDS.sleep(5);//每隔五秒
@Override
public void cancel()
flag = false;
// 关闭资源
@Override
public void close() throws Exception
// 6 关闭
rs.close();
st.close();
cn.close();
在pom文件中添加java连接mysql的依赖
我这mysql是5.7.36的
但他要的是连接的那个包的版本
这个在以前给过好几次了
改成46
运行,查看结果
Transformations
官网api地址
Apache Flink 1.12 Documentation: Operators
整体分类
对单条记录的操作
filter过滤,map映射
对多条记录的操作
统计一个小时内的订单的总成交量,需要使用window将需要的记录关联到一起进行处理
对多个流进行操作并转换为单个流(合并)
union联合、join连接、connection连接
把一个流拆分成多个流(拆分)
split拆分,拆分后的每个流是原来流的有一个子集,可以对每个子集进行不同的操作
基本操作
map
映射,将操作作用在集合中的每一个元素上,并返回作用后的结果
y = f(x)
faltMap
扁平化映射,将集合中的每个元素转换为一个或多个元素,并返回扁平化之后的结果
keyBy
分组,安装指定的key来对流中的数据进行分组,注意在流处理中没有groupBy,而且keyBy
filter
过滤,按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素
sum
求和,按照指定的字段对集合中的元素进行求和
reduce
聚合,对集合中的元素进行聚合
合并-拆分
一 union和connect
union算子可以合并多个同类型的数据流,并生成一个同类型的数据流,数据按照先进先出(First In First out)的模式合并,并且不去重。
connect提供了和union类似的功能,用来连接两个数据流,,但是与union区别:
- connect只能连接两个数据流,而union可以连接多个数据流
- connect连接的两个数据流的数据类型可以不一致,union所连接的数据流的数据类型必须一致
- 两个DataStream经过connect之后被转换为一个ConnectedStreams,并且会对两个流的数据应用不同的处理方法,而且双流之间可以共享状态。
需求
将两个String类型的流进行union
将一个String类型的流和一个Long类型的流信息connect
代码实现
package cn.edu.hgu.flink.DataStream.model;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import java.util.Arrays;
public class TransformationUnionAndConnectDemo
public static void main(String[] args) throws Exception
//1. env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. source
// 2.1 env.env.fromElements()
DataStream<String> ds1 = env.fromElements("spark","flink","hadoop","hive");
// 2.2 env.fromCollection()
String [] s1 = "java","flume","azkaban","sqoop";
DataStream<String> ds2 = env.fromCollection(Arrays.asList(s1));
// 2.3 env.generateSequence()
//
// 2.4
DataStream<Long> ds4 = env.fromSequence(20,30);
// 3. transformation
// union
DataStream<String> result1 = ds1.union(ds2);
// connect
ConnectedStreams<String,Long> result2 = ds1.connect(ds4);
DataStream<String> result3 = result2.map(new CoMapFunction<String, Long, String>()
@Override
public String map1(String s) throws Exception
return s;
@Override
public String map2(Long aLong) throws Exception
return aLong.toString();
);
// 4. sink
result3.print();
// 5. execute
env.execute();
输出结果
二 Select和Side Outputs
Select就是获取分流后对应的数据
Side Outputs可以使用process方法对流中的数据进行处理,并针对不同的处理结果将数据收集到不同OutputTag中
需求
对流中的数据按照奇数和偶数进行分流,并获取分流后的数据
代码实现
package cn.edu.hgu.flink.DataStream.model;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class TransformationSideOutputsDemo
public static void main(String[] args) throws Exception
// 1. env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. source
DataStream<Long> ds = env.fromSequence(1,10);
// 3. transformation
// 3.1 定义两个输出标签
OutputTag<Long> tag_even = new OutputTag<Long>("偶数", TypeInformation.of(Long.class));
OutputTag<Long> tag_odd = new OutputTag<Long>("奇数",TypeInformation.of(Long.class));
// 3.2 对流中的数据进行打标签处理
SingleOutputStreamOperator<Long> tagResult = ds.process(new ProcessFunction<Long, Long>()
@Override
public void processElement(Long aLong, ProcessFunction<Long, Long>.Context context, Collector<Long> collector) throws Exception
if (aLong % 2 == 0)
// 给数据流中的偶数打标签
context.output(tag_even,aLong);
else
// 给数据流中的奇数打标签
context.output(tag_odd,aLong);
);
// 3.3 获取标记好的数据进行分流
// 获取偶数标签的流
DataStream<Long> evenResult = tagResult.getSideOutput(tag_even);
// 获取奇数标签的流
DataStream<Long> oddResult = tagResult.getSideOutput(tag_odd);
// 4. sink
oddResult.print();
// 5. execute
env.execute();
输出结果
分区
rebalance重平衡分区
类似与spark中的repartition,但是功能更强大,可以直接解决数据倾斜问题。
如果出现了上述的数据倾斜问题,解决方案就是rebalance将数据打散
代码实现
package cn.edu.hgu.flink.DataStream.model;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class TransformationRebalanceDemo
public static void main(String[] args) throws Exception
// 1. env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. source
DataStream<Long> ds = env.fromSequence(0,100);
// 3. transformation
// 3.1 过滤操作,有可能出现数据倾斜
DataStream<Long> filterDS = ds.filter(new FilterFunction<Long>()
@Override
public boolean filter(Long aLong) throws Exception
return aLong > 20;
);
//3.2 map操作,将数据转换为(分区编号/子任务编号,数据)二元组
DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>()
@Override
public Tuple2<Integer, Integer> map(Long aLong) throws Exception
//获取分区编号/子任务编号
int id = getRuntimeContext().getIndexOfThisSubtask();
return Tuple2.of(id,1);
).keyBy(t->t.f0).sum(1);
// 3.3 rebalance操作,对数据进行重平衡,然后再进行转换
DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>()
@Override
public Tuple2<Integer, Integer> map(Long aLong) throws Exception
int id = getRuntimeContext().getIndexOfThisSubtask();
return Tuple2.of(id,1);
).keyBy(t->t.f0).sum(1);
// 4. sink
result2.print();//再输出前进行了rebalance重分区平衡,解决了数据倾斜
// 5. execute
env.execute();
输出结果
其他分区
- global:全部发往第一个task
- broadcast:广播
- forward:上下游并发度一样时一对一发送
- suffle:随机均匀分配
- recale:本地轮流分配
- partitionCustom:自定义分区
Data Sink
预定义Sink
基于控制台和文件的Sink
- print:直接输出到控制台
- printToErr:直接输出到控制台,用红色
- writeAsText(本地/HDFS):输出到本地或者hdfs上,说明:如果并行度为1输出为文件,如果并行度>1,输出为文件夹,如果不加并行度,则使用默认的并行度8,输出也为文件夹
package cn.edu.hgu.flink;
import akka.io.dns.DnsProtocol;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Stream;
/**
* 使用Flink的DataStream实现单词计数
*/
public class WordCountDataStream
public static void main(String[] args) throws Exception
//1.准备环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// // 指定运行环境为流处理模式
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// //指定运行环境为批处理模式
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
//指定运行模式为自动,既支持流模式也支持批处理模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. 准备数据-source
DataStream<String> lineDS = env.fromElements("hadoop flink hadoop java hbase",
"hadoop flink c ","hadoop hbase flink");
// 3. 处理数据-transformation
// 3.1 将每一行数据切分成一个个的单词组成一个集合
DataStream<String> wordDS = lineDS.flatMap(new FlatMapFunction<String, String>()
@Override
public void flatMap(String s, Collector<String> collector) throws Exception
// 参数s就是一行行的数据,再将每一行切分为一个个的单词
String[] words = s.split(" ");
// 将切分的单词收集起来,发到集合中
for (String word:words)
collector.collect(word);
);
// 3.2 对集合中的每一个单词记为1,成为一个二元组的集合
DataStream<Tuple2<String,Integer>> wordAndOneDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>()
@Override
public Tuple2<String, Integer> map(String s) throws Exception
// 此处的s就是进来的一个个单词,再跟1组成一个二元组返回
return Tuple2.of(s,1);
);
// 3.3 对新的集合按照key(单词名称)进行分组
KeyedStream<Tuple2<String,Integer>,String> groupedDS = wordAndOneDS.keyBy(t->t.f0);//此处的fo代表二元组的第一个元素(索引为0)t0
// 3.4 对各个组内的数据按照value进行聚合,也就是求和sum
DataStream<Tuple2<String,Integer>> result = groupedDS.sum(1);//此处的1代表二元组的第二个元素(索引为1)t1
// 3.5 对结果进行排序sort
//DataSet<Tuple2<String,Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);
//4.输出结果-sink
// result.print();
result.printToErr();
// result.writeAsText("D:\\\\wordcount\\\\output\\\\count", FileSystem.WriteMode.OVERWRITE);
// result.writeAsText("D:\\\\wordcount\\\\output\\\\count1",FileSystem.WriteMode.OVERWRITE).setParallelism(2);
// result.writeAsText("D:\\\\wordcount\\\\output\\\\count2",FileSystem.WriteMode.OVERWRITE).setParallelism(1);
//5.触发执行
env.execute();//DataStream需要调用execute方法来执行
结果
自定义Sink
MySQL:可以把经过Flink处理的数据保存到MySQL中
主程序
package cn.edu.hgu.flink.DataStream.model;
import cn.edu.hgu.flink.DataStream.model.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class SinkMySQLDemo
public static void main(String[] args) throws Exception
// 1. env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 2. source
// 2.1 添加自定义的source,连接mysql,并设置并行度为2
DataStream<Student> ds = env.fromElements(new Student(null,"Kate",19));
// 3. transformation
// 4. sink
ds.addSink(new MysqlSink());
// 5. execute
env.execute();
实体类
自定义数据下沉工具类
输出结果
Connector
官网api
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
JDBC Connector
这是官网参考,到这里就结束吧。
Example usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env
.fromElements(...)
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) ->
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
,
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));env.execute();
这下flink的流批一体就讲完了,说实话需要很长的时间来消化代码,当时我写的的时候用了快一个星期,希望大家能够耐心学习,这篇文章足足有1.8w+万字
下一篇(4)我们来介绍flink四大基石以及table api和sql。
以上是关于Flinkflink流批一体的主要内容,如果未能解决你的问题,请参考以下文章
2021年最新最全Flink系列教程_Flink原理初探和流批一体API