分解两个 PySpark 数组并保持相同位置的元素

Posted

技术标签:

【中文标题】分解两个 PySpark 数组并保持相同位置的元素【英文标题】:Explode two PySpark arrays and keep elements from same positions 【发布时间】:2019-12-08 12:19:38 【问题描述】:

我有一个 PySpark 数据框(比如 df1),其中包含以下列

1.> category : 一些字符串 2.> array1 : 元素数组 3.> array2 : 元素数组

以下是df1的示例

+--------+--------------+--------------+
|category|        array1|        array2|
+--------+--------------+--------------+
|A       |  [x1, x2, x3]|  [y1, y2, y3]|
|B       |      [u1, u2]|      [v1, v2]|
+--------+--------------+--------------+

对于每一行,array1 的长度等于array2 的长度。在每一列中,我希望不同的行对array1(和array2)有不同大小的数组。

我想形成单独的列(比如element1element2),这样在每一行中,列element1element2 包含来自array1array2 相同位置的元素分别。

以下是我想要的输出数据帧(比如df2)的示例:

+--------+--------------+--------------+----------+----------+
|category|        array1|        array2|  element1|  element2|
+--------+--------------+--------------+----------+----------+
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x1|        y1|
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x2|        y2|
|A       |  [x1, x2, x3]|  [y1, y2, y3]|        x3|        y3|
|B       |      [u1, u2]|      [v1, v2]|        u1|        v1|
|B       |      [u1, u2]|      [v1, v2]|        u2|        v2|
+--------+--------------+--------------+----------+----------+

以下是我到目前为止所尝试的(但除了我想要的之外,它还为我提供了来自不同位置的 element1element2 的值。)

df2 = df1.select( "*", F.explode("array1").alias("element1") ).select( "*", F.explode("array2").alias("element2") ) 

【问题讨论】:

【参考方案1】:

初始化

import pyspark.sql.functions as F

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

columns = ['category','array1','array2']
vals = [
     ('A', ['x1', 'x2', 'x3'], ['y1','y2','y3']),
     ('B', ['u1', 'u2',], ['v1','v2'])
]
df = sqlContext.createDataFrame(vals, columns)

Based onarrays_zip [docs]in spark >= 2.4

df.withColumn('new', F.arrays_zip('array1','array2')).withColumn('ex',explode('new'))\
    .select('category','array1','array2',
            col("ex.array1").alias('element1'),
            col("ex.array2").alias('element2')\
           ).drop('new','ex').show()

输出

+--------+------------+------------+--------+--------+
|category|      array1|      array2|element1|element2|
+--------+------------+------------+--------+--------+
|       A|[x1, x2, x3]|[y1, y2, y3]|      x1|      y1|
|       A|[x1, x2, x3]|[y1, y2, y3]|      x2|      y2|
|       A|[x1, x2, x3]|[y1, y2, y3]|      x3|      y3|
|       B|    [u1, u2]|    [v1, v2]|      u1|      v1|
|       B|    [u1, u2]|    [v1, v2]|      u2|      v2|
+--------+------------+------------+--------+--------+

说明 看看arrays_zip 产生的东西基本上可以解释一切。我们将 cols 与它一起合并/压缩,然后 explode 它。然后在explode创建的新列中简单地引用相应的结构。

>>> df.withColumn('new', F.arrays_zip('array1','array2')).show(truncate=False)
+--------+------------+------------+------------------------------+
|category|array1      |array2      |new                           |
+--------+------------+------------+------------------------------+
|A       |[x1, x2, x3]|[y1, y2, y3]|[[x1, y1], [x2, y2], [x3, y3]]|
|B       |[u1, u2]    |[v1, v2]    |[[u1, v1], [u2, v2]]          |
+--------+------------+------------+------------------------------+

【讨论】:

【参考方案2】:

对于 Spark >=2.4,您可以使用 Higher-Order Functions:

data = [('A', ['x1', 'x2', 'x3'], ['y1', 'y2', 'y3']),
        ('B', ['u1', 'u2'], ['v1', 'v2'])
      ]
df = spark.createDataFrame(data, ["category", "array1", "array2"])

# tranform array1, array2 => [struct(element1, element2)]
transform_expr = "transform(array1, (x, i) -> struct(x as element1, array2[i] as element2))"

# explode transformed arrays and extract values of element1 and element2
df.withColumn("merged_arrays", explode(expr(transform_expr))) \
    .withColumn("element1", col("merged_arrays.element1")) \
    .withColumn("element2", col("merged_arrays.element2")) \
    .drop("merged_arrays") \
    .show(truncate=False)

输出:

+--------+------------+------------+--------+--------+
|category|array1      |array2      |element1|element2|
+--------+------------+------------+--------+--------+
|A       |[x1, x2, x3]|[y1, y2, y3]|x1      |y1      |
|A       |[x1, x2, x3]|[y1, y2, y3]|x2      |y2      |
|A       |[x1, x2, x3]|[y1, y2, y3]|x3      |y3      |
|B       |[u1, u2]    |[v1, v2]    |u1      |v1      |
|B       |[u1, u2]    |[v1, v2]    |u2      |v2      |
+--------+------------+------------+--------+--------+

transform函数说明:

该函数采用第一个数组array1 并应用一个lambda 函数(x, i) -> struct(string, string),其中x 是实际值,i 是它在数组中的索引。对于每个值,我们返回一个包含该值的结构体element1array2 中的对应值(使用索引i)作为element2

剩下的只是分解转换的结果并访问我们创建的结构元素。

【讨论】:

以上是关于分解两个 PySpark 数组并保持相同位置的元素的主要内容,如果未能解决你的问题,请参考以下文章

python使用numpy中的equal函数比较两个numpy数组中每个位置的元素是否相同并计算相同元素的比例

PySpark 使用 collect_list 收集不同长度的数组

使用 PySpark 分解数组值

用分解的方式学算法006——堆排序

1.合并两个数组,并保持仍然有序。2.删除合并后数组中的重复元素

javascript 常见数组操作( 1数组整体元素修改 2 数组筛选 3jquery 元素转数组 4获取两个数组中相同部分或者不同部分 5数组去重并倒序排序 6数组排序 7