Spark:将 bytearray 转换为 bigint

Posted

技术标签:

【中文标题】Spark:将 bytearray 转换为 bigint【英文标题】:Spark: cast bytearray to bigint 【发布时间】:2019-12-13 21:36:51 【问题描述】:

尝试使用 pyspark 和 spark sql 将 kafka 密钥(二进制/字节数组)转换为 long/bigint 导致数据类型不匹配:无法将二进制转换为 bigint

环境详情:

Python 3.6.8 |Anaconda custom (64-bit)| (default, Dec 30 2018, 01:22:34)
[GCC 7.3.0] on linux
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
      /_/

Using Python version 3.6.8 (default, Dec 30 2018 01:22:34)
SparkSession available as 'spark'.

测试用例:

from pyspark.sql.types import StructType, StructField, BinaryType
df1_schema = StructType([StructField("key", BinaryType())])
df1_value = [[bytearray([0, 6, 199, 95, 77, 184, 55, 169])]]
df1 = spark.createDataFrame(df1_value,schema=df1_schema)
df1.printSchema()
#root
# |-- key: binary (nullable = true)

df1.show(truncate=False)
#+-------------------------+
#|key                      |
#+-------------------------+
#|[00 06 C7 5F 4D B8 37 A9]|
#+-------------------------+

df1.selectExpr('cast(key as bigint)').show(truncate=False)

错误:

(...)  File "/app/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o63.selectExpr.
: org.apache.spark.sql.AnalysisException: cannot resolve 'CAST(`key` AS BIGINT)' due to data type mismatch: cannot cast binary to bigint; line 1 pos 0;
(...)
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`key` AS BIGINT)' due to data type mismatch: cannot cast binary to bigint; line 1 pos 0;\n'Project [unresolvedalias(cast(key#0 as bigint), None)]\n+- AnalysisBarrier\n      +- LogicalRDD [key#0], false\n"

但我的预期结果是1908062000002985,例如:

dfpd = df1.toPandas()
int.from_bytes(dfpd['key'].values[0], byteorder='big')
#1908062000002985

【问题讨论】:

【参考方案1】:

使用pyspark.sql.functions.hexpyspark.sql.functions.conv

from pyspark.sql.functions import col, conv, hex

df1.withColumn("num", conv(hex(col("key")), 16, 10).cast("bigint")).show(truncate=False)
#+-------------------------+----------------+
#|key                      |num             |
#+-------------------------+----------------+
#|[00 06 C7 5F 4D B8 37 A9]|1908062000002985|
#+-------------------------+----------------+

仅当您希望结果为long 时才需要cast("bigint"),因为conv 返回StringType()

【讨论】:

以上是关于Spark:将 bytearray 转换为 bigint的主要内容,如果未能解决你的问题,请参考以下文章

将 shortarray 转换为 bytearray 和 bytearray 到 shortarray

如何将 Int 转换为 ByteArray,然后使用 Kotlin 将其转换回 Int?

Camerax 图像分析:将图像转换为 bytearray 或 ByteBuffer

Kotlin Native 如何将 ByteArray 转换为 String?

如何将 bytearray 转换为 img 标签?

如何将 Kotlin ByteArray 转换为 NsData,反之亦然