Collect failed in ... s due to Stage cancelled because SparkContext was shut down

Posted: 2019-02-19 05:42:55

Question:

I want to display the number of elements in each partition, so I wrote the following:

def count_in_a_partition(iterator):
    yield sum(1 for _ in iterator)

If I use it like this:

print("number of elements in each partition: {}".format(
  my_rdd.mapPartitions(count_in_a_partition).collect()
))
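As an aside, the counting logic itself can be checked without a cluster. The sketch below simulates partitions as plain Python lists (a hypothetical stand-in for RDD partitions, not Spark itself) to show what `count_in_a_partition` yields per partition and how the `{}` placeholder in `str.format` is needed for the counts to actually appear in the printed string:

```python
def count_in_a_partition(iterator):
    # Yield a single value: the number of elements consumed from the iterator
    yield sum(1 for _ in iterator)

# Simulated partitions (in Spark these would be the RDD's partitions)
partitions = [[1, 2, 3], [4, 5], [6]]
counts = [next(count_in_a_partition(iter(p))) for p in partitions]

# Without the {} placeholder, .format() would print nothing after the colon
print("number of elements in each partition: {}".format(counts))
# → number of elements in each partition: [3, 2, 1]
```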

I get the following output:

19/02/18 21:41:15 INFO DAGScheduler: Job 3 failed: collect at /project/6008168/tamouze/testSparkCedar.py:435, took 30.859710 s
19/02/18 21:41:15 INFO DAGScheduler: ResultStage 3 (collect at /project/6008168/tamouze/testSparkCedar.py:435) failed in 30.848 s due to Stage cancelled because SparkContext was shut down
19/02/18 21:41:15 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/02/18 21:41:16 INFO MemoryStore: MemoryStore cleared
19/02/18 21:41:16 INFO BlockManager: BlockManager stopped
19/02/18 21:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/02/18 21:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_14 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_14 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 WARN BlockManager: Putting block rdd_3_3 failed due to exception java.net.SocketException: Connection reset.
19/02/18 21:41:16 WARN BlockManager: Block rdd_3_3 could not be removed as it was not found on disk or in memory
19/02/18 21:41:16 INFO SparkContext: Successfully stopped SparkContext
....

Note that my_rdd.take(1) returns:

[(u'id', u'text', array([-0.31921682, ...,0.890875]))]

How can I solve this problem?

Comments:

This is not salvageable in its current state. Can you add an MVCE? This will help you write one: ***.com/questions/48427185/…

Answer 1:

You have to use the glom() function for this. Let me give an example.

Let's first create a DataFrame.

rdd = sc.parallelize([('a',22),('b',1),('c',4),('b',1),('d',2),('e',0),('d',3),('a',1),('c',4),('b',7),('a',2),('f',1)])
df = rdd.toDF(['key','value'])
df = df.repartition(5, "key")  # make 5 partitions

The number of partitions –

print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
    Number of partitions: 5

The number of rows/elements on each partition. This gives you an idea of the skew –

print('Partitioning distribution: '+ str(df.rdd.glom().map(len).collect()))
    Partitioning distribution: [3, 3, 2, 2, 2]

See how the rows are actually distributed across the partitions. Note that if the dataset is large, your system could crash because of an out-of-memory (OOM) problem.

print("Partitions structure: {}".format(df.rdd.glom().collect()))
    Partitions structure: [
       #Partition 1        [Row(key='a', value=22), Row(key='a', value=1), Row(key='a', value=2)], 
       #Partition 2        [Row(key='b', value=1), Row(key='b', value=1), Row(key='b', value=7)], 
       #Partition 3        [Row(key='c', value=4), Row(key='c', value=4)], 
       #Partition 4        [Row(key='e', value=0), Row(key='f', value=1)], 
       #Partition 5        [Row(key='d', value=2), Row(key='d', value=3)]
                          ]

Comments:

The solution the OP is using is correct and optimal in terms of performance, so "you have to use glom" is simply incorrect. The suggested solution is worse than the original one (since, as you've noted yourself, it has to load the whole partition), and it doesn't address the actual source of the failure (which, to be fair, cannot be determined from the information provided in the post).
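The memory difference the comment points out can be sketched in plain Python (simulating a single partition as an iterator, outside of Spark): a glom-style count materializes every element of the partition in a list before counting, while the mapPartitions-style generator only keeps a running counter.

```python
def glom_count(iterator):
    # glom-style: build a list of the WHOLE partition in memory, then count it
    yield len(list(iterator))

def stream_count(iterator):
    # mapPartitions-style: consume one element at a time, keep only a counter
    yield sum(1 for _ in iterator)

# A large partition simulated as a lazy generator
partition = (i for i in range(1_000_000))

# stream_count never holds more than one element at a time
print(next(stream_count(partition)))
# → 1000000
```

Both produce the same count; the difference is that `glom_count` needs memory proportional to the partition size, which is why `glom().collect()` can trigger OOM on large partitions.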
