Implementing One-to-Many Expansion of MySQL Data with Spark's foreach Operator and a UDTF-Style Function (Java)
Posted by 虎鲸不是鱼
Background
Our data warehouse project ran into the following scenario; after data masking, the content looks roughly like this:
| col1 | col2 | time1 | time2 |
| --- | --- | --- | --- |
| a1 | b1 | 2022-01-01 00:00:00 | 2022-01-05 00:00:00 |
| a2 | b2 | 2022-01-28 00:00:00 | 2022-02-03 00:00:00 |
| a3 | b3 | 2022-02-20 00:00:00 | 2022-02-25 00:00:00 |
| a4 | b4 | 2022-03-29 00:00:00 | 2022-04-02 00:00:00 |
The table structure looks roughly like this:
mysql> create database db_lzy;
Query OK, 1 row affected (0.00 sec)
mysql> use db_lzy;
Database changed
mysql> show tables;
Empty set (0.00 sec)
mysql> create table test_origin_20001128 (
-> col1 varchar(200),
-> col2 varchar(200),
-> time1 datetime,
-> time2 datetime
-> )
-> ;
Query OK, 0 rows affected (0.03 sec)
mysql> create table test_result_20001128 (
-> col1 varchar(200),
-> col2 varchar(200),
-> time3 varchar(200)
-> )
-> ;
Query OK, 0 rows affected (0.01 sec)
mysql>
The computed result should look like this:
| col1 | col2 | time3 |
| --- | --- | --- |
| a1 | b1 | 2022-01-01 |
| a1 | b1 | 2022-01-02 |
| a1 | b1 | 2022-01-03 |
| a1 | b1 | 2022-01-04 |
| a1 | b1 | 2022-01-05 |
| a2 | b2 | 2022-01-28 |
| a2 | b2 | 2022-01-29 |
| a2 | b2 | 2022-01-30 |
| a2 | b2 | 2022-01-31 |
| a2 | b2 | 2022-02-01 |
| a2 | b2 | 2022-02-02 |
| a2 | b2 | 2022-02-03 |
| a3 | b3 | 2022-02-20 |
| a3 | b3 | 2022-02-21 |
| a3 | b3 | 2022-02-22 |
| a3 | b3 | 2022-02-23 |
| a3 | b3 | 2022-02-24 |
| a3 | b3 | 2022-02-25 |
| a4 | b4 | 2022-03-29 |
| a4 | b4 | 2022-03-30 |
| a4 | b4 | 2022-03-31 |
| a4 | b4 | 2022-04-01 |
| a4 | b4 | 2022-04-02 |
Clearly a UDTF is the right shape for this. Since a database should mostly store data rather than burn resources on computation, the first thing that came to mind was implementing the expansion with Spark's foreach operator, keeping the work on the Java side. When the data volume is small you do not even need Spark: a single pass with a JDBC iterator does the job, as sketched below. At scale, a distributed engine like Spark is the better choice.
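For that small-data case, here is a minimal single-threaded JDBC sketch of the same expansion. It assumes the same MySQL host, credentials, and tables used throughout this post; the class name JdbcExpandDemo and the use of LocalDate instead of Calendar are illustrative choices, not the approach of the Spark job further down.

package com.zhiyong.day20221128;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.LocalDate;

public class JdbcExpandDemo {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://192.168.88.100:3306/db_lzy"; // assumed host from this post
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                     "select col1, col2, date(time1) t1, date(time2) t2 from test_origin_20001128");
             PreparedStatement ps = conn.prepareStatement(
                     "insert into test_result_20001128 values(?,?,?)")) {
            while (rs.next()) {
                LocalDate day = rs.getDate("t1").toLocalDate();
                LocalDate end = rs.getDate("t2").toLocalDate();
                // One output row per calendar day, inclusive of both endpoints
                for (; !day.isAfter(end); day = day.plusDays(1)) {
                    ps.setString(1, rs.getString("col1"));
                    ps.setString(2, rs.getString("col2"));
                    ps.setString(3, day.toString()); // yyyy-MM-dd
                    ps.addBatch();
                }
            }
            ps.executeBatch(); // single batch keeps round trips down
        }
    }
}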
Data Preparation
Insert a few rows to simulate the scenario. A real production environment naturally holds far more data than this!
mysql> insert into test_origin_20001128 values('a1','b1','2022-01-01 00:00:00','2022-01-05 00:00:00');
Query OK, 1 row affected (0.07 sec)
mysql> insert into test_origin_20001128 values('a2','b2','2022-01-28 00:00:00','2022-02-03 00:00:00');
Query OK, 1 row affected (0.01 sec)
mysql> insert into test_origin_20001128 values('a3','b3','2022-02-20 00:00:00','2022-02-25 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> insert into test_origin_20001128 values('a4','b4','2022-03-29 00:00:00','2022-04-02 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> select * from test_origin_20001128;
+------+------+---------------------+---------------------+
| col1 | col2 | time1 | time2 |
+------+------+---------------------+---------------------+
| a1 | b1 | 2022-01-01 00:00:00 | 2022-01-05 00:00:00 |
| a2 | b2 | 2022-01-28 00:00:00 | 2022-02-03 00:00:00 |
| a3 | b3 | 2022-02-20 00:00:00 | 2022-02-25 00:00:00 |
| a4 | b4 | 2022-03-29 00:00:00 | 2022-04-02 00:00:00 |
+------+------+---------------------+---------------------+
4 rows in set (0.00 sec)
mysql>
Java Code
pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12.12</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.3.0</spark.version>
    <encoding>UTF-8</encoding>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- Spark dependencies -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Enables Lombok annotations -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <!-- Compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <!--<encoding>${project.build.sourceEncoding}</encoding>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <!-- Shade plugin: builds a fat jar containing all dependencies -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Optionally set the jar's main class -->
                                <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Java
Without further ado, here is the code:
package com.zhiyong.day20221128;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedList;
import java.util.Properties;
/**
 * @program: zhiyong_study
 * @description: Simulating a UDTF with the foreach operator
 * @author: zhiyong
 * @create: 2022-11-28 19:27
 **/
public class ForeachDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Simulate a UDTF with foreach")
                .master("local[2]")
                .getOrCreate();

        String url = "jdbc:mysql://192.168.88.100:3306/db_lzy";
        String table = "test_origin_20001128";
        Properties prop = new Properties();
        prop.put("driver", "com.mysql.cj.jdbc.Driver");
        prop.put("user", "root");
        prop.put("password", "123456");
        prop.put("url", url);
        prop.put("fetchSize", "1000");

        Dataset<Row> df1 = spark.read().jdbc(url, table, prop);
        df1.show(false);

        // Precompute the day offset between time1 and time2 with Spark SQL functions
        String[] exp = new String[5];
        exp[0] = "col1 as col1";
        exp[1] = "col2 as col2";
        exp[2] = "date_format(time1,'yyyy-MM-dd') as time1";
        exp[3] = "date_format(time2,'yyyy-MM-dd') as time2";
        exp[4] = "datediff(date_format(time2,'yyyy-MM-dd'),date_format(time1,'yyyy-MM-dd')) as offset";
        df1 = df1.selectExpr(exp);
        df1.show(false);

        df1.foreach((ForeachFunction<Row>) row -> {
            // In production this should come from a connection pool
            Class.forName("com.mysql.cj.jdbc.Driver");
            Connection connection = DriverManager.getConnection(url, "root", "123456");
            connection.setAutoCommit(false);

            String col1 = row.get(0).toString();
            String col2 = row.get(1).toString();
            String time1 = row.get(2).toString();
            int offset = Integer.parseInt(row.get(4).toString());

            // Expand [time1, time2] into one entry per calendar day
            LinkedList<String> list = new LinkedList<>();
            list.add(time1);

            String sql = "insert into test_result_20001128 values(?,?,?)";
            PreparedStatement prepareStatement = connection.prepareStatement(sql);

            if (offset > 0) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                Calendar calendar = Calendar.getInstance();
                Date date1 = simpleDateFormat.parse(time1);
                calendar.setTime(date1);
                for (; offset > 0; offset--) {
                    calendar.add(Calendar.DATE, 1);
                    String parseTime = simpleDateFormat.format(calendar.getTime());
                    // (the original post is truncated at this point; the rest reconstructs
                    // the evident intent: collect each date, then batch-insert the rows)
                    list.add(parseTime);
                }
            }

            for (String time3 : list) {
                prepareStatement.setString(1, col1);
                prepareStatement.setString(2, col2);
                prepareStatement.setString(3, time3);
                prepareStatement.addBatch();
            }
            prepareStatement.executeBatch();
            connection.commit();
            prepareStatement.close();
            connection.close();
        });

        spark.stop();
    }
}
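As a closing note, on Spark 2.4 and later the one-to-many step can also be written declaratively with the built-in sequence and explode generator functions, which is the closest thing here to a true UDTF and lets Spark's JDBC writer handle the inserts. This is a sketch of an alternative under the same connection assumptions, not what the job above runs:

// Assumes Spark 2.4+ (sequence over dates); reuses url, table, and prop from main()
Dataset<Row> expanded = spark.read().jdbc(url, table, prop)
        .selectExpr(
                "col1",
                "col2",
                "explode(sequence(to_date(time1), to_date(time2), interval 1 day)) as day")
        .selectExpr("col1", "col2", "date_format(day,'yyyy-MM-dd') as time3");

// Append the expanded rows to the result table via Spark's JDBC writer
expanded.write().mode("append").jdbc(url, "test_result_20001128", prop);

This removes the per-row connection management inside foreach entirely and lets Spark parallelize and batch the writes on its own.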