分解两个 PySpark 数组并保持相同位置的元素
Posted
技术标签:
【中文标题】分解两个 PySpark 数组并保持相同位置的元素【英文标题】:Explode two PySpark arrays and keep elements from same positions 【发布时间】:2019-12-08 12:19:38 【问题描述】:我有一个 PySpark 数据框(比如 df1
),其中包含以下列
1.> category
: 一些字符串
2.> array1
: 元素数组
3.> array2
: 元素数组
以下是df1
的示例
+--------+--------------+--------------+
|category| array1| array2|
+--------+--------------+--------------+
|A | [x1, x2, x3]| [y1, y2, y3]|
|B | [u1, u2]| [v1, v2]|
+--------+--------------+--------------+
对于每一行,array1
的长度等于array2
的长度。在每一列中,我希望不同的行对array1
(和array2
)有不同大小的数组。
我想形成单独的列(比如element1
和element2
),这样在每一行中,列element1
和element2
包含来自array1
和array2
相同位置的元素分别。
以下是我想要的输出数据帧(比如df2
)的示例:
+--------+--------------+--------------+----------+----------+
|category| array1| array2| element1| element2|
+--------+--------------+--------------+----------+----------+
|A | [x1, x2, x3]| [y1, y2, y3]| x1| y1|
|A | [x1, x2, x3]| [y1, y2, y3]| x2| y2|
|A | [x1, x2, x3]| [y1, y2, y3]| x3| y3|
|B | [u1, u2]| [v1, v2]| u1| v1|
|B | [u1, u2]| [v1, v2]| u2| v2|
+--------+--------------+--------------+----------+----------+
以下是我到目前为止所尝试的(但除了我想要的之外,它还为我提供了来自不同位置的 element1
和 element2
的值。)
df2 = df1.select( "*", F.explode("array1").alias("element1") ).select( "*", F.explode("array2").alias("element2") )
【问题讨论】:
【参考方案1】:初始化
import pyspark.sql.functions as F
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)
columns = ['category','array1','array2']
vals = [
('A', ['x1', 'x2', 'x3'], ['y1','y2','y3']),
('B', ['u1', 'u2',], ['v1','v2'])
]
df = sqlContext.createDataFrame(vals, columns)
Based onarrays_zip
[docs]in spark >= 2.4
df.withColumn('new', F.arrays_zip('array1','array2')).withColumn('ex',explode('new'))\
.select('category','array1','array2',
col("ex.array1").alias('element1'),
col("ex.array2").alias('element2')\
).drop('new','ex').show()
输出
+--------+------------+------------+--------+--------+
|category| array1| array2|element1|element2|
+--------+------------+------------+--------+--------+
| A|[x1, x2, x3]|[y1, y2, y3]| x1| y1|
| A|[x1, x2, x3]|[y1, y2, y3]| x2| y2|
| A|[x1, x2, x3]|[y1, y2, y3]| x3| y3|
| B| [u1, u2]| [v1, v2]| u1| v1|
| B| [u1, u2]| [v1, v2]| u2| v2|
+--------+------------+------------+--------+--------+
说明
看看arrays_zip
产生的东西基本上可以解释一切。我们将 cols 与它一起合并/压缩,然后 explode
它。然后在explode创建的新列中简单地引用相应的结构。
>>> df.withColumn('new', F.arrays_zip('array1','array2')).show(truncate=False)
+--------+------------+------------+------------------------------+
|category|array1 |array2 |new |
+--------+------------+------------+------------------------------+
|A |[x1, x2, x3]|[y1, y2, y3]|[[x1, y1], [x2, y2], [x3, y3]]|
|B |[u1, u2] |[v1, v2] |[[u1, v1], [u2, v2]] |
+--------+------------+------------+------------------------------+
【讨论】:
【参考方案2】:对于 Spark >=2.4,您可以使用 Higher-Order Functions:
data = [('A', ['x1', 'x2', 'x3'], ['y1', 'y2', 'y3']),
('B', ['u1', 'u2'], ['v1', 'v2'])
]
df = spark.createDataFrame(data, ["category", "array1", "array2"])
# tranform array1, array2 => [struct(element1, element2)]
transform_expr = "transform(array1, (x, i) -> struct(x as element1, array2[i] as element2))"
# explode transformed arrays and extract values of element1 and element2
df.withColumn("merged_arrays", explode(expr(transform_expr))) \
.withColumn("element1", col("merged_arrays.element1")) \
.withColumn("element2", col("merged_arrays.element2")) \
.drop("merged_arrays") \
.show(truncate=False)
输出:
+--------+------------+------------+--------+--------+
|category|array1 |array2 |element1|element2|
+--------+------------+------------+--------+--------+
|A |[x1, x2, x3]|[y1, y2, y3]|x1 |y1 |
|A |[x1, x2, x3]|[y1, y2, y3]|x2 |y2 |
|A |[x1, x2, x3]|[y1, y2, y3]|x3 |y3 |
|B |[u1, u2] |[v1, v2] |u1 |v1 |
|B |[u1, u2] |[v1, v2] |u2 |v2 |
+--------+------------+------------+--------+--------+
transform
函数说明:
该函数采用第一个数组array1
并应用一个lambda 函数(x, i) -> struct(string, string)
,其中x
是实际值,i
是它在数组中的索引。对于每个值,我们返回一个包含该值的结构体element1
,array2
中的对应值(使用索引i
)作为element2
。
剩下的只是分解转换的结果并访问我们创建的结构元素。
【讨论】:
以上是关于分解两个 PySpark 数组并保持相同位置的元素的主要内容,如果未能解决你的问题,请参考以下文章
python使用numpy中的equal函数比较两个numpy数组中每个位置的元素是否相同并计算相同元素的比例
PySpark 使用 collect_list 收集不同长度的数组
1.合并两个数组,并保持仍然有序。2.删除合并后数组中的重复元素
javascript 常见数组操作( 1数组整体元素修改 2 数组筛选 3jquery 元素转数组 4获取两个数组中相同部分或者不同部分 5数组去重并倒序排序 6数组排序 7