From 0 to 1: The Flink Growth Path (Part 16)

Posted by 熊老二-

Continuing from the previous article.

6. Sink (Data Destinations)

6.1 Console- and File-based Sinks

For console and file sinks, simply refer to the batch API; they are mostly used for learning and testing. In development, after real-time processing and statistical analysis, results are more often written to MySQL, Kafka, Redis, HBase, and so on.
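For quick testing, the simplest sink is the console. A minimal sketch of the print sink (not from the original case, just for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamSinkPrintDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // print() writes every record to stdout; printToErr() writes to stderr.
        env.fromElements("flink", "spark", "flink").print();
        env.execute("StreamSinkPrintDemo");
    }
}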
Case demo: store the word-count results in a text file. The code is shown below:

package xx.xxxxxx.flink.sink;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamSinkFileDemo {
    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env): streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source: receive data from a socket
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
        // 3. Transformation: call DataStream functions to process the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
            // a. Filter out empty lines
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String line) throws Exception {
                    return null != line && line.trim().length() > 0;
                }
            })
            // b. Split each line into words
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.trim().split("\\W+");
                    for (String word : words) {
                        out.collect(word);
                    }
                }
            })
            // c. Map each word to a tuple, marking one occurrence
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            })
            // d. Group by word and aggregate within each group
            .keyBy(0).sum(1);
        // 4. Sink: save the results to a file
        resultDataStream
            .setParallelism(1)
            .writeAsText("datas/stream-output.txt", FileSystem.WriteMode.OVERWRITE);
        // 5. Execute the application
        env.execute(StreamSinkFileDemo.class.getSimpleName());
    }
}
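Note that writeAsText is meant for learning and testing and has been deprecated in newer Flink releases; for production file output, the streaming file sink is the recommended route. A minimal sketch, assuming the same resultDataStream as above and a placeholder output path (this fragment would replace the writeAsText call in the main method):

// additional imports:
// import org.apache.flink.api.common.serialization.SimpleStringEncoder;
// import org.apache.flink.core.fs.Path;
// import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Row format: each tuple is converted to a String and written as one line;
// files are rolled according to the sink's default rolling policy.
StreamingFileSink<String> fileSink = StreamingFileSink
    .forRowFormat(new Path("datas/stream-output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
resultDataStream
    .map(tuple -> tuple.toString())
    .addSink(fileSink);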

6.2 Custom Sink: MySQL

Requirement:
Save data from a Flink collection into MySQL through a custom Sink.
Code implementation:

package xx.xxxxx.flink.sink.mysql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Case demo: a custom Sink that saves data into a MySQL table, extending RichSinkFunction.
 */
public class StreamSinkMySQLDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    /**
     * Custom Sink that writes DataStream records into an external MySQL table.
     */
    private static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement pstmt = null;
        // Counter for batching
        private Integer counter = 0;

        @Override
        public void open(Configuration parameters) throws Exception {
            // 1. Load the JDBC driver
            Class.forName("com.mysql.jdbc.Driver");
            // 2. Create the connection
            conn = DriverManager.getConnection(
                "jdbc:mysql://node1.itcast.cn:3306/?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                "root", "123456"
            );
            // 3. Create the PreparedStatement
            pstmt = conn.prepareStatement("insert into db_flink.t_student (id, name, age) values (?, ?, ?)");
        }

        @Override
        public void invoke(Student student, Context context) throws Exception {
            try {
                // Set the statement parameters
                pstmt.setInt(1, student.id);
                pstmt.setString(2, student.name);
                pstmt.setInt(3, student.age);
                // Add the record to the current batch
                pstmt.addBatch();
                counter++;
                if (counter >= 10) {
                    pstmt.executeBatch(); // batch insert
                    counter = 0;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close() throws Exception {
            try {
                if (counter > 0) {
                    // Flush any remaining records
                    pstmt.executeBatch();
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != pstmt) pstmt.close();
                if (null != conn) conn.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source
        DataStreamSource<Student> inputDataStream = env.fromElements(
            new Student(13, "wangwu", 20),
            new Student(14, "zhaoliu", 19),
            new Student(15, "wangwu", 20),
            new Student(16, "zhaoliu", 19)
        );
        // 3. Sink
        inputDataStream.addSink(new MySQLSink());
        // 4. Execute the application
        env.execute(StreamSinkMySQLDemo.class.getSimpleName());
    }
}
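Note: in a RichSinkFunction, open() runs once per parallel sink instance when the job starts (which is why the connection is created there), invoke() runs once per record, and close() runs when the job stops, which is why the remaining half-filled batch is flushed and the connection released in close(). A side effect of this batching scheme is that up to nine records can sit unflushed until the tenth record arrives or the job shuts down.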

In addition, starting with Flink 1.11, a JDBC Connector is provided, which makes it more convenient to save data into RDBMS tables. The following code demonstrates saving data into a MySQL table:
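Note (an assumption about the build setup, not from the original article): this requires the flink-connector-jdbc artifact matching your Flink and Scala versions (e.g. flink-connector-jdbc_2.11 for the Flink 1.11/1.12 line) plus the MySQL JDBC driver on the classpath.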

package xx.xxxxx.flink.sink.mysql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * Flink streaming: use the official JDBC Connector to write data into an RDBMS table, e.g. MySQL.
 */
public class StreamSinkJdbcDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source
        DataStreamSource<Student> studentDataStream = env.fromElements(
            new Student(21, "wangwu3", 20),
            new Student(22, "zhaoliu4", 19),
            new Student(23, "wangwu5", 20),
            new Student(24, "zhaoliu6", 19)
        );
        // 3. Sink
        studentDataStream.addSink(
            JdbcSink.sink(
                "insert into db_flink.t_student (id, name, age) values (?, ?, ?)",
                new JdbcStatementBuilder<Student>() {
                    @Override
                    public void accept(PreparedStatement pstmt, Student student) throws SQLException {
                        pstmt.setInt(1, student.id);
                        pstmt.setString(2, student.name);
                        pstmt.setInt(3, student.age);
                    }
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withDriverName("com.mysql.jdbc.Driver")
                    .withUrl("jdbc:mysql://node1.itcast.cn:3306/")
                    .withUsername("root")
                    .withPassword("123456")
                    .build()
            )
        );
        // 4. Trigger execution
        env.execute(StreamSinkJdbcDemo.class.getSimpleName());
    }
}
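The JdbcSink.sink overload used above writes with default batching; the connector also offers an overload that takes JdbcExecutionOptions to control batch size, flush interval, and retries. A minimal sketch (the option values are illustrative assumptions), reusing the Student stream and SQL from the example above as a drop-in replacement for the addSink(...) call:

// additional import:
// import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

studentDataStream.addSink(
    JdbcSink.sink(
        "insert into db_flink.t_student (id, name, age) values (?, ?, ?)",
        (PreparedStatement pstmt, Student student) -> {
            pstmt.setInt(1, student.id);
            pstmt.setString(2, student.name);
            pstmt.setInt(3, student.age);
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(100)        // flush after 100 buffered records
            .withBatchIntervalMs(200)  // or after 200 ms, whichever comes first
            .withMaxRetries(3)         // retry failed batches up to 3 times
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withDriverName("com.mysql.jdbc.Driver")
            .withUrl("jdbc:mysql://node1.itcast.cn:3306/")
            .withUsername("root")
            .withPassword("123456")
            .build()
    )
);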

6.3 Kafka Sink

Requirement:
Save data from Flink into Kafka (the code below uses the official Kafka connector with a custom serialization schema).
Code implementation:

package xx.xxxxx.flink.sink.kafka;

import cn.itcast.flink.source.mysql.StreamSourceMySQLDemo;
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Case demo: save data into a Kafka topic using the official Connector.
 * Console consumer for verification:
 * /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic flink-topic
 */
public class StreamSinkKafkaDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    /**
     * Custom KafkaSerializationSchema implementation
     */
    private static class KafkaSchema implements KafkaSerializationSchema<String> {
        private String topic;

        public KafkaSchema(String topicName) {
            this.topic = topicName;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                topic, element.getBytes(StandardCharsets.UTF_8)
            );
            return record;
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source: the custom MySQL source from the previous article
        DataStreamSource<StreamSourceMySQLDemo.Student> studentDataStream = env.addSource(
            new StreamSourceMySQLDemo.MySQLSource()
        );
        // 3. Transformation: convert each Student into a JSON string
        SingleOutputStreamOperator<String> jsonDataStream = studentDataStream.map(
            new MapFunction<StreamSourceMySQLDemo.Student, String>() {
                @Override
                public String map(StreamSourceMySQLDemo.Student student) throws Exception {
                    return JSON.toJSONString(student);
                }
            }
        );
        // 4. Sink
        String topic = "flink-topic";
        // a. Kafka producer configuration properties
        Properties props = new Properties();
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "node1.itcast.cn:9092");
        // b. Kafka serialization schema
        KafkaSerializationSchema<String> kafkaSchema = new KafkaSchema(topic);
        // c. Create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
            topic,
            kafkaSchema,
            props,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        );
        // d. Add the sink
        jsonDataStream.addSink(kafkaProducer);
        // 5. Execute the application
        env.execute(StreamSinkKafkaDemo.class.getSimpleName());
    }
}
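One caveat not spelled out above: Semantic.EXACTLY_ONCE relies on Kafka transactions, which are committed only when a checkpoint completes, so checkpointing must be enabled and the producer's transaction timeout must stay within the broker's transaction.max.timeout.ms. A sketch of the extra setup, with illustrative values, to be added in the main method above (the property before creating the producer):

// Enable checkpointing so transactions actually get committed (interval is an example value).
env.enableCheckpointing(5000);
// Keep the producer transaction timeout within the broker's transaction.max.timeout.ms.
props.setProperty("transaction.timeout.ms", "300000");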

6.4 Redis Sink

Flink could operate on Redis through a traditional Jedis connection pool (JedisPool), but Flink provides a dedicated RedisSink that is more convenient to use and spares you from worrying about performance. The following mainly introduces how to use RedisSink.
The core of RedisSink is RedisMapper, an interface; to use it, write your own Redis operation class that implements its three methods, as follows:
1. getCommandDescription(): sets the Redis data structure type to use (via RedisCommand) and the name of the additional key (e.g. the hash name).
2. String getKeyFromData(T data): extracts the key of the key-value pair to write from a record.
3. String getValueFromData(T data): extracts the value of the key-value pair to write from a record.
The correspondence between RedisCommand values and Redis data structures (the original table image is missing; the core mappings, per the Bahir connector documentation, are roughly):

HSET -> hash
LPUSH / RPUSH -> list
SADD -> set
SET -> string
PFADD -> hyperloglog
PUBLISH -> pubsub
ZADD / ZREM -> sorted set
RedisSink can connect to different Redis deployments (standalone Redis, Redis cluster, and Sentinel-managed Redis) by passing the corresponding configuration: FlinkJedisPoolConfig, FlinkJedisClusterConfig, or FlinkJedisSentinelConfig. The full example below uses the standalone pool config; a sketch of the other two follows.
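A minimal sketch of the cluster and Sentinel configurations (class and builder names follow the Bahir flink-connector-redis docs; hosts, ports, and the master name are placeholders):

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisClusterConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;

public class RedisConfigSketch {
    public static void main(String[] args) {
        // Cluster deployment: pass the set of cluster node addresses.
        FlinkJedisClusterConfig clusterConfig = new FlinkJedisClusterConfig.Builder()
            .setNodes(new HashSet<>(Arrays.asList(new InetSocketAddress("node1.itcast.cn", 6379))))
            .build();
        // Sentinel deployment: pass the master name and the sentinel addresses.
        FlinkJedisSentinelConfig sentinelConfig = new FlinkJedisSentinelConfig.Builder()
            .setMasterName("mymaster")
            .setSentinels(new HashSet<>(Arrays.asList("node1.itcast.cn:26379")))
            .build();
    }
}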
Requirement:
Save data from Flink into Redis through the RedisSink connector.
Code implementation:

package xx.xxxxxx.flink.sink.redis;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * Case demo: save data into Redis using the official Connector.
 * https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
 */
public class StreamSinkRedisDemo {

    /**
     * Custom RedisMapper implementing the three methods: command, key, and value.
     */
    private static class StreamRedisMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // HSET: store word counts in a Redis hash named "wordcount"
            return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return Integer.toString(data.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Source: receive data from a socket
        DataStreamSource<String> inputDataStream = env.socketTextStream("node1.itcast.cn", 9999);
        // 3. Transformation: call DataStream functions to process the data
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = inputDataStream
            // a. Filter out empty lines
            .filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String line) throws Exception {
                    return null != line && line.trim().length() > 0;
                }
            })
            // b. Split each line into words
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> out) throws Exception {
                    String[] words = line.trim().split("\\W+");
                    for (String word : words) {
                        out.collect(word);
                    }
                }
            })
            // c. Map each word to a tuple, marking one occurrence
            .map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return Tuple2.of(word, 1);
                }
            })
            // d. Group by word and aggregate within each group
            .keyBy(0).sum(1);
        // 4. Sink
        // a. Redis connection pool configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
            .setHost("node1.itcast.cn")
            .setPort(6379)
            .setDatabase(0)
            .setMinIdle(1)
            .setMaxIdle(8)
            .setMaxTotal(8)
            .build();
        // b. Create the RedisSink
        RedisSink<Tuple2<String, Integer>> redisSink = new RedisSink<>(
            config, new StreamRedisMapper()
        );
        // c. Add the sink
        resultDataStream.addSink(redisSink);
        // 5. Trigger execution
        env.execute(StreamSinkRedisDemo.class.getSimpleName());
    }
}
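After the job starts, send words over the socket (for example with nc -lk 9999 on node1.itcast.cn). Since the mapper uses HSET with the additional key wordcount, the running counts land in a Redis hash and can be inspected with HGETALL wordcount in redis-cli.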
