spark中混合数据的ArrayType

Posted

技术标签:

【中文标题】spark中混合数据的ArrayType【英文标题】:ArrayType of mixed data in spark 【发布时间】:2018-06-20 12:02:18 【问题描述】:

我想将两个不同的数组列表合并为一个。每个数组都是 spark 数据框中的一列。因此,我想使用 udf

def some_function(u,v):
  li = list()
  for x,y in zip(u,v):
      li.append(x.extend(y))
  return li

udf_object = udf(some_function,ArrayType(ArrayType(StringType()))))
new_x = x.withColumn('new_name',udf_object(col('name'),col('features')))

这是数据的架构:

root
 |-- blockingkey: string (nullable = true)
 |-- blocked_records: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- flattened_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: float (containsNull = true)
 |-- name: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

我正在尝试合并名称和功能。所以就像名称中的第一个元素将与特征中的第一个元素合并。 但这仅在存在 Integer 或 FloatValues 时返回具有 NUll 值的数组。如果可以使用 udf 或其他方式完成,请帮助我解决此问题。

【问题讨论】:

你不能将浮点数和字符串合并到一个数组中,两者都应该是相同的类型 如果我只是在 li 中附加 x,那么它只会正确返回名称。但我希望列表由 y 扩展。 【参考方案1】:

如果你有dataframeschema

+------------------------------------------------+----------------------------------------+
|features                                        |name                                    |
+------------------------------------------------+----------------------------------------+
|[WrappedArray(2.0, 3.0), WrappedArray(3.0, 5.0)]|[WrappedArray(a, b), WrappedArray(c, d)]|
|[WrappedArray(2.0, 3.0), WrappedArray(3.0, 5.0)]|[WrappedArray(a, b), WrappedArray(c, d)]|
+------------------------------------------------+----------------------------------------+

root
 |-- features: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- name: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

然后可以定义udf函数,调用udf函数为

import pyspark.sql.types as t
from pyspark.sql import functions as f

def some_function(u,v):
    li = []
    for x, y in zip(u, v):
        li.append(x + y)
    return li

udf_object = f.udf(some_function,t.ArrayType(t.ArrayType(t.StringType())))

new_x = x.withColumn('new_name',udf_object(f.col('name'),f.col('features')))

所以new_x 会是

+------------------------------------------------+----------------------------------------+------------------------------------------------------------+
|features                                        |name                                    |new_name                                                    |
+------------------------------------------------+----------------------------------------+------------------------------------------------------------+
|[WrappedArray(2.0, 3.0), WrappedArray(3.0, 5.0)]|[WrappedArray(a, b), WrappedArray(c, d)]|[WrappedArray(a, b, 2.0, 3.0), WrappedArray(c, d, 3.0, 5.0)]|
|[WrappedArray(2.0, 3.0), WrappedArray(3.0, 5.0)]|[WrappedArray(a, b), WrappedArray(c, d)]|[WrappedArray(a, b, 2.0, 3.0), WrappedArray(c, d, 3.0, 5.0)]|
+------------------------------------------------+----------------------------------------+------------------------------------------------------------+

root
 |-- features: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)
 |-- name: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- new_name: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)

希望回答对你有帮助

【讨论】:

以上是关于spark中混合数据的ArrayType的主要内容,如果未能解决你的问题,请参考以下文章

混合 Spark Structured Streaming API 和 DStream 写入 Kafka

高斯混合模型:Spark MLlib 和 scikit-learn 之间的区别

Spark Kudu 结合

spark的java和scala混合工程构建记录!

地铁译:Spark for python developers ---Spark与数据的机器学习

Spark高效的groupby操作-重新分区?