pyspark中的未嵌套列表
Posted
技术标签:
【中文标题】pyspark中的未嵌套列表【英文标题】:unnest list in pyspark 【发布时间】:2018-06-11 01:56:35 【问题描述】:我正在尝试使用combineByKey
来查找我的作业的每个键的中值(使用combineByKey
是作业的要求),我计划使用以下函数返回(k, v)
对,其中@ 987654324@ 与同一键关联的所有值的列表。之后,我打算对值进行排序,然后找到中位数。
data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])
rdd = data.combineByKey(lambda value: value, lambda c, v: median1(c,v), lambda c1, c2: median2(c1,c2))
def median1 (c,v):
list = [c]
list.append(v)
return list
def median2 (c1,c2):
list2 = [c1]
list2.append(c2)
return list2
但是,我的代码给出如下输出:
[('A', [[2, [4, 9]], 3]), ('B', [10, 20])]
其中 value 是一个嵌套列表。无论如何我可以取消 pyspark 中的值以获得
[('A', [2, 4, 9, 3]), ('B', [10, 20])]
或者还有其他方法可以使用combineByKey
找到每个键的中位数吗?谢谢!
【问题讨论】:
【参考方案1】:在数据框列上使用collect_list
更容易。
from pyspark.sql.functions import collect_list
df = rdd.toDF(['key', 'values'])
key_lists = df.groupBy('key').agg(collect_list('values').alias('value_list'))
【讨论】:
【参考方案2】:你只是没有从价值中得到一个好的组合器。
这是你的答案:
data = sc.parallelize([('A',2), ('A',4), ('A',9), ('A',3), ('B',10), ('B',20)])
def createCombiner(value):
return [value]
def mergeValue(c, value):
return c.append(value)
def mergeCombiners(c1, c2):
return c1+c2
rdd = data.combineByKey(createCombiner, mergeValue, mergeCombiners)
[('A', [9, 4, 2, 3]), ('B', [10, 20])]
【讨论】:
谢谢!我试过了,但我得到了 [('A', [9, 4, 2, 3]), ('B', None)] ,我想这与 spark 随机分区数据的方式有关。我尝试将功能更新到以下内容,但仍然无法解决问题。你有什么解决办法吗?谢谢你。 def createCombiner(value): return [value] def mergeValue(c, value): if c == []: result = value elif value == []: result = c else: result = c.append(value) 返回结果def mergeCombiners(c1, c2): if c1 == []: result = c2 elif c2 == []: result = c1 else: result = c1+c2 返回结果 我不知道,它对我有用。关于您尝试进行的更新:每个函数都应返回一个数组,其中 c 是一个数组,v 是一个整数。所以至少它会是“if c==[]: return [value]”以上是关于pyspark中的未嵌套列表的主要内容,如果未能解决你的问题,请参考以下文章