Flink: Writing to MySQL in Real Time

Posted by IT追梦猿


I. Create the MySQL database and table


Create the database

CREATE DATABASE IF NOT EXISTS pty DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

Create the table

# Create the data table
CREATE TABLE `zzx_text_flink` (
  `id` int NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `user_id` int NULL DEFAULT NULL COMMENT 'user ID',
  `item_id` int NULL DEFAULT NULL COMMENT 'item ID',
  `category_id` int NULL DEFAULT NULL COMMENT 'item category ID',
  `behavior` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'behavior type',
  `timestamp` bigint NULL DEFAULT NULL COMMENT 'event timestamp',
  `input_stamp` bigint NULL DEFAULT NULL COMMENT 'write timestamp',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 217430 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

II. Create the project

1. Create a Maven project in IDEA


Details are omitted here; leave a comment if you need them.


2. Add dependencies to the pom file


The code is as follows (example):

<!-- Manage dependency versions globally -->
<properties>
    <!-- Scala version -->
    <scala.binary.version>2.11</scala.binary.version>
    <!-- Flink version -->
    <flink.version>1.9.1</flink.version>
    <!-- Java version -->
    <java.version>1.8.0</java.version>
    <!-- MySQL version -->
    <mysql.version>8.0.21</mysql.version>
    <!-- Elasticsearch version -->
    <es.version>7.9.3</es.version>
</properties>

<!-- Add Flink and MySQL dependencies -->
<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- MySQL dependency -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    <!-- Lombok dependency -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>
</dependencies>


Lombok keeps the code clean and readable: it can replace POJO getter/setter/toString boilerplate, as well as repetitive exception handling and I/O stream close calls. Worth exploring on your own; leave a comment if you have questions.
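To make concrete what Lombok saves, here is a minimal sketch in plain Java (no Lombok) of roughly what `@Data` generates. `User` is a hypothetical example class, not part of this project:

```java
// Hand-written equivalent of what Lombok's @Data would generate
// for a small two-field class (hypothetical example).
public class User {
    private int id;
    private String name;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    @Override
    public String toString() { return "User(id=" + id + ", name=" + name + ")"; }

    public static void main(String[] args) {
        User u = new User();
        u.setId(1);
        u.setName("flink");
        System.out.println(u);
    }
}
```

With Lombok, the fields plus a single `@Data` annotation replace all of the methods above.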


3. Create the POJO class


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Pty {
    private int id;            // table ID
    private int user_id;       // user ID
    private int item_id;       // item ID
    private int category_id;   // item category ID
    private String behavior;   // behavior type
    private long timestamp;    // event timestamp
    private long input_stamp;  // write timestamp
}


4. Create the configuration file


Create pty.properties under the resources folder:

# JDBC configuration
mysql.driverClassName = com.mysql.cj.jdbc.Driver
mysql.url = jdbc:mysql://127.0.0.1:3306/pty?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
mysql.userName = root
mysql.password = 123123
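As a quick sanity check of how `java.util.Properties` parses this file, here is a minimal sketch that loads simulated contents from a string instead of the classpath (the real project loads the file via `getResourceAsStream("pty.properties")`; the values below are illustrative):

```java
import java.io.StringReader;
import java.util.Properties;

public class PropsDemo {
    public static void main(String[] args) throws Exception {
        // Simulated contents of pty.properties; com.mysql.cj.jdbc.Driver is
        // the Connector/J 8.x driver class name.
        String conf =
              "mysql.driverClassName = com.mysql.cj.jdbc.Driver\n"
            + "mysql.userName = root\n";
        Properties prop = new Properties();
        prop.load(new StringReader(conf));
        // Properties trims the whitespace around '=' for us.
        System.out.println(prop.getProperty("mysql.driverClassName"));
        System.out.println(prop.getProperty("mysql.userName"));
    }
}
```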


5. Full code and snippet walkthrough


The full code is as follows:

import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.types.Row;

public class SinkMysql {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String file_path = "src/main/data/UserBehavior.csv";
        // Set parallelism
        env.setParallelism(5);

        // Load JDBC settings from pty.properties on the classpath
        InputStream in = SinkMysql.class.getClassLoader().getResourceAsStream("pty.properties");
        Properties prop = new Properties();
        prop.load(in);
        String driver = prop.getProperty("mysql.driverClassName");
        String url = prop.getProperty("mysql.url");
        String user = prop.getProperty("mysql.userName");
        String passwd = prop.getProperty("mysql.password");
        // Long input_stamp = System.currentTimeMillis();

        // Read the CSV file and convert each Tuple5 record into a Row
        DataSet<Row> write = env.readCsvFile(file_path)
                .fieldDelimiter(",")
                .types(Integer.class, Integer.class, Integer.class, String.class, Long.class)
                .map(new MapFunction<Tuple5<Integer, Integer, Integer, String, Long>, Row>() {
                    public Row map(Tuple5<Integer, Integer, Integer, String, Long> tuple) throws Exception {
                        Row row = new Row(5);
                        row.setField(0, tuple.f0);
                        row.setField(1, tuple.f1);
                        row.setField(2, tuple.f2);
                        row.setField(3, tuple.f3);
                        row.setField(4, tuple.f4);
                        return row;
                    }
                });

        // Write out to MySQL with the configured JDBC parameters
        write.output(
            JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername(driver)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(passwd)
                .setQuery("insert into zzx_text_flink (user_id,item_id,category_id,behavior,timestamp,input_stamp) values (?,?,?,?,?,unix_timestamp())")
                .finish()
        );
        env.execute();
        System.out.println("Data written to MySQL");
    }
}


Snippet walkthrough

// Create the execution environment, point at the test data file, and set parallelism
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String file_path = "src/main/data/UserBehavior.csv";
// Set parallelism
env.setParallelism(5);

Read the configuration file parameters:

InputStream in = SinkMysql.class.getClassLoader().getResourceAsStream("pty.properties");
Properties prop = new Properties();
prop.load(in);
String driver = prop.getProperty("mysql.driverClassName");
String url = prop.getProperty("mysql.url");
String user = prop.getProperty("mysql.userName");
String passwd = prop.getProperty("mysql.password");

Write your own business logic:

DataSet<Row> write = env.readCsvFile(file_path)
        .fieldDelimiter(",")
        .types(Integer.class, Integer.class, Integer.class, String.class, Long.class)
        .map(new MapFunction<Tuple5<Integer, Integer, Integer, String, Long>, Row>() {
            public Row map(Tuple5<Integer, Integer, Integer, String, Long> tuple) throws Exception {
                Row row = new Row(5);
                row.setField(0, tuple.f0);
                row.setField(1, tuple.f1);
                row.setField(2, tuple.f2);
                row.setField(3, tuple.f3);
                row.setField(4, tuple.f4);
                return row;
            }
        });
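The essence of this step is turning one CSV line into five typed, positional fields. Stripped of the Flink types, the conversion looks like the following minimal sketch (the data line is a hypothetical example in the UserBehavior.csv layout, and the `Object[]` stands in for Flink's `Row(5)`):

```java
public class CsvRowDemo {
    public static void main(String[] args) {
        // Hypothetical line in the UserBehavior.csv layout:
        // user_id,item_id,category_id,behavior,timestamp
        String line = "1,2268318,2520377,pv,1511544070";
        String[] f = line.split(",");

        // Mirrors the Tuple5 -> Row mapping: same order, positions 0..4
        Object[] row = new Object[5];
        row[0] = Integer.parseInt(f[0]);  // user_id
        row[1] = Integer.parseInt(f[1]);  // item_id
        row[2] = Integer.parseInt(f[2]);  // category_id
        row[3] = f[3];                    // behavior
        row[4] = Long.parseLong(f[4]);    // timestamp

        System.out.println(row[0] + " " + row[3] + " " + row[4]);
    }
}
```

The positional indices must line up with the `?` placeholders in the INSERT statement, which is why the map sets fields 0 through 4 in column order.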

Write out to MySQL and print a success message:

write.output(
    JDBCOutputFormat.buildJDBCOutputFormat()
        .setDrivername(driver)
        .setDBUrl(url)
        .setUsername(user)
        .setPassword(passwd)
        .setQuery("insert into zzx_text_flink (user_id,item_id,category_id,behavior,timestamp,input_stamp) values (?,?,?,?,?,unix_timestamp())")
        .finish()
);
env.execute();
System.out.println("Data written to MySQL");
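Note that the last column, `input_stamp`, is not bound to a `?` placeholder: MySQL fills it server-side with `unix_timestamp()`, i.e. seconds since the Unix epoch at insert time. A minimal sketch of the equivalent value computed on the Java side:

```java
public class UnixTimestampDemo {
    public static void main(String[] args) {
        // MySQL's unix_timestamp() returns seconds since the epoch;
        // the Java-side equivalent is milliseconds divided by 1000.
        long seconds = System.currentTimeMillis() / 1000L;
        // Sanity check: any current timestamp is after 2020-01-01 (epoch 1577836800)
        System.out.println(seconds > 1577836800L);
    }
}
```

Using the server-side function means the write timestamp reflects when the row actually reached MySQL, not when the record was read from the CSV.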

Sample data format

(The original screenshot is missing; per the CSV schema read above, each row carries user_id, item_id, category_id, behavior, and timestamp.)


