编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java

Posted 虎鲸不是鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java相关的知识,希望对你有一定的参考价值。

编写Spark的UDF函数解决Hive表大数【bigint、double、float、decimal等】转字符串string时出现的科学计数法问题【Java】

背景

这是笔者在做平台开发时遇到的一个问题,当使用Spark对Hive表的数据做类型转换时,大数【bigint、double、float、decimal等】转字符串string时会出现科学计数法。举个栗子:

select cast(col1 as string) as col1_1

123456789.1234567891.23456789123456789E8

当col1是大数时,就会概率性出现科学计数法的问题,找了半天并没有找到任何参数可以关闭这种功能。

由于该Java类的功能是Hive2FTP定长双文件,最终会写一个Data文件到FTP,显然是不能接受这种科学计数法的。为了让文件的数据看起来和最原始的上游数据库、入湖Hive一致,必须想办法将转为科学计数法的数据再给转回去。

不得已,写了这个UDF函数,经试验,效果良好,可以实现预期功能。性能表现也还可以接受,3000w条数据处理时间大概2分钟【不包含提Yarn的等待时间】。

Java的Demo

话不多说,先上pom.xml依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>zhiyong_study</artifactId>
        <groupId>com.zhiyong</groupId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spark_study</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.version>2.12.12</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.3.0</spark.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>$scala.version</version>
        </dependency>

        <!-- 添加spark依赖 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_$scala.binary.version</artifactId>
            <version>$spark.version</version>
        </dependency>


        <!--        可以使用Lombok的@注解-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.20</version>
        </dependency>

        <!--        mysql驱动包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>$project.build.sourceEncoding</encoding>-->
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 可以设置jar包的入口类(可选) -->
                                    <!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

再上代码:

package com.zhiyong.demo.udf;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

/**
 * @program: zhiyong_study
 * @description: 使用Java编写UDF解决Spark大数转字符串时的科学计数法问题
 * @author: zhiyong
 * @create: 2022-10-31 20:23
 **/
public class Num2Str implements UDF1<Object, String> 
    @Override
    public String call(Object num) 
        String string = num.toString();
        boolean flg = false;
        if (string.contains("-")) 
            flg = true;
            string.replaceAll("-", "");
        
        String result = "123456789.123456789E5";
        StringBuilder strb = new StringBuilder();
        String sub1 = null;
        String sub2 = null;
        strb.delete(0, strb.length());

        String[] split = string.split("E");

        String num1 = split[0];
        int num2 = Integer.parseInt(split[1]);

        if (num2 > 0) //小数点右移
            if (num1.contains(".")) //有小数点
                String[] split2 = num1.split("\\\\.");//截断
                if (split2[1].length() > num2) //防止空指针
                    sub1 = split2[1].substring(0, num2);
                    sub2 = split2[1].substring(num2, split2[1].length());
                    strb.append(split2[0]).append(sub1).append(".").append(sub2);
                 else //补0
                    strb.append(split2[0]).append(split2[1]).append(".");
                    for (int i = 0; i < num2 - split2.length; i++) 
                        strb.append("0");
                    
                

             else //无小数点
                strb.append(split[0]);
                for (int i = 0; i < num2; i++) 
                    strb.append("0");
                
            

         else if (0 == num2) //不移动
            strb.append(split[0]);
         else //小数点左移
            num2 = -num2;
            strb.append("0");
            if (split[0].contains(".")) //有小数点
                String[] split2 = split[0].split("\\\\.");//截断
                for (int i = 0; i < num2 - 1; i++) 
                    strb.append("0");//占位
                
                strb.append(split[0]);
            
        

        result = strb.toString();

        if (flg) 
            result = "-" + result;
        

        /**
         * 不这么做肯定是有原因的
         */
        //result = new BigDecimal(num.toString()).toString();

        return result;
    

    public static void main(String[] args) throws Exception 
        SparkSession sc = SparkSession.builder()
                .appName("Num2Str")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();

        Dataset<Row> df1 = sc.sql("select col1 from db1.tb1");//获取到DataFrame

        Num2Str zhiyongUdf1 = new Num2Str();

        sc.udf().register("zhiyongUdf1", zhiyongUdf1, DataTypes.StringType);

        df1.createTempView("temp1");

        Dataset<Row> df2;

        /**
         * 写Hive表时无所谓,写文件时,有可能123456789.123456789→1.23456789123456789E8
         * 导致从文件看到的数据与Hive表不一致
         * cast(col1 as string) as col1_1 这种做法会遇到科学计数法的问题
         * 已知bigint、double、float、decimal等大数→string时会概率变化为科学计数法
         */
        if (true) 
            df2 = sc.sql("select zhiyongUdf1(col1) from temp1");
        

        df2.show();
        
        df2.write().text("hdfs://zhiyong1/temp1/day20221031");

        JavaRDD<Row> rowJavaRDD = df2.toJavaRDD();

        rowJavaRDD.saveAsTextFile("hdfs://zhiyong1/temp1/day20221031");

    

生产级代码脱敏后的内容大致如上。这样就可以获取到整形后的DataFrame,之后再存为文件就不会看到科学计数法了。

平台组件二开是个高危岗位,各种功能如果胆敢搞不出来就会被开除!!!能理解吧?

尾言

Spark官方是不建议写UDF的,因为UDF底层跑的是普通的JVM多线程任务,不会获得Spark的CataLyst优化,也不能享受到Spark自动托管内存的优势,性能表现较Spark自带的函数还是要差劲不少。实在是万不得已,才被迫出此下策。如果可以设置参数,其实性能表现要出色很多。

至于为何不用BigDecimal来转换,不便透露。

转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/127624005

以上是关于编写Spark的UDF函数解决Hive表大数bigintdoublefloatdecimal等转字符串string时出现的科学计数法问题Java的主要内容,如果未能解决你的问题,请参考以下文章

Hive入门--3.UDF编写与使用

大数据实战-Spark实战技巧

Hive从入门到精通8:Hive自定义函数(UDF)

在 hive 中的 udf 上加入两个表

hive 中简单的udf函数编写

Hive自定义UDF函数