Using Spark's foreach Operator and a UDTF-Style Function for One-to-Many Expansion of MySQL Data (Java)

Posted by 虎鲸不是鱼


Background

Our data-warehouse project ran into the following scenario. After masking, the data looks roughly like this:

+------+------+---------------------+---------------------+
| col1 | col2 | time1               | time2               |
+------+------+---------------------+---------------------+
| a1   | b1   | 2022-01-01 00:00:00 | 2022-01-05 00:00:00 |
| a2   | b2   | 2022-01-28 00:00:00 | 2022-02-03 00:00:00 |
| a3   | b3   | 2022-02-20 00:00:00 | 2022-02-25 00:00:00 |
| a4   | b4   | 2022-03-29 00:00:00 | 2022-04-02 00:00:00 |
+------+------+---------------------+---------------------+

The table structure is roughly as follows:

mysql> create database db_lzy;
Query OK, 1 row affected (0.00 sec)

mysql> use db_lzy;
Database changed
mysql> show tables;
Empty set (0.00 sec)

mysql> create table test_origin_20001128 (
    -> col1 varchar(200),
    -> col2 varchar(200),
    -> time1 datetime,
    -> time2 datetime
    -> )
    -> ;
Query OK, 0 rows affected (0.03 sec)

mysql> create table test_result_20001128 (
    -> col1 varchar(200),
    -> col2 varchar(200),
    -> time3 varchar(200)
    -> )
    -> ;
Query OK, 0 rows affected (0.01 sec)

mysql>

After the expansion, the result should look like this:

+------+------+------------+
| col1 | col2 | time3      |
+------+------+------------+
| a1   | b1   | 2022-01-01 |
| a1   | b1   | 2022-01-02 |
| a1   | b1   | 2022-01-03 |
| a1   | b1   | 2022-01-04 |
| a1   | b1   | 2022-01-05 |
| a2   | b2   | 2022-01-28 |
| a2   | b2   | 2022-01-29 |
| a2   | b2   | 2022-01-30 |
| a2   | b2   | 2022-01-31 |
| a2   | b2   | 2022-02-01 |
| a2   | b2   | 2022-02-02 |
| a2   | b2   | 2022-02-03 |
| a3   | b3   | 2022-02-20 |
| a3   | b3   | 2022-02-21 |
| a3   | b3   | 2022-02-22 |
| a3   | b3   | 2022-02-23 |
| a3   | b3   | 2022-02-24 |
| a3   | b3   | 2022-02-25 |
| a4   | b4   | 2022-03-29 |
| a4   | b4   | 2022-03-30 |
| a4   | b4   | 2022-03-31 |
| a4   | b4   | 2022-04-01 |
| a4   | b4   | 2022-04-02 |
+------+------+------------+

This is clearly a job for a UDTF-style function: one input row has to produce many output rows. Each source row expands into datediff(time2, time1) + 1 result rows; for example, a1 spans 2022-01-01 through 2022-01-05, hence 5 rows. Since a database should primarily store data rather than burn resources on computation, my first thought was to implement the expansion with Spark's foreach operator and keep the work on the Java side. When the data volume is small, Spark isn't even necessary: a single pass over a JDBC ResultSet is enough (a sketch follows the data-preparation section below). At scale, though, a distributed engine like Spark is the right choice.
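As an aside, Spark 2.4 and later (the pom below uses 3.3.0) can express this one-to-many expansion as a built-in SQL expression, explode(sequence(...)), with no foreach or custom UDTF at all. A minimal sketch, assuming df1 has been read from the source table as in the full code further below:

// Let Spark SQL generate the date array and explode it: each input row
// yields offset + 1 output rows, one per day in [time1, time2].
Dataset<Row> expanded = df1.selectExpr(
        "col1",
        "col2",
        "explode(sequence(to_date(time1), to_date(time2), interval 1 day)) as time3");
expanded.show(false);

The foreach approach below is still worthwhile when the per-row work is more involved than a simple date expansion, or when the results are written straight back to MySQL as they are produced.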

Data Preparation

Insert a handful of rows to simulate the data. A real production environment obviously holds far more than this!

mysql> insert into test_origin_20001128 values('a1','b1','2022-01-01 00:00:00','2022-01-05 00:00:00');
Query OK, 1 row affected (0.07 sec)

mysql> insert into test_origin_20001128 values('a2','b2','2022-01-28 00:00:00','2022-02-03 00:00:00');
Query OK, 1 row affected (0.01 sec)

mysql> insert into test_origin_20001128 values('a3','b3','2022-02-20 00:00:00','2022-02-25 00:00:00');
Query OK, 1 row affected (0.00 sec)

mysql> insert into test_origin_20001128 values('a4','b4','2022-03-29 00:00:00','2022-04-02 00:00:00');
Query OK, 1 row affected (0.00 sec)

mysql> select * from test_origin_20001128;
+------+------+---------------------+---------------------+
| col1 | col2 | time1               | time2               |
+------+------+---------------------+---------------------+
| a1   | b1   | 2022-01-01 00:00:00 | 2022-01-05 00:00:00 |
| a2   | b2   | 2022-01-28 00:00:00 | 2022-02-03 00:00:00 |
| a3   | b3   | 2022-02-20 00:00:00 | 2022-02-25 00:00:00 |
| a4   | b4   | 2022-03-29 00:00:00 | 2022-04-02 00:00:00 |
+------+------+---------------------+---------------------+
4 rows in set (0.00 sec)

mysql>
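As noted in the background, at this volume Spark isn't strictly necessary: one pass with a plain JDBC iterator can do the expansion. A minimal single-process sketch against the tables above (the class name JdbcExpandDemo and the hard-coded credentials are illustrative):

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Calendar;

/** Single-process variant: expand the date ranges with plain JDBC, no Spark. */
public class JdbcExpandDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.88.100:3306/db_lzy", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select col1, col2, date(time1) as t1, datediff(time2, time1) as offset"
                             + " from test_origin_20001128");
             PreparedStatement ps = conn.prepareStatement(
                     "insert into test_result_20001128 values(?,?,?)")) {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            while (rs.next()) {
                cal.setTime(rs.getDate("t1"));
                // one output row per day in [time1, time2], i.e. offset + 1 rows
                for (int i = 0; i <= rs.getInt("offset"); i++) {
                    ps.setString(1, rs.getString("col1"));
                    ps.setString(2, rs.getString("col2"));
                    ps.setString(3, fmt.format(cal.getTime()));
                    ps.addBatch();
                    cal.add(Calendar.DATE, 1);
                }
            }
            ps.executeBatch();
        }
    }
}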

Java Code

pom.xml

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.3.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark dependencies -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <!-- Enables Lombok annotations -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>

        <!-- MySQL driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
<!-- Compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
<!-- Shade plugin: builds a fat jar including all dependencies -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Optionally set the jar's entry-point class -->
                                    <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Java

Without further ado, the code:

package com.zhiyong.day20221128;

import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.LinkedList;
import java.util.Properties;

/**
 * @program: zhiyong_study
 * @description: simulate a UDTF with the foreach operator
 * @author: zhiyong
 * @create: 2022-11-28 19:27
 **/
public class ForeachDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("simulate UDTF with foreach")
                .master("local[2]")
                .getOrCreate();

        String url = "jdbc:mysql://192.168.88.100:3306/db_lzy";
        String table = "test_origin_20001128";
        Properties prop = new Properties();
        prop.put("driver", "com.mysql.cj.jdbc.Driver");
        prop.put("user", "root");
        prop.put("password", "123456");
        prop.put("url", url);
        prop.put("fetchSize", "1000");

        Dataset<Row> df1 = spark.read().jdbc(url, table, prop);
        df1.show(false);

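        // Normalize both datetimes to yyyy-MM-dd strings and precompute the
        // day span between them (offset) with datediff.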
        String[] exp = new String[5];
        exp[0] = "col1 as col1";
        exp[1] = "col2 as col2";
        exp[2] = "date_format(time1,'yyyy-MM-dd') as time1";
        exp[3] = "date_format(time2,'yyyy-MM-dd') as time2";
        exp[4] = "datediff(date_format(time2,'yyyy-MM-dd'),date_format(time1,'yyyy-MM-dd')) as offset";
        df1 = df1.selectExpr(exp);

        df1.show(false);

        df1.foreach((ForeachFunction<Row>) row -> {

            // In production this should come from a connection pool
            Class.forName("com.mysql.cj.jdbc.Driver");
            Connection connection = DriverManager.getConnection(url, "root", "123456");
            connection.setAutoCommit(false);

            String col1 = row.get(0).toString();
            String col2 = row.get(1).toString();
            String time1 = row.get(2).toString();
            int offset = Integer.parseInt(row.get(4).toString());
            LinkedList<String> list = new LinkedList<>();
            list.add(time1);

            String sql = "insert into test_result_20001128 values(?,?,?)";
            PreparedStatement prepareStatement = connection.prepareStatement(sql);

            if (offset > 0) {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                Calendar calendar = Calendar.getInstance();

                Date date1 = simpleDateFormat.parse(time1);
                calendar.setTime(date1);
                // Collect every remaining date up to time2 (offset additional days).
                for (; offset > 0; offset--) {
                    calendar.add(Calendar.DATE, 1);
                    String parseTime = simpleDateFormat.format(calendar.getTime());
                    list.add(parseTime);
                }
            }

            // Write one row per expanded date: this is the one-to-many step.
            for (String time3 : list) {
                prepareStatement.setString(1, col1);
                prepareStatement.setString(2, col2);
                prepareStatement.setString(3, time3);
                prepareStatement.addBatch();
            }
            prepareStatement.executeBatch();
            connection.commit();
            prepareStatement.close();
            connection.close();
        });

        spark.stop();
    }
}
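A note on the pooling comment above: foreach opens one MySQL connection per row, which gets expensive fast. A common refinement is foreachPartition, which amortizes one connection (or one pool checkout) over an entire partition. A hedged sketch of that variant, written as a drop-in replacement for the df1.foreach call above (ForeachPartitionFunction comes from org.apache.spark.api.java.function):

        df1.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
            // One connection and one prepared statement per partition, not per row.
            try (Connection conn = DriverManager.getConnection(url, "root", "123456");
                 PreparedStatement ps = conn.prepareStatement(
                         "insert into test_result_20001128 values(?,?,?)")) {
                conn.setAutoCommit(false);
                SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
                Calendar calendar = Calendar.getInstance();
                while (rows.hasNext()) {
                    Row row = rows.next();
                    calendar.setTime(fmt.parse(row.getString(2)));
                    // offset + 1 dates per source row, as in the foreach version
                    for (int i = Integer.parseInt(row.get(4).toString()); i >= 0; i--) {
                        ps.setString(1, row.getString(0));
                        ps.setString(2, row.getString(1));
                        ps.setString(3, fmt.format(calendar.getTime()));
                        ps.addBatch();
                        calendar.add(Calendar.DATE, 1);
                    }
                }
                ps.executeBatch();
                conn.commit();
            }
        });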
