pyspark:Spark 2.3 中的arrays_zip 等效项

Posted

技术标签:

【中文标题】pyspark:Spark 2.3 中的arrays_zip 等效项【英文标题】:pyspark: arrays_zip equivalent in Spark 2.3 【发布时间】:2020-04-29 14:14:25 【问题描述】:

Spark 2.3中arrays_zip的等价函数怎么写?

来自 Spark 2.4 的源代码

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

如何在 PySpark 中实现相似?

【问题讨论】:

你大概可以测试一下:f=lambda x,y:list(zip(x,y))myudf = F.udf(f,ArrayType(StructType([StructField('vals1',IntegerType(),False),StructField('vals2',IntegerType(),False)]))) 后跟 df.select(myudf(F.col('vals1'),F.col('vals2'))).collect() 不确定因此不作为答案发布,如果您尚未将导入命名为 F,请删除 F 前缀 【参考方案1】:

您可以通过创建用户定义函数来实现这一点

import pyspark.sql.functions as f
import pyspark.sql.types as t

arrays_zip_ = f.udf(lambda x, y: list(zip(x, y)),  
      t.ArrayType(t.StructType([
          # Choose Datatype according to requirement
          t.StructField("first", t.IntegerType()),
          t.StructField("second", t.StringType())
  ])))

df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])

现在结果是 spark

df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

Spark 2.4 版

的结果
df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

【讨论】:

以上仅适用于 2 个数组,而 arrays_zip 适用于任意数量的数组。 这为您提供了假设正在合并的数据类型的可行性。我们可以随时动态创建此代码。 这可以用一组动态列而不是像上面那样固定吗? 将当前函数与数组一起使用会出现错误:TypeError: <lambda>() missing 1 required positional argument: 'y'【参考方案2】:

您可以使用UDF 获得与arrays_zip 相同的功能。请注意,列类型必须相同才能使其工作(在本例中为IntegerType)。如果列类型有任何差异,请将列转换为通用类型,然后再使用UDF

from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))

可以和arrays_zip一样使用,例如:

df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()

【讨论】:

这是为您运行的吗?我看到一个奇怪的错误:net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for builtins.iter) @bp2010:我目前无法试用代码(需要在我的时区等到今晚),但错误与返回类型与 udf 声明不匹配有关。我更改了答案中的代码,试试它是否适合你。 (如果不使用 return list([list(z) for z in zip(*args)]) 的 udf 肯定会起作用,但我认为没有必要这样做。) 现在运行。但是,我正在尝试使用此功能来炸开拉链。但现在有了这个功能,我看到了错误:org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(cols). @bp2010:你确定你使用的是explode吗?这看起来像是来自expand 的错误。 expand 适用于结构,而在这种情况下,zip 返回一个数组数组。这可以通过返回一个结构数组来解决(请参阅 andy 对问题的评论),但它不会是动态的列数。 是的。我正在使用explode。我在这里发布的逻辑:***.com/a/61087359/3213111 我使用 arrays_zip 来利用它是动态的,因为我需要这个。知道如何以动态方式为列执行此操作吗?

以上是关于pyspark:Spark 2.3 中的arrays_zip 等效项的主要内容,如果未能解决你的问题,请参考以下文章

如何删除 Spark 表列中的空格(Pyspark)

从Apache Spark 2.3看大数据流式计算的发展趋势

Spark(pyspark)中的决策树模型如何可视化?

如何从 PySpark 中的 spark.ml 中提取模型超参数?

PySpark Array<double> 不是 Array<double>

删除 Spark 数据框中的空格时出错 - PySpark