创建一个 Spark udf 函数以迭代字节数组并将其转换为数字

Posted

技术标签:

【中文标题】创建一个 Spark udf 函数以迭代字节数组并将其转换为数字【英文标题】:Create an Spark udf function to iterate over an Array of bytes and convert it to numeric 【发布时间】:2018-11-27 16:08:35 【问题描述】:

我在 spark (python) 中有一个带有字节数组的 Dataframe

DF.select(DF.myfield).show(1, False)
+----------------+                                                              
|myfield         |
+----------------+
|[00 8F 2B 9C 80]|
+----------------+

我正在尝试将此数组转换为字符串

'008F2B9C80'

然后到数值

int('008F2B9C80',16)/1000000
> 2402.0

我找到了一些 udf 样本,所以我已经可以像这样提取数组的一部分:

u = f.udf(lambda a: format(a[1],'x'))
DF.select(u(DF['myfield'])).show()
+------------------+                                                            
|<lambda>(myfield) |
+------------------+
|                8f|
+------------------+

现在如何遍历整个数组? 是否可以完成我必须在 udf 函数中编码的所有操作?

可能有一个最好的方法来做演员???

感谢您的帮助

【问题讨论】:

【参考方案1】:

我也找到了python解决方案

from pyspark.sql.functions import udf
spark.udf.register('ByteArrayToDouble', lambda x: int.from_bytes(x, byteorder='big', signed=False) / 10e5)
spark.sql('select myfield, ByteArrayToDouble(myfield) myfield_python, convert_binary(hex(myfield))/1000000 myfield_scala from my_table').show(1, False)
+-------------+-----------------+----------------+
|myfield      |myfield_python   |myfield_scala   |
+-------------+-----------------+----------------+
|[52 F4 92 80]|1391.76          |1391.76         |
+-------------+-----------------+----------------+
only showing top 1 row

我现在可以替代这两种解决方案

感谢您的宝贵帮助

【讨论】:

【参考方案2】:

我在回答您的最新问题时遇到了这个问题。

假设你有df

+--------------------+
|             myfield|
+--------------------+
|[00, 8F, 2B, 9C, 80]|
|    [52, F4, 92, 80]|
+--------------------+

现在你可以使用下面的 lambda 函数了

def func(val):
    return int("".join(val), 16)/1000000
func_udf = udf(lambda x: func(x), FloatType())

要创建输出,请使用

df = df.withColumn("myfield1", func_udf("myfield"))

这会产生,

+--------------------+--------+
|             myfield|myfield1|
+--------------------+--------+
|[00, 8F, 2B, 9C, 80]|  2402.0|
|    [52, F4, 92, 80]| 1391.76|
+--------------------+--------+

【讨论】:

【参考方案3】:

这是 scala df 解决方案。您需要导入 scala.math.BigInteger

scala> val df = Seq((Array("00","8F","2B","9C","80"))).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: array<string>]

scala> df.withColumn("idstr",concat_ws("",'id)).show
+--------------------+----------+
|                  id|     idstr|
+--------------------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|
+--------------------+----------+


scala> import scala.math.BigInt
import scala.math.BigInt

scala> def convertBig(x:String):String = BigInt(x.sliding(2,2).map( x=> Integer.parseInt(x,16)).map(_.toByte).toArray).toString
convertBig: (x: String)String

scala> val udf_convertBig =  udf( convertBig(_:String):String )
udf_convertBig: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.withColumn("idstr",concat_ws("",'id)).withColumn("idBig",udf_convertBig('idstr)).show(false)
+--------------------+----------+----------+
|id                  |idstr     |idBig     |
+--------------------+----------+----------+
|[00, 8F, 2B, 9C, 80]|008F2B9C80|2402000000|
+--------------------+----------+----------+


scala>

scala 的 BigInteger 没有 spark 等效项,所以我将 udf() 结果转换为字符串。

【讨论】:

听起来很有趣,我现在尝试在我的 pyspark 项目中调用 scala udf 函数。 (medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9) 谢谢我已经创建了一个udf函数,并且编译成功:package com.mycompany.spark.udf import org.apache.spark.sql.api.java.UDF1 import scala.math.BigInt import scala.util.Try class ConvertBinaryDecimal extends UDF1[String, String] override def call(TableauBinary: String):String = BigInt(TableauBinary.sliding(2,2).map( TableauBinary=&gt; Integer.parseInt(TableauBinary,16)).map(_.toByte).toArray).toString我的最后一个问题是直接用dataframe二进制字段调用它。是否可以在函数内将二进制文件转换为字符串? 这就是我在我的 UDF 中所做的 - idstr 在我的答案中是字符串。 我错过了 convertBig(_:String):String ),对不起! 其实我想知道是否可以直接使用参数中的二进制字段而不是“withcolumn”字符串结果来调用udf。类似于df.withColumn("idBig",udf_convertBig('id)).show(false) 将二进制数组“id”发送到 ConvertBinaryDecimal 函数

以上是关于创建一个 Spark udf 函数以迭代字节数组并将其转换为数字的主要内容,如果未能解决你的问题,请参考以下文章

Spark迭代算法UDF在每次迭代中被多次触发

一套 Spark UDF 实践教程(文末专栏抽奖)

定义一个接受 Spark DataFrame 中对象数组的 UDF?

定义一个接受 Spark DataFrame 中对象数组的 UDF?

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]

Databricks - 创建永久用户定义函数 (UDF)