pyspark中的未嵌套列表

Posted

技术标签:

【中文标题】pyspark中的未嵌套列表【英文标题】:unnest list in pyspark 【发布时间】:2018-06-11 01:56:35 【问题描述】:

我正在尝试使用combineByKey 来查找我的作业的每个键的中值(使用combineByKey 是作业的要求),我计划使用以下函数返回(k, v) 对,其中@ 987654324@ 与同一键关联的所有值的列表。之后,我打算对值进行排序,然后找到中位数。

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

rdd = data.combineByKey(lambda value: value, lambda c, v: median1(c,v), lambda c1, c2: median2(c1,c2))

def median1 (c,v):
    list = [c]
    list.append(v)
    return list

def median2 (c1,c2):
    list2 = [c1]
    list2.append(c2)
    return list2

但是,我的代码给出如下输出:

[('A', [[2, [4, 9]], 3]), ('B', [10, 20])]

其中 value 是一个嵌套列表。无论如何我可以取消 pyspark 中的值以获得

[('A', [2, 4, 9, 3]), ('B', [10, 20])]

或者还有其他方法可以使用combineByKey 找到每个键的中位数吗?谢谢!

【问题讨论】:

【参考方案1】:

在数据框列上使用collect_list 更容易。

from pyspark.sql.functions import collect_list

df = rdd.toDF(['key', 'values'])

key_lists = df.groupBy('key').agg(collect_list('values').alias('value_list'))

【讨论】:

【参考方案2】:

你只是没有从价值中得到一个好的组合器。

这是你的答案:

data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])

def createCombiner(value):
    return [value]
def mergeValue(c, value):
    return c.append(value)
def mergeCombiners(c1, c2):
    return c1+c2

rdd = data.combineByKey(createCombiner, mergeValue, mergeCombiners)

[('A', [9, 4, 2, 3]), ('B', [10, 20])]

【讨论】:

谢谢!我试过了,但我得到了 [('A', [9, 4, 2, 3]), ('B', None)] ,我想这与 spark 随机分区数据的方式有关。我尝试将功能更新到以下内容,但仍然无法解决问题。你有什么解决办法吗?谢谢你。 def createCombiner(value): return [value] def mergeValue(c, value): if c == []: result = value elif value == []: result = c else: result = c.append(value) 返回结果def mergeCombiners(c1, c2): if c1 == []: result = c2 elif c2 == []: result = c1 else: result = c1+c2 返回结果 我不知道,它对我有用。关于您尝试进行的更新:每个函数都应返回一个数组,其中 c 是一个数组,v 是一个整数。所以至少它会是“if c==[]: return [value]”

以上是关于pyspark中的未嵌套列表的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 将列表列转换为嵌套结构列

数据框列中的嵌套列表,提取数据框列中列表的值 Pyspark Spark

重命名 Pyspark Dataframe 中的未命名列

pyspark reduce键是一个元组值嵌套列表

PySpark 嵌套数据框

Pyspark DataFrames 中的嵌套 SELECT 查询