The Latest and Most Complete Flink Series Tutorial (2021): A First Look at Flink Fundamentals and the Unified Stream/Batch API
Posted by ChinaManor
day02: Unified Stream/Batch API
Today's Goals
- Stream processing concepts (understand)
- Program structure: data sources, i.e. Source (master)
- Program structure: data transformations, i.e. Transformation (master)
- Program structure: data sinks, i.e. Sink (master)
- Flink connectors, i.e. Connectors (understand)
Stream Processing Concepts
Data Timeliness
- Emphasizes how quickly data must be processed after it is produced, e.g. website access logs or pages fetched by a crawler
Stream Processing vs. Batch Processing
- Stream processing works on unbounded data
  - Window operations draw boundaries in the stream so that computations can run on bounded chunks
- Batch processing works on bounded data
- Starting with Flink 1.12, stream and batch are unified: the same API supports both stream processing and batch processing (see the sketch below).
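Since Flink 1.12 the switch between the two modes is a single call on the execution environment. A minimal sketch, assuming Flink 1.12+ on the classpath (the class name and sample elements are only for illustration):
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //STREAMING is the default; BATCH runs the same pipeline as a bounded job;
        //AUTOMATIC lets Flink pick the mode based on whether all sources are bounded
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements("hello flink", "hello batch")
                .print();
        env.execute();
    }
}
The mode can also be set when submitting the job, e.g. flink run -Dexecution.runtime-mode=BATCH, which keeps the jar itself mode-agnostic.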
Programming Model
- Source
- Transformation
- Sink (a minimal skeleton of these three stages is sketched below)
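A minimal skeleton just to make the shape concrete (the class name and sample data are made up for illustration):
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ProgramModelDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("flink", "spark")   //source
                .map(String::toUpperCase)    //transformation
                .returns(Types.STRING)       //type hint needed for lambdas/method references
                .print();                    //sink
        env.execute();                       //nothing runs until execute() is called
    }
}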
Source
File-based
- Requirement: env.readTextFile(local/HDFS file or folder); // compressed files also work
package cn.itcast.sz22.day02;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author itcast
* Date 2021/5/5 9:50
* env.readTextFile(local/HDFS file or folder); // compressed files also work
*/
public class FileSourceDemo {
public static void main(String[] args) throws Exception {
//create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//read a file, e.g. hdfs://node1:8020/user/root/xxx.txt
//read a gzip-compressed .gz file
DataStreamSource<String> source1 = env.readTextFile("data/hello.txt");
DataStreamSource<String> source2 = env.readTextFile("D:\\_java_workspace\\sz22\\data\\hello.txt.gz");
//print the text
source1.print();
source2.print("source2:");
//execute the streaming environment
env.execute();
}
}
Collection-based: fromElements
- Requirements
  1. env.fromElements(varargs);
  2. env.fromCollection(any collection);
  3. env.generateSequence(from, to);
  4. env.fromSequence(from, to);
- Example
package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Author itcast
 * Date 2021/5/5 9:20
 * 1. create the environment (streaming)
 * 2. get the data sources
 * 3. print the data
 * 4. execute
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.env.fromElements(varargs);
        DataStreamSource<String> source1 = env.fromElements("hello world", "hello flink");
        //2.env.fromCollection(any collection);
        ArrayList<String> list = new ArrayList<>();
        list.add("hello hadoop");
        list.add("hello flink");
        DataStreamSource<String> source2 = env.fromCollection(list);
        //3.env.generateSequence(from, to);
        DataStreamSource<Long> source3 = env.generateSequence(1, 10).setParallelism(1);
        //4.env.fromSequence(from, to);
        DataStreamSource<Long> source4 = env.fromSequence(10, 20);
        //print the output
        source1.print("source1");
        source2.print("source2");
        source3.print("source3");
        source4.print("source4");
        //execute
        env.execute();
    }
}
Custom Source
- The SourceFunction variants
  - SourceFunction: non-parallel source (parallelism can only be 1)
  - RichSourceFunction: non-parallel source with rich lifecycle methods (parallelism can only be 1)
  - ParallelSourceFunction: parallel source (parallelism can be >= 1)
  - RichParallelSourceFunction: parallel source with rich lifecycle methods (parallelism can be >= 1); this is what the Kafka source covered later uses
- Requirement: generate one order record per second (order ID, user ID, order amount, timestamp)
package cn.itcast.sz22.day02;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author itcast
 * Date 2021/5/5 10:15
 * Generate one order record per second (order ID, user ID, order amount, timestamp)
 * Requirements:
 * - order ID: random UUID
 * - user ID: random int in [0, 2]
 * - order amount: random int in [0, 100]
 * - timestamp: current system time
 */
public class CustomSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //custom source
        Random rn = new Random();
        DataStreamSource<Order> source = env.addSource(new ParallelSourceFunction<Order>() {
            boolean flag = true; //flag that keeps the source running
            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (flag) {
                    //random order ID (UUID)
                    String oid = UUID.randomUUID().toString();
                    //random user ID (0-2)
                    int uid = rn.nextInt(3);
                    //random order amount (0-100)
                    int money = rn.nextInt(101);
                    //current system time as the timestamp
                    long timestamp = System.currentTimeMillis();
                    //wrap the fields in an Order and emit it
                    ctx.collect(new Order(oid, uid, money, timestamp));
                    //sleep for one second
                    Thread.sleep(1000);
                }
            }
            @Override
            public void cancel() {
                flag = false;
            }
        }).setParallelism(1);
        //print the output
        source.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String uuid;
        private int uid;
        private int money;
        private Long timestamp;
    }
}
- Read data from MySQL with a custom source
- Initialization SQL
CREATE DATABASE if not exists bigdata;
USE bigdata;
CREATE TABLE if not exists `t_student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', '张三', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', '李四', '18');
INSERT INTO `t_student` VALUES ('6', '王五', '20');
package cn.itcast.sz22.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
/**
* Author itcast
* Date 2021/5/5 10:32
* Desc TODO
*/
public class MySQLSourceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1. env: set parallelism to 1
env.setParallelism(1);
//2. source: create a custom MySQL source that reads the table every 2 seconds
DataStreamSource<Student> source = env.addSource(new RichSourceFunction<Student>() {
Connection conn;
PreparedStatement ps;
boolean flag = true;
@Override
public void open(Configuration parameters) throws Exception {
//connect to the database
conn = DriverManager.getConnection("jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false"
, "root", "123456");
//SQL that reads the student table
String sql = "select `id`,`name`,age from t_student";
//prepare the PreparedStatement
ps = conn.prepareStatement(sql);
}
@Override
public void run(SourceContext<Student> ctx) throws Exception {
while (flag) {
ResultSet rs = ps.executeQuery();
while (rs.next()) {
int id = rs.getInt("id");
String name = rs.getString("name");
int age = rs.getInt("age");
Student student = new Student(id, name, age);
ctx.collect(student);
}
//pause so the table is re-read every 2 seconds instead of in a tight loop
Thread.sleep(2000);
}
}
@Override
public void cancel() {
flag = false;
}
@Override
public void close() throws Exception {
ps.close();
conn.close();
}
});
//3. print the data source
//4. execute
//(exercise variant: create a static inner class Student with fields id, name, age,
// and a static inner class MySQLSource extending RichParallelSourceFunction<Student>;
// implement open() to obtain the MySQL 5.7 connection via
// jdbc:mysql://192.168.88.100:3306/bigdata?useSSL=false, then implement run())
source.print();
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Student {
private int id;
private String name;
private int age;
}
}
Socket
- Install netcat: yum install nc -y (then start a listener, e.g. nc -lk 9999, so the job below has something to connect to)
- Requirement: receive data over a socket and compute a word count
package cn.itcast.sz22.day02;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Date 2021/5/5 9:59
 * Desc socket word count
 */
public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source - socket source
        DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9999);
        //3.transformation
        //3.1 split every line on spaces into individual words
        //3.2 map each word to (word, 1)
        //3.3 group the data by word (key)
        //3.4 sum the counts (value) within each group
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
                .flatMap((String value, Collector<String> out) -> Arrays
                        .stream(value.split(" ")).forEach(out::collect))
                .returns(Types.STRING)
                .map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);
        //4.sink - print the result
        result.print();
        //5.execute
        env.execute();
    }
}
Transformation
Types of transformation operations
- Per-record processing: map, filter
- Processing multiple records, i.e. the data collected in a window: reduce
- Merging streams: union and join combine multiple streams into one (see the sketch after this list)
- Splitting streams: split one data stream into several, using split or an OutputTag (side output)
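Union and side-output splitting do not appear in the examples below, so here is a minimal self-contained sketch of both (the class name, sample numbers, and the "odd" tag are made up for illustration; the sketch uses the side-output/OutputTag approach, which is the recommended replacement for the older split()):
package cn.itcast.sz22.day02;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class UnionAndSideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //union: merge two streams of the same type into one
        DataStream<Integer> s1 = env.fromElements(1, 2, 3);
        DataStream<Integer> s2 = env.fromElements(4, 5, 6);
        DataStream<Integer> merged = s1.union(s2);

        //side output: split one stream into several via an OutputTag
        OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};
        SingleOutputStreamOperator<Integer> evens = merged.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    out.collect(value);        //even numbers go to the main output
                } else {
                    ctx.output(oddTag, value); //odd numbers go to the side output
                }
            }
        });

        evens.print("even");
        evens.getSideOutput(oddTag).print("odd");
        env.execute();
    }
}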
Example
- Count the words in the stream, excluding the sensitive word heihei
package cn.itcast.sz22.day02;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* Author itcast
* Date 2021/5/5 9:59
* 1. filter: drop the word heihei
* 2. reduce: aggregate the counts
*/
public class SocketSourceFilterDemo {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.source socketSource
DataStreamSource<String> source = env.socketTextStream("192.168.88.100", 9998);
//3. process the data - transformation
SingleOutputStreamOperator<Tuple2<String, Integer>> result = source
.flatMap((String value, Collector<String> out) -> Arrays
.stream(value.split(" ")).forEach(out::collect))
.returns(Types.STRING)
//drop every word equal to heihei; signature: boolean filter(T value)
.filter(word-> !word.equals("heihei"))
.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(t -> t.f0)
//.sum(1);
//T reduce(T value1, T value2)
// hadoop,1 hadoop,1 => hadoop,1+1
.reduce((Tuple2<String,Integer> a,Tuple2<String,Integer> b)->Tuple2.of(a.f0,a.f1+b.f1));
//3.1 split every line on spaces into individual words
//3.2 map each word to (word, 1)
//3.3 group the data by word (key)
//3.4 aggregate the counts (value) within each group, i.e. sum
//4. sink - print the result
result.print();
//5. execute
env.execute();
}
}