pyspark:Spark 2.3 中的arrays_zip 等效项
Posted
技术标签:
【中文标题】pyspark:Spark 2.3 中的arrays_zip 等效项【英文标题】:pyspark: arrays_zip equivalent in Spark 2.3 【发布时间】:2020-04-29 14:14:25 【问题描述】:Spark 2.3中arrays_zip
的等价函数怎么写?
来自 Spark 2.4 的源代码
def arrays_zip(*cols):
"""
Collection function: Returns a merged array of structs in which the N-th struct contains all
N-th values of input arrays.
:param cols: columns of arrays to be merged.
>>> from pyspark.sql.functions import arrays_zip
>>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
>>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
[Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
"""
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))
如何在 PySpark 中实现相似?
【问题讨论】:
你大概可以测试一下:f=lambda x,y:list(zip(x,y))
; myudf = F.udf(f,ArrayType(StructType([StructField('vals1',IntegerType(),False),StructField('vals2',IntegerType(),False)])))
后跟 df.select(myudf(F.col('vals1'),F.col('vals2'))).collect()
不确定因此不作为答案发布,如果您尚未将导入命名为 F
,请删除 F
前缀
【参考方案1】:
您可以通过创建用户定义函数来实现这一点
import pyspark.sql.functions as f
import pyspark.sql.types as t
arrays_zip_ = f.udf(lambda x, y: list(zip(x, y)),
t.ArrayType(t.StructType([
# Choose Datatype according to requirement
t.StructField("first", t.IntegerType()),
t.StructField("second", t.StringType())
])))
df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])
现在结果是 spark
df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)
+------------------------+
|zipped |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
Spark 2.4 版
的结果df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)
+------------------------+
|zipped |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
【讨论】:
以上仅适用于 2 个数组,而arrays_zip
适用于任意数量的数组。
这为您提供了假设正在合并的数据类型的可行性。我们可以随时动态创建此代码。
这可以用一组动态列而不是像上面那样固定吗?
将当前函数与数组一起使用会出现错误:TypeError: <lambda>() missing 1 required positional argument: 'y'
【参考方案2】:
您可以使用UDF
获得与arrays_zip
相同的功能。请注意,列类型必须相同才能使其工作(在本例中为IntegerType
)。如果列类型有任何差异,请将列转换为通用类型,然后再使用UDF
。
from pyspark.sql import functions as F
from pyspark.sql import types as T
def zip_func(*args):
return list(zip(*args))
zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))
可以和arrays_zip
一样使用,例如:
df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()
【讨论】:
这是为您运行的吗?我看到一个奇怪的错误:net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for builtins.iter)
@bp2010:我目前无法试用代码(需要在我的时区等到今晚),但错误与返回类型与 udf 声明不匹配有关。我更改了答案中的代码,试试它是否适合你。 (如果不使用 return list([list(z) for z in zip(*args)])
的 udf 肯定会起作用,但我认为没有必要这样做。)
现在运行。但是,我正在尝试使用此功能来炸开拉链。但现在有了这个功能,我看到了错误:org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(cols).
@bp2010:你确定你使用的是explode
吗?这看起来像是来自expand
的错误。 expand
适用于结构,而在这种情况下,zip 返回一个数组数组。这可以通过返回一个结构数组来解决(请参阅 andy 对问题的评论),但它不会是动态的列数。
是的。我正在使用explode
。我在这里发布的逻辑:***.com/a/61087359/3213111 我使用 arrays_zip 来利用它是动态的,因为我需要这个。知道如何以动态方式为列执行此操作吗?以上是关于pyspark:Spark 2.3 中的arrays_zip 等效项的主要内容,如果未能解决你的问题,请参考以下文章
从Apache Spark 2.3看大数据流式计算的发展趋势
如何从 PySpark 中的 spark.ml 中提取模型超参数?