Flink 实时写MySQL
Posted IT追梦猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 实时写MySQL相关的知识,希望对你有一定的参考价值。
一、mysql建库建表
建立数据库
# Create the pty database if it does not exist yet.
# utf8_general_ci is used because MySQL has no utf8_chinese_ci collation;
# it also matches the collation declared on the zzx_text_flink table.
CREATE DATABASE IF NOT EXISTS pty
DEFAULT CHARACTER SET utf8
DEFAULT COLLATE utf8_general_ci;
建立数据表
# Create the data table (the original note said "database", but this statement creates a table).
# `id` is the auto-increment primary key; every other column is nullable.
CREATE TABLE `zzx_text_flink` (
`id` int(0) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` int(0) NULL DEFAULT NULL COMMENT '用户ID',
`item_id` int(0) NULL DEFAULT NULL COMMENT '商品ID',
`category_id` int(0) NULL DEFAULT NULL COMMENT '商品类目ID',
`behavior` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '行为类型',
`timestamp` bigint(0) NULL DEFAULT NULL COMMENT '时间戳',
`input_stamp` bigint(0) NULL DEFAULT NULL COMMENT '写入时间戳',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 217430 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
# NOTE(review): no matching SET FOREIGN_KEY_CHECKS = 0 is visible before this line;
# presumably it was dropped from the exported script. Confirm before relying on it.
SET FOREIGN_KEY_CHECKS = 1;
二、新建项目工程
1、利用idea 新建maven 项目
这里不详细说明,如有需要请留言
2、pom文件中添加依赖
代码如下(示例):
# 全局设置各依赖版本
<!-- Shared version properties, referenced elsewhere in the pom as ${name}. -->
<properties>
<!-- Scala binary version used as the suffix of Scala-dependent Flink artifacts -->
<scala.binary.version>2.11</scala.binary.version>
<!-- Flink version -->
<flink.version>1.9.1</flink.version>
<!-- Java version; NOTE(review): not referenced by anything shown in this article -->
<java.version>1.8.0</java.version>
<!-- MySQL connector version -->
<mysql.version>8.0.21</mysql.version>
<!-- Elasticsearch version; NOTE(review): not referenced by any dependency shown in this article -->
<es.version>7.9.3</es.version>
</properties>
#添加Flink 和 Mysql 依赖
<dependencies>
  <!-- Flink dependencies -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Uses the shared Scala suffix so every Scala-dependent Flink artifact is built for the same Scala version -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- MySQL JDBC driver -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
  <!-- Lombok -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.4</version>
  </dependency>
</dependencies>
Lombok:通过注解自动生成 POJO 的 getter/setter/toString 等方法,
还可以简化异常处理和 I/O 流的关闭操作,使代码更简洁、可读性更高。
感兴趣的读者可以自行研究,有问题请留言。
3、建立POJO类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
*/
public class Pty {
private int id; //表ID
private int user_id; //用户ID
private int item_id; //商品ID
private int category; //商品类目ID
private String behavior; //行为类型
private long timestamp; //时间戳
private long input_stamp; //写入时间戳
4、建立配置文件
在 resources 文件夹(src/main/resources)下新建 pty.properties
# JDBC configuration
# Driver class of MySQL Connector/J 8.x; com.mysql.jdbc.Driver is the deprecated legacy name.
mysql.driverClassName = com.mysql.cj.jdbc.Driver
mysql.url = jdbc:mysql://127.0.0.1:3306/pty?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
# Sample credentials; replace them with the ones for your own environment.
mysql.userName = root
mysql.password = 123123
5、整体代码及片段解读
整体代码如下:
public class SinkMysql {
public static void main(String[] args) throws Exception{
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String file_path = "src/main/data/UserBehavior.csv";
//设置并行度
env.setParallelism(5);
InputStream in = SinkMysql.class.getClassLoader().getResourceAsStream("pty.properties");
Properties prop = new Properties();
prop.load(in);
String driver = prop.getProperty("mysql.driverClassName");
String url = prop.getProperty("mysql.url");
String user = prop.getProperty("mysql.userName");
String passwd = prop.getProperty("mysql.password");
// Long input_stamp = System.currentTimeMillis();
DataSet<Row> write = env.readCsvFile(file_path).fieldDelimiter(',').types(Integer.class, Integer.class, Integer.class, String.class, Long.class)
.map(new MapFunction<Tuple5<Integer, Integer, Integer, String, Long>, Row>() {
public Row map(Tuple5<Integer, Integer, Integer, String, Long> Tuple5) throws Exception {
Row row = new Row(5);
row.setField(0, Tuple5.f0);
row.setField(1, Tuple5.f1);
row.setField(2, Tuple5.f2);
row.setField(3, Tuple5.f3);
row.setField(4, Tuple5.f4);
return row;
}
});
//写出到MySQL 设置配置参数
write.output(
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(user)
.setPassword(passwd)
.setQuery("insert into zzx_text_flink (user_id,item_id,category_id,behavior,timestamp,input_stamp) values (?,?,?,?,?,unix_timestamp())")
. finish()
);
env.execute();
System.out.println("数据写入成功");
片段解读
// Obtain the Flink ExecutionEnvironment the job is built on.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Relative path of the CSV input file.
String file_path = "src/main/data/UserBehavior.csv";
// Set the parallelism.
env.setParallelism(5);
读取配置文件参数
InputStream in = SinkMysql.class.getClassLoader().getResourceAsStream("pty.properties");
Properties prop = new Properties();
prop.load(in);
String driver = prop.getProperty("mysql.driverClassName");
String url = prop.getProperty("mysql.url");
String user = prop.getProperty("mysql.userName");
String passwd = prop.getProperty("mysql.password");
编辑自己的业务逻辑
// Step 1: parse every CSV line into a tuple of five typed columns.
DataSet<Tuple5<Integer, Integer, Integer, String, Long>> parsed = env
        .readCsvFile(file_path)
        .fieldDelimiter(',')
        .types(Integer.class, Integer.class, Integer.class, String.class, Long.class);

// Step 2: copy the tuple fields, in order, into a five-field Row.
DataSet<Row> write = parsed.map(
        new MapFunction<Tuple5<Integer, Integer, Integer, String, Long>, Row>() {
            public Row map(Tuple5<Integer, Integer, Integer, String, Long> csvTuple) throws Exception {
                Row converted = new Row(5);
                converted.setField(0, csvTuple.f0);
                converted.setField(1, csvTuple.f1);
                converted.setField(2, csvTuple.f2);
                converted.setField(3, csvTuple.f3);
                converted.setField(4, csvTuple.f4);
                return converted;
            }
        });
写出到mysql 并打印成功结果
write.output(
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(driver)
.setDBUrl(url)
.setUsername(user)
.setPassword(passwd)
.setQuery("insert into zzx_text_flink (user_id,item_id,category_id,behavior,timestamp,input_stamp) values (?,?,?,?,?,unix_timestamp())")
. finish()
);
env.execute();
System.out.println("数据写入成功");
样例数据格式
以上是关于Flink 实时写MySQL的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Flink(四十):Flink模拟双十一实时大屏统计