Writing a Spark UDF to fix the scientific-notation problem when casting big numbers (bigint, double, float, decimal, etc.) in Hive tables to string [Java]
Posted by 虎鲸不是鱼
Background
I ran into this problem while doing platform development: when Spark converts data types in a Hive table, casting big numbers (bigint, double, float, decimal, etc.) to string can produce scientific notation. For example:
select cast(col1 as string) as col1_1
123456789.123456789→1.23456789123456789E8
When col1 holds a big number, the scientific-notation problem shows up intermittently, and after a long search I could not find any configuration parameter that turns this behaviour off.
The Java job in question implements a Hive-to-FTP fixed-length dual-file export and ultimately writes a data file to FTP, so scientific notation is clearly unacceptable. To keep the file contents consistent with the original upstream database and the Hive data ingested into the lake, the values that had been turned into scientific notation had to be converted back.
As a last resort I wrote this UDF. In testing it works as expected, and performance is acceptable: roughly 2 minutes to process 30 million rows (not counting the wait time for the Yarn submission).
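For context, the formatting itself is standard JVM behaviour: Java's Double.toString switches to scientific notation once the magnitude reaches 10^7 (or falls below 10^-3), which is presumably where the cast picks it up. A minimal, Spark-free sketch (the sample values are illustrative only):

public class SciNotationDemo {
    public static void main(String[] args) {
        // Double.toString uses scientific notation for |x| >= 1e7 or |x| < 1e-3
        System.out.println(Double.toString(123456789.123456789d)); // 1.2345678912345679E8
        System.out.println(Double.toString(0.000123456789d));      // 1.23456789E-4
        System.out.println(Double.toString(1234.5d));              // 1234.5 (plain notation)
    }
}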
Java demo
Without further ado, here is the pom.xml first.
Dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>zhiyong_study</artifactId>
<groupId>com.zhiyong</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark_study</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<scala.version>2.12.12</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.3.0</spark.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Enables Lombok @ annotations -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- Jar packaging plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Optionally set the jar's main class -->
<!--<mainClass>com.aa.flink.StreamWordCount</mainClass>-->
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Now the code:
package com.zhiyong.demo.udf;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

/**
 * @program: zhiyong_study
 * @description: Java UDF that removes scientific notation when Spark casts big numbers to string
 * @author: zhiyong
 * @create: 2022-10-31 20:23
 **/
public class Num2Str implements UDF1<Object, String> {

    @Override
    public String call(Object num) {
        String string = num.toString();
        if (!string.contains("E")) {
            return string; // not in scientific notation, nothing to rewrite
        }
        boolean flg = false;
        if (string.contains("-")) { // remember the sign and strip it
            flg = true;
            string = string.replaceAll("-", "");
        }
        String result = "123456789.123456789E5";
        StringBuilder strb = new StringBuilder();
        String sub1 = null;
        String sub2 = null;
        strb.delete(0, strb.length());
        String[] split = string.split("E");
        String num1 = split[0];
        int num2 = Integer.parseInt(split[1]);
        if (num2 > 0) { // shift the decimal point to the right
            if (num1.contains(".")) { // mantissa has a decimal point
                String[] split2 = num1.split("\\."); // split on the dot
                if (split2[1].length() > num2) { // enough fractional digits, no padding needed
                    sub1 = split2[1].substring(0, num2);
                    sub2 = split2[1].substring(num2, split2[1].length());
                    strb.append(split2[0]).append(sub1).append(".").append(sub2);
                } else { // pad with zeros
                    strb.append(split2[0]).append(split2[1]);
                    for (int i = 0; i < num2 - split2[1].length(); i++) {
                        strb.append("0");
                    }
                }
            } else { // no decimal point
                strb.append(split[0]);
                for (int i = 0; i < num2; i++) {
                    strb.append("0");
                }
            }
        } else if (0 == num2) { // no shift
            strb.append(split[0]);
        } else { // shift the decimal point to the left
            num2 = -num2;
            strb.append("0.");
            for (int i = 0; i < num2 - 1; i++) {
                strb.append("0"); // placeholder zeros
            }
            if (split[0].contains(".")) { // drop the mantissa's decimal point before appending the digits
                String[] split2 = split[0].split("\\.");
                strb.append(split2[0]).append(split2[1]);
            } else {
                strb.append(split[0]);
            }
        }
        result = strb.toString();
        if (flg) {
            result = "-" + result;
        }
        /**
         * There is a reason for not doing it this way.
         */
        //result = new BigDecimal(num.toString()).toString();
        return result;
    }

    public static void main(String[] args) throws Exception {
        SparkSession sc = SparkSession.builder()
                .appName("Num2Str")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();
        Dataset<Row> df1 = sc.sql("select col1 from db1.tb1"); // get the DataFrame
        Num2Str zhiyongUdf1 = new Num2Str();
        sc.udf().register("zhiyongUdf1", zhiyongUdf1, DataTypes.StringType);
        df1.createTempView("temp1");
        Dataset<Row> df2;
        /**
         * Writing to a Hive table is fine, but when writing to a file
         * 123456789.123456789 may come out as 1.23456789123456789E8,
         * so the data seen in the file differs from the Hive table.
         * cast(col1 as string) as col1_1 runs into the scientific-notation problem.
         * Known issue: bigint, double, float, decimal and other big numbers may
         * intermittently turn into scientific notation when cast to string.
         */
        if (true) { // condition simplified in this desensitized demo
            df2 = sc.sql("select zhiyongUdf1(col1) from temp1");
            df2.show();
            df2.write().text("hdfs://zhiyong1/temp1/day20221031");
            JavaRDD<Row> rowJavaRDD = df2.toJavaRDD();
            rowJavaRDD.saveAsTextFile("hdfs://zhiyong1/temp1/day20221031");
        }
    }
}
The production-grade code, after desensitization, is roughly as above. This yields the reshaped DataFrame, and when it is then saved as a file, no scientific notation appears.
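For reference, the same thing can be done through the DataFrame API instead of a temp view. A minimal sketch, assuming the Num2Str class above is on the classpath and reusing the db1.tb1 table and col1 column names from the demo (adjust them for your environment):

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

public class Num2StrDataFrameApiDemo {
    public static void main(String[] args) {
        SparkSession sc = SparkSession.builder()
                .appName("Num2StrDataFrameApiDemo")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();
        // Register the UDF exactly as in the demo above
        sc.udf().register("zhiyongUdf1", new Num2Str(), DataTypes.StringType);
        // Apply it column by column via callUDF instead of a SQL temp view
        Dataset<Row> fixed = sc.table("db1.tb1")
                .withColumn("col1_str", callUDF("zhiyongUdf1", col("col1")))
                .drop("col1");
        fixed.show(10, false);
    }
}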
Secondary development of platform components is a high-risk job: if you dare fail to deliver a feature, you get fired!!! Understandable, right?
Closing remarks
Spark officially discourages hand-written UDFs: under the hood a UDF runs as an ordinary JVM task, gets no Catalyst optimization, and cannot benefit from Spark's managed memory, so it performs noticeably worse than Spark's built-in functions. This workaround was used only out of sheer necessity; if a configuration parameter existed for this, performance would be considerably better.
As for why BigDecimal was not used for the conversion, I am not at liberty to say.
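One quick way to see this opacity is to print the query plans: the registered UDF shows up as a single black-box expression that Catalyst can neither fold nor push filters into. A minimal check, assuming the sc session and temp1 view from the demo above:

// Prints parsed/analyzed/optimized/physical plans; zhiyongUdf1 appears as an opaque UDF call
sc.sql("select zhiyongUdf1(col1) from temp1").explain(true);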
Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/127624005