为啥 Spark 在创建 DataFrame 时会推断二进制而不是 Array[Byte]?

Posted

技术标签:

【中文标题】为啥 Spark 在创建 DataFrame 时会推断二进制而不是 Array[Byte]?【英文标题】:Why does Spark infer a binary instead of an Array[Byte] when creating a DataFrame?为什么 Spark 在创建 DataFrame 时会推断二进制而不是 Array[Byte]? 【发布时间】:2018-10-31 01:24:36 【问题描述】:

原则上,我有一个 DataFrame,它由 "Name""Values" 字段组成。第一个字段是String,第二个字段是Array[Byte]

我想对这个DataFrame 的每条记录做的是应用任何函数,使用UDF 并创建一个新列。当 "Values"Array[Int] 时,这非常有效。但是,当是Array[Byte]时,会出现如下错误:

org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
  +- ExternalRDD [obj#43]

完整代码如下:

scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]

scala> df1.show
+----+----------------+
|Name|          Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+

scala> val twice = udf  (values: Seq[Byte]) =>
   |     val result = Array.ofDim[Byte](values.length)
   |     for (i <- values.indices)
   |         result(i) = (2 * values(i).toInt).toByte
   |     result
   | 
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))

scala> val df2 = df1.withColumn("TwoTimes", twice('Values))

我了解由于错误的数据类型(预期:Array[Byte],但它找到了Binary)而出现这样的错误,但我不明白为什么 Spark 将我的 Array[Byte] 推断为Binary。谁能给我解释一下?

如果我必须使用Binary 而不是Array[Byte],我应该如何在我的UDF 中处理它?

我澄清一下,我原来的 UDF 没有使用微不足道的 for 循环。我知道在这个例子中,这可以用map 方法代替。

【问题讨论】:

【参考方案1】:

在 Spark 中,Array[Byte] 表示为 BinaryType。从documentation我们可以看出:

公共类 BinaryType 扩展 DataType 表示Array[Byte] 值的数据类型。请使用单例 DataTypes.BinaryType。

因此,Array[Byte]Binary 是相同的,但是它们与 Seq[Byte] 之间存在一些差异,这会导致错误。

要解决此问题,只需将 udf 中的 Seq[Byte] 替换为 Array[Byte]

val twice = udf  (values: Array[Byte]) =>
  val result = Array.ofDim[Byte](values.length)
  for (i <- values.indices)
    result(i) = (2 * values(i).toInt).toByte
  result

【讨论】:

以上是关于为啥 Spark 在创建 DataFrame 时会推断二进制而不是 Array[Byte]?的主要内容,如果未能解决你的问题,请参考以下文章

为啥我在尝试访问 spark master webUI 时会重置连接?

为啥 Spark DataFrame 会创建错误数量的分区?

为啥使用 DataFrame 时 Spark 会报告“java.net.URISyntaxException:绝对 URI 中的相对路径”?

为啥即使使用限制命令访问结果,SPARK \PYSPARK 也会计算所有内容?

如何在 Spark 中并行创建 RDD / DataFrame?

Spark:创建 DataFrame 会出现异常