Dataproc 上的 PySpark 因 SocketTimeoutException 而停止

Posted

技术标签:

【中文标题】Dataproc 上的 PySpark 因 SocketTimeoutException 而停止【英文标题】:PySpark on Dataproc stops with SocketTimeoutException 【发布时间】:2018-01-01 01:46:28 【问题描述】:

我们目前正在尝试使用 PySpark 2.2.0 在 Dataproc 集群上运行 Spark 作业,但 Spark 作业会在看似随机的时间过去后停止并显示以下错误消息:

17/07/25 00:52:48 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:702)

错误有时可能只需要几分钟或 3 小时就会发生。根据个人经验,Spark 作业运行大约 30 分钟到 1 小时后才会出现错误。

一旦 Spark 作业遇到错误,它就会停止。无论我等待多长时间,它都没有输出。在 YARN ResourceManager 上,应用程序状态仍然标记为“正在运行”,我必须 Ctrl+C 才能终止程序。此时,应用程序被标记为“已完成”。

我在主节点的控制台上使用/path/to/spark/bin/spark-submit --jars /path/to/jar/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar spark_job.py 命令运行 Spark 作业。 JAR 文件是必需的,因为 Spark 作业从 Kafka 流式传输消息(与 Spark 作业在同一集群上运行)并将一些消息推送回同一个 Kafka 到不同的主题。

我已经查看了此站点上的其他一些答案(主要是 this 和 this),它们有所帮助,但我们无法追踪日志中的哪个位置可能会说明导致的原因执行人去死。到目前为止,我已经通过 YARN ResourceManager 监控了任务期间的节点,并查看了每个节点中位于 /var/logs/hadoop-yarn 目录中的日志。我可以在日志中找到的唯一“线索”是org.apache.spark.executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM,这是写入已死执行程序日志的唯一行。

作为最后的努力,我们尝试增加集群的内存大小,希望问题会消失,但事实并非如此。最初,该集群运行在 1 主 2 工作集群上,具有 4vCPU、15GB 内存。我们创建了一个新的 Dataproc 集群,这次有 1 个 master 和 3 个 worker,每个 worker 有 8vCPU 52GB 内存(master 与之前的规格相同)。

我们想知道的是: 1. 在哪里/如何查看导致执行者被终止的异常? 2. 这是 Spark 配置的问题吗? 3. Dataproc 映像版本为“预览版”。这可能是错误的原因吗? 最终, 4. 我们如何解决这个问题?我们还可以采取哪些其他步骤?

此 Spark 作业需要无限期地从 Kafka 连续流式传输,因此我们希望修复此错误,而不是延长错误发生所需的时间。

以下是来自 YARN ResourceManager 的一些屏幕截图,用于展示我们所看到的内容:

集群指标

执行者总结

屏幕截图来自 Spark 作业因错误停止之前。

这是位于/path/to/spark/conf/spark-defaults.conf 的 Spark 配置文件(未更改 Dataproc 的默认设置):

spark.master yarn
spark.submit.deployMode client
spark.yarn.jars=local:/usr/lib/spark/jars/*
spark.eventLog.enabled true
spark.eventLog.dir hdfs://highmem-m/user/spark/eventlog

# Dynamic allocation on YARN
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.executor.instances 10000
spark.dynamicAllocation.maxExecutors 10000
spark.shuffle.service.enabled true
spark.scheduler.minRegisteredResourcesRatio 0.0

spark.yarn.historyServer.address highmem-m:18080
spark.history.fs.logDirectory hdfs://highmem-m/user/spark/eventlog

spark.executor.cores 2
spark.executor.memory 4655m
spark.yarn.executor.memoryOverhead 465

# Overkill
spark.yarn.am.memory 4655m
spark.yarn.am.memoryOverhead 465

spark.driver.memory 3768m
spark.driver.maxResultSize 1884m
spark.rpc.message.maxSize 512

# Add ALPN for Bigtable
spark.driver.extraJavaOptions 
spark.executor.extraJavaOptions 

# Disable Parquet metadata caching as its URI re-encoding logic does
# not work for GCS URIs (b/28306549). The net effect of this is that
# Parquet metadata will be read both driver side and executor side.
spark.sql.parquet.cacheMetadata=false

# User-supplied properties.
#Mon Jul 24 23:12:12 UTC 2017
spark.executor.cores=4
spark.executor.memory=18619m
spark.driver.memory=3840m
spark.driver.maxResultSize=1920m
spark.yarn.am.memory=640m
spark.executorEnv.PYTHONHASHSEED=0

我不太确定 User-supplied properties 来自哪里。

编辑: 有关集群的一些附加信息: 我使用在 https://github.com/GoogleCloudPlatform/dataproc-initialization-actions 找到的 zookeeper、kafka 和 jupyter 初始化操作脚本,顺序为 zookeeper -> kafka -> jupyter(不幸的是,我目前没有足够的声誉来发布超过 2 个链接)

编辑 2: 从@Dennis 的富有洞察力的问题中,我们运行了 Spark 作业,同时特别注意使用了更高堆存储内存的执行程序。我注意到的是,与其他执行程序相比,来自 worker #0 的执行程序具有显着更高的存储内存使用率。 worker #0 的执行者的标准输出文件总是空的。这三行在 stderr 中重复了很多次:

17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:01 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:04 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:07 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:09 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:10 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:13 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:14 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:15 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to 
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Verifying properties
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property group.id is overridden to 
17/07/27 16:32:18 INFO kafka.utils.VerifiableProperties: Property zookeeper.connect is overridden to

它似乎每 1~3 秒重复一次。

至于其他worker节点的其他executor的stdout和stderr是空的。

编辑 3: 正如@Dennis 的 cmets 所述,我们保留了 Spark 作业正在使用的 Kafka 主题,复制因子为 1。我还发现我忘记在 Kafka 配置文件中将 worker #2 添加到 zookeeper.connect 并且也忘记了为 Spark 中来自 Kafka 的消费者流式消息提供组 ID。我已经修复了这些地方(复制因子为 3 的重新制作主题)并观察到现在工作负载主要集中在工人 #1 上。根据@Dennis 的建议,我在 SSH-ing 到 worker #1 后运行 sudo jps 并获得以下输出:

[Removed this section to save character space; it was only the error messages from a failed call to jmap so it didn't hold any useful information]

编辑 4: 我现在在工人 #1 执行者的标准输出文件中看到了这一点:

2017-07-27 22:16:24
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
 PSYoungGen      total 814592K, used 470009K [0x000000063c180000, 0x000000069e600000, 0x00000007c0000000)
  eden space 799744K, 56% used [0x000000063c180000,0x0000000657e53598,0x000000066ce80000)
  from space 14848K, 97% used [0x000000069d780000,0x000000069e5ab1b8,0x000000069e600000)
  to   space 51200K, 0% used [0x0000000698200000,0x0000000698200000,0x000000069b400000)
 ParOldGen       total 574464K, used 180616K [0x0000000334400000, 0x0000000357500000, 0x000000063c180000)
  object space 574464K, 31% used [0x0000000334400000,0x000000033f462240,0x0000000357500000)
 Metaspace       used 49078K, capacity 49874K, committed 50048K, reserved 1093632K
  class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

2017-07-27 22:06:44
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):
===Truncated===
Heap
 PSYoungGen      total 608768K, used 547401K [0x000000063c180000, 0x000000066a280000, 0x00000007c0000000)
  eden space 601088K, 89% used [0x000000063c180000,0x000000065d09c498,0x0000000660c80000)
  from space 7680K, 99% used [0x0000000669b00000,0x000000066a2762c8,0x000000066a280000)
  to   space 36864K, 0% used [0x0000000665a80000,0x0000000665a80000,0x0000000667e80000)
 ParOldGen       total 535552K, used 199304K [0x0000000334400000, 0x0000000354f00000, 0x000000063c180000)
  object space 535552K, 37% used [0x0000000334400000,0x00000003406a2340,0x0000000354f00000)
 Metaspace       used 48810K, capacity 49554K, committed 49792K, reserved 1093632K
  class space    used 6054K, capacity 6263K, committed 6272K, reserved 1048576K

当错误发生时,worker #2 的执行者收到SIGNAL TERM 并被标记为死亡。此时,它是唯一死去的执行者。

奇怪的是,Spark 作业在 10 分钟左右后又恢复了。查看 Spark UI 界面,只有 worker #1 的 executor 处于活动状态,其余的都已死亡。这是第一次发生。

编辑 5: 同样,按照@Dennis 的建议(谢谢@Dennis!),这次运行sudo -u yarn jmap -histo <pid>。这是来自CoarseGrainedExecutorBackend 大约 10 分钟后最占用内存的前 10 个类:

 num     #instances         #bytes  class name
----------------------------------------------
   1:        244824      358007944  [B
   2:        194242      221184584  [I
   3:       2062554      163729952  [C
   4:        746240       35435976  [Ljava.lang.Object;
   5:           738       24194592  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
   6:        975513       23412312  java.lang.String
   7:        129645       13483080  java.io.ObjectStreamClass
   8:        451343       10832232  java.lang.StringBuilder
   9:         38880       10572504  [Z
  10:        120807        8698104  java.lang.reflect.Field

另外,我遇​​到了一种导致执行程序死亡的新型错误。它产生了一些在 Spark UI 中突出显示的失败任务,并在执行程序的 stderr 中找到了这一点:

17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 6821.0 (TID 2585)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/07/28 00:44:03 ERROR org.apache.spark.executor.Executor: Exception in task 0.1 in stage 6821.0 (TID 2586)
java.lang.AssertionError: assertion failed
    at scala.Predef$.assert(Predef.scala:156)
    at org.apache.spark.storage.BlockInfo.checkInvariants(BlockInfoManager.scala:84)
    at org.apache.spark.storage.BlockInfo.readerCount_$eq(BlockInfoManager.scala:66)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:367)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2$$anonfun$apply$2.apply(BlockInfoManager.scala:366)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:366)
    at org.apache.spark.storage.BlockInfoManager$$anonfun$releaseAllLocksForTask$2.apply(BlockInfoManager.scala:361)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:361)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:736)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:342)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
17/07/28 00:44:03 ERROR org.apache.spark.util.Utils: Uncaught exception in thread stdout writer for /opt/conda/bin/python
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
17/07/28 00:44:03 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/conda/bin/python,5,main]
java.lang.AssertionError: assertion failed: Block rdd_5480_0 is not locked for reading
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
    at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:720)
    at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:516)
    at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:46)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:35)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:509)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)

编辑 6: 这一次,我跑了 40 分钟后拿了jmap

 num     #instances         #bytes  class name
----------------------------------------------
   1:         23667      391136256  [B
   2:         25937       15932728  [I
   3:        159174       12750016  [C
   4:           334       10949856  [Lorg.apache.spark.unsafe.memory.MemoryBlock;
   5:         78437        5473992  [Ljava.lang.Object;
   6:        125322        3007728  java.lang.String
   7:         40931        2947032  java.lang.reflect.Field
   8:         63431        2029792  com.esotericsoftware.kryo.Registration
   9:         20897        1337408  com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeObjectField
  10:         20323         975504  java.util.HashMap

这些是ps ux的结果:

  USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
yarn       601  0.8  0.9 3008024 528812 ?      Sl   16:12   1:17 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dproc_nodema
yarn      6086  6.3  0.0  96764 24340 ?        R    18:37   0:02 /opt/conda/bin/python -m pyspark.daemon
yarn      8036  8.2  0.0  96296 24136 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8173  9.4  0.0  97108 24444 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8240  9.0  0.0  96984 24576 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8329  7.6  0.0  96948 24720 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8420  8.5  0.0  96240 23788 ?        R    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8487  6.0  0.0  96864 24308 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8554  0.0  0.0  96292 23724 ?        S    18:37   0:00 /opt/conda/bin/python -m pyspark.daemon
yarn      8564  0.0  0.0  19100  2448 pts/0    R+   18:37   0:00 ps ux
yarn     31705  0.0  0.0  13260  2756 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn     31707  0.0  0.0  13272  2876 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
yarn     31713  0.4  0.7 2419520 399072 ?      Sl   17:56   0:11 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx6
yarn     31771  0.0  0.0  13260  2740 ?        S    17:56   0:00 bash /hadoop/yarn/nm-local-dir/usercache/<user_name>/app
yarn     31774  0.0  0.0  13284  2800 ?        Ss   17:56   0:00 /bin/bash -c /usr/lib/jvm/java-8-openjdk-amd64/bin/java 
yarn     31780 11.1  1.4 21759016 752132 ?     Sl   17:56   4:31 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -server -Xmx1
yarn     31883  0.1  0.0  96292 27308 ?        S    17:56   0:02 /opt/conda/bin/python -m pyspark.daemon

在这种情况下,CoarseGrainedExecutorBackEndpid31780

编辑 7: 在 Spark 设置中增加 heartbeatInterval 并没有改变任何东西,事后看来这是有道理的。

我创建了一个简短的 bash 脚本,它使用控制台使用者从 Kafka 读取 5 秒,并将消息写入文本文件。文本文件被上传到 Spark 流的 Hadoop。我们通过这个方法测试了Timeout是否和Kafka有关。

从 Hadoop 流式传输并从 Spark 输出到 Kafka 导致 SocketTimeout 直接从 Kafka 流式传输而不从 Spark 输出到 Kafka 导致 SocketTimeout 从 Hadoop 流式传输而不从 Spark 输出到 Kafka 导致 SocketTimeout

所以我们继续假设 Kafka 与超时无关。

我们安装了 Stackdriver Monitoring 以查看超时发生时的内存使用情况。指标没有什么真正有趣的;内存使用情况在整个过程中看起来相对稳定(最繁忙的节点最多徘徊在 10~15% 左右)。

我们猜测可能与工作节点之间的通信有关,这可能是导致问题的原因。目前,我们的数据流量非常低,因此即使是一名工作人员也可以相对轻松地处理所有工作负载。

在单个节点集群上运行 Spark 作业,同时从不同集群的 Kafka 代理流式传输似乎已经停止了 SocketTimeout... 除了上面记录的AssertionError 现在经常发生。

根据@Dennis 的建议,我这次创建了一个没有 jupyter 初始化脚本的新集群(也是单节点),这意味着 Spark 现在可以在 Python v2.7.9 上运行(没有 Anaconda)。第一次运行,Spark 只用了 15 秒就遇到了SocketTimeoutException。第二次运行了 2 个多小时,以相同的 AssertionError 失败。我开始怀疑这是否是 Spark 内部的问题。第三次跑了大约40分钟,然后跑到SocketTimeoutException

【问题讨论】:

使用更高内存的工作线程是否会增加每次发生错误之前所用的时间? @DennisHuo 据我所知,增加内存似乎并没有改变发生错误所需的时间 您查看过 Spark 提供的“stdout”、“stderr”和 Thread Dump 链接是否有任何额外的线索?尤其是对于那些具有“堆存储内存”的人来说,它们比其他所有东西都高。内存蠕变主要是线性的吗?每次遇到错误之前的实际时间是多少? @DennisHuo 感谢您的耐心等待。我在编辑 2 中添加了一些关于 stdout/stderr 的附加信息。至于内存蠕变,是的,它主要是线性的。正如我所提到的,关于错误之前的实际时间量,它非常零星,所以很难给出一个可靠的答案。最近一次运行用了 15 分 3 秒才出现错误。之前的运行耗时 1 小时 41 秒。 它是两个独立的执行程序,但都在具有高存储空间的 worker 0 上,这对我来说意味着您的 Kafka 主题可能存在一些偏差。进入 Kafka 的数据来源是什么?您正在使用的主题是否设置了 Kafka 复制因子 > 1?此外,如果你运行它,你应该能够通过 SSH 连接到内存使用率更高的工作节点,如果你输入 sudo jps,你应该会看到一个看起来像 Spark 执行器的东西(我完全忘记了,可能是 CoarseGrainedExecutorBackend)。无论哪种方式,使用内存找到java进程,然后sudo jmap -histo &lt;pid&gt; 【参考方案1】:

我的一个客户在 Google Cloud Dataproc 中看到各种生产 Pyspark 作业(Spark 版本 2.2.1)间歇性地失败,堆栈跟踪与您的非常相似:

ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
    java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:711)

我发现在 Dataproc 集群虚拟机上禁用 ipv6 似乎可以解决问题。一种方法是将这些行添加到 Dataproc 初始化脚本中,以便它们在集群创建时运行:

printf "\nnet.ipv6.conf.default.disable_ipv6 = 1\nnet.ipv6.conf.all.disable_ipv6=1\n" >> /etc/sysctl.conf
sysctl -p

【讨论】:

感谢您的回答!不幸的是,我们现在没有办法对此进行测试,但如果其他人可以验证禁用 IPv6 是否有效,我很乐意接受这个答案

以上是关于Dataproc 上的 PySpark 因 SocketTimeoutException 而停止的主要内容,如果未能解决你的问题,请参考以下文章

在 Dataproc 上的 Anaconda venv 中导入 PySpark 错误

Google Cloud Dataproc 上的 Pyspark 作业失败

在不使用 Dataproc 的情况下将 GCP 与 PySpark 连接

由于 python 版本,运行 PySpark DataProc Job 时出错

如何检查 Dataproc 上 pyspark 作业的每个执行程序/节点内存使用指标?

如何提交依赖于 google dataproc 集群的 pyspark 作业