Spark 2.2 排序失败,数据集庞大

Posted

技术标签:

【中文标题】Spark 2.2 排序失败,数据集庞大【英文标题】:Spark 2.2 Sort fails with huge dataset 【发布时间】:2019-05-07 11:34:10 【问题描述】:

当基于 4 列排序一个巨大的数据集 (1.2 T) 时,我遇到了一个问题。我还需要在排序后立即根据排序函数中使用的列之一在 HDFS 中写入最终数据集时对该数据集进行分区。

这是我几天前发布的一篇 *** 帖子,描述了我在使用相同代码时遇到的另一个问题,但与加入两个数据集有关:

previous issue

我使用这篇文章的答案来改进我的代码。现在连接工作正常。

我在没有排序的情况下测试了代码,它工作正常。为了执行排序,我考虑过根据四列对数据进行分区。

一个分区的大小为500MB。然后我有2600=1.2T/500MB 分区。

在执行 spark 作业时,我收到 shuffle.RetryingBlockFetcher 错误(请参阅下面的错误日志)。

我的问题是:

在 spark 中对数据进行排序以避免洗牌的最佳方法是什么?还是减少? 我能否更正/添加改进代码以执行排序? 我真的必须这样排序吗?我不能使用 Group By 等其他技术吗?

我的代码片段:

已编辑

    val uh = uh_months
      .withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
        to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
      //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
      .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
      .drop("UHDIN_YYYYMMDD")
      .drop("january")
      .drop("DVA").repartition(1300,col("MMED"),col("DEBCRED"),col("NMTGP"))//.repartition(1300,col("NO_NUM"))

    val uh_flag_comment = new TransactionType().transform(uh)
    val uh_repartitioned = uh_flag_comment.repartition(1300,col("NO_NUM"))

    val uh_joined = uh_repartitioned.join(broadcast(smallDF), "NO_NUM")
      .select(
        uh.col("*"),
        smallDF.col("PSP"),
        smallDF.col("minrel"),
        smallDF.col("Label"),
        smallDF.col("StartDate"))
      .withColumnRenamed("DVA_1", "DVA")

    val uh_final = uh_joined.repartition(1300, col("PSP")).sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    return uh_final

TransactionType 是一个类,我使用正则表达式根据 3 列(MMEDDEBCREDNMTGP)的值向我的uh 数据框添加一个新列。

在没有排序的情况下,使用集群的全部容量,代码大约在 1 小时内运行。

执行计划

== Physical Plan ==
Exchange hashpartitioning(PSP#82, 2600)
+- *Sort [PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST, UHDIN#547 ASC NULLS FIRST, HOURMV#175 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(PSP#82 ASC NULLS FIRST, NO_NUM#252 ASC NULLS FIRST, UHDIN#547 ASC NULLS FIRST, HOURMV#175 ASC NULLS FIRST, 200)
      +- Exchange hashpartitioning(PSP#82, NO_NUM#252, UHDIN#547, HOURMV#175, 2600)
         +- *Project [NO_NUM#252, DEV#153, DEBCRED#154, BDGRORI#155, BDGREUR#156, BEWC#157, MSG30_NL#158, SCAPMV#159, USERID#160, MMED#161, TNUM#162, NMTGP#163, BKA#164, CATEXT#165, SEQETAT#166, ACCTYPE#167, BRAND#168, FAMILY#169, SUBFAMILY#170, FORCED_DVA#172, BYBANK#173, CPTE_PROTEGE#174, HOURMV#175, RDFB#176, ... 30 more fields]
            +- *BroadcastHashJoin [NO_NUM#252], [NO_NUM#13], Inner, BuildRight
               :- Exchange hashpartitioning(NO_NUM#252, 1300)
               :  +- *Project [NUM#152 AS NO_NUM#252, DEV#153, DEBCRED#154, BDGRORI#155, BDGREUR#156, BEWC#157, MSG30_NL#158, SCAPMV#159, USERID#160, MMED#161, TNUM#162, NMTGP#163, BKA#164, CATEXT#165, SEQETAT#166, ACCTYPE#167, BRAND#168, FAMILY#169, SUBFAMILY#170, FORCED_DVA#172, BYBANK#173, CPTE_PROTEGE#174, HOURMV#175, RDFB#176, ... 26 more fields]
               :     +- *Filter (BEWC#157 INSET (25003,25302,25114,20113,12017,20108,25046,12018,15379,15358,11011,20114,10118,12003,25097,20106,20133,10133,10142,15402,25026,25345,28023,15376,25019,28004,21701,25001,11008,15310,15003,2SOMEPORT,22048,15470,25300,25514,25381,25339,15099,25301,28005,28026,25098,25018,15323,25376,15804,15414,25344,25102,15458,15313,28002,25385,22051,25214,15031,12005,15425,20145,22011,15304,25027,14020,11007,25901,15343,22049,20112,12031,20127,15339,25421,15432,28025,25340,25325,20150,28011,25368,25304,22501,25369,28022,15098,12032,15375,25002,25008,10116,10101,22502,25090,15004,20105,12030,22503,15095,22007,15809,15342,15311,25216,10103,20122,11019,20142,15097,20147,20149,25005,25205,25380,15380,10120,25015,15384,11003,10110,25016,15090,25307,15001,25390,15312,10115,25219,15806,15459,12016,15359,15395,15302,12021,11701,10111,10148,25379,15807,10102,25352,25355,12010,25095,25394,20101,25413,15385,25322,28027,11026,15533,25201,25371,10128,11028,12020,15819,10143,28028,10123,10125,11020,25029,10122,25343,15015,12033,25014,12012,25024,25375,11023,25501,25402,22001,15317,12014,16114,20501,15046,12001,12022,10104,10117,12002,25499,10145,10153,12011,15350,15300,10119,25305,15345,25374,11027,25430,28021,25202,10121,28024,25101,28001,15321,11025,25358,15333,15501,25533,15372,12008,11015,10114,10113,10112,15303,15320,28006,22002,25359,10132,15497,25353,11029,25425,15374,12019,25437,11022,15357,20148,20111,26114,25099,25354,10124,25303,11010,20120,20135,15820,15331,28029) && isnotnull(NUM#152))
               :        +- *FileScan csv [UHDIN_YYYYMMDD#151,NUM#152,DEV#153,DEBCRED#154,BDGRORI#155,BDGREUR#156,BEWC#157,MSG30_NL#158,SCAPMV#159,USERID#160,MMED#161,TNUM#162,NMTGP#163,BKA#164,CATEXT#165,SEQETAT#166,ACCTYPE#167,BRAND#168,FAMILY#169,SUBFAMILY#170,DVA#171,FORCED_DVA#172,BYBANK#173,CPTE_PROTEGE#174,... 26 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [In(BEWC, [25003,25302,25114,20113,12017,20108,25046,12018,15379,15358,11011,20114,10118,12003,25..., ReadSchema: struct<UHDIN_YYYYMMDD:string,NUM:string,DEV:string,DEBCRED:string,BDGRORI:string,BDGREUR:string,B...
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
                  +- *Project [NO_NUM#13, PSP#82, minrel#370, Label#105, StartDate#106]
                     +- *SortMergeJoin [PSP#381], [PSP#82], Inner
                        :- *Sort [PSP#381 ASC NULLS FIRST], false, 0
                        :  +- Exchange hashpartitioning(PSP#381, 200)
                        :     +- *Project [PSP#381, NO_NUM#13, minrel#370]
                        :        +- SortMergeJoin [PSP#381, C_SNUM#14, minrel#370, NO_NUM#13], [NO_PSP#47, C_SNUM_1#387, C_NRELPR#50, NO_NUM_1#400], LeftOuter
                        :           :- *Sort [PSP#381 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, minrel#370 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
                        :           :  +- Exchange hashpartitioning(PSP#381, C_SNUM#14, minrel#370, NO_NUM#13, 200)
                        :           :     +- SortAggregate(key=[NO_PSP#12, C_SNUM#14, NO_NUM#13], functions=[min(C_NRELPR#15)])
                        :           :        +- *Sort [NO_PSP#12 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
                        :           :           +- Exchange hashpartitioning(NO_PSP#12, C_SNUM#14, NO_NUM#13, 200)
                        :           :              +- SortAggregate(key=[NO_PSP#12, C_SNUM#14, NO_NUM#13], functions=[partial_min(C_NRELPR#15)])
                        :           :                 +- *Sort [NO_PSP#12 ASC NULLS FIRST, C_SNUM#14 ASC NULLS FIRST, NO_NUM#13 ASC NULLS FIRST], false, 0
                        :           :                    +- *Project [NO_PSP#12, C_SNUM#14, NO_NUM#13, C_NRELPR#15]
                        :           :                       +- *Filter (((C_NRELPR#15 IN (001,006) && C_SNUM#14 IN (030,033)) && isnotnull(NO_NUM#13)) && isnotnull(NO_PSP#12))
                        :           :                          +- *FileScan csv [NO_PSP#12,NO_NUM#13,C_SNUM#14,c_nrelpr#15] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [In(c_nrelpr, [001,006]), In(C_SNUM, [030,033]), IsNotNull(NO_NUM), IsNotNull(NO_PSP)], ReadSchema: struct<NO_PSP:string,NO_NUM:string,C_SNUM:string,c_nrelpr:string>
                        :           +- *Sort [NO_PSP#47 ASC NULLS FIRST, C_SNUM_1#387 ASC NULLS FIRST, C_NRELPR#50 ASC NULLS FIRST, NO_NUM_1#400 ASC NULLS FIRST], false, 0
                        :              +- Exchange hashpartitioning(NO_PSP#47, C_SNUM_1#387, C_NRELPR#50, NO_NUM_1#400, 200)
                        :                 +- *Project [NO_PSP#47, NO_NUM#48 AS NO_NUM_1#400, C_SNUM#49 AS C_SNUM_1#387, c_nrelpr#50]
                        :                    +- *FileScan csv [NO_PSP#47,NO_NUM#48,C_SNUM#49,c_nrelpr#50] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<NO_PSP:string,NO_NUM:string,C_SNUM:string,c_nrelpr:string>
                        +- *Sort [PSP#82 ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(PSP#82, 200)
                              +- *Project [PSP#82, Label#105, StartDate#106]
                                 +- *Filter isnotnull(PSP#82)
                                    +- *FileScan csv [PSP#82,Label#105,StartDate#106] Batched: false, Format: CSV, Location: InMemoryFileIndex[SOMEHOST:SOMEPORT/SOMEPATH, PartitionFilters: [], PushedFilters: [IsNotNull(PSP)], ReadSchema: struct<PSP:string,Label:string,StartDate:string>

这是我在使用 sort 启动作业时遇到的主要错误:

19/05/06 18:02:25 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 214 outstanding blocks 
java.io.IOException: Failed to connect to SOMEHOST/SOMEADDRESS:SOMEPORT
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:98)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:108)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:228)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:435)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:140)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: SOMEHOST/SOMEADDRESS:SOMEPORT
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NiosocketChannel.doFinishConnect(NioSocketChannel.java:257)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more
19/05/06 18:02:25 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 214 outstanding blocks after 5000 ms
19/05/06 18:02:25 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 13 ms
19/05/06 18:02:28 INFO executor.Executor: Finished task 408.0 in stage 14.0 (TID 6696). 1733 bytes result sent to driver
19/05/06 18:02:28 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 6816
19/05/06 18:02:28 INFO executor.Executor: Running task 466.1 in stage 14.0 (TID 6816)
19/05/06 18:02:28 INFO storage.ShuffleBlockFetcherIterator: Getting 5073 non-empty blocks out of 5089 blocks
19/05/06 18:02:28 INFO client.TransportClientFactory: Found inactive connection to SOMEHOST/SOMEADDRESS:SOMEPORT, creating a new one.
19/05/06 18:02:28 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 82 outstanding blocks 
java.io.IOException: Failed to connect to SOMEHOST/SOMEADDRESS:SOMEPORT
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:98)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
    at org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:108)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:228)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:435)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:140)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:165)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: SOMEHOST/SOMEADDRESS:SOMEPORT
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:291)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:631)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
    ... 1 more

已编辑

另一种类型的错误:

19/05/06 18:06:16 ERROR executor.Executor: Exception in task 309.1 in stage 13.1 (TID 7592)
java.io.FileNotFoundException: /applis/hadoop/yarn/local/usercache/MYUSER/appcache/application_1555263602441_0123/blockmgr-aa586b76-ff58-4f88-b168-288c3e1b9f61/3c/temp_shuffle_ea967624-f633-4481-9a05-249b561e3c38 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.spark_project.guava.io.Files$FileByteSource.openStream(Files.java:124)
    at org.spark_project.guava.io.Files$FileByteSource.openStream(Files.java:114)
    at org.spark_project.guava.io.ByteSource.copyTo(ByteSource.java:202)
    at org.spark_project.guava.io.Files.copy(Files.java:436)
    at org.spark_project.guava.io.Files.move(Files.java:651)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:277)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:216)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/05/06 18:06:16 ERROR executor.Executor: Exception in task 502.1 in stage 13.1 (TID 7599)
java.io.FileNotFoundException: /applis/hadoop/yarn/local/usercache/MYUSER/appcache/application_1555263602441_0123/blockmgr-aa586b76-ff58-4f88-b168-288c3e1b9f61/34/temp_shuffle_dd202cd1-ad8f-41c4-b4d1-d79621cd169e (No such file or directory)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:102)
    at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:115)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:247)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.writeSortedFile(ShuffleExternalSorter.java:201)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.closeAndGetSpills(ShuffleExternalSorter.java:405)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:209)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:169)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/05/06 18:06:16 INFO executor.Executor: Finished task 200.2 in stage 13.1 (TID 7568). 2826 bytes result sent to driver
19/05/06 18:06:16 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
19/05/06 18:06:16 INFO util.ShutdownHookManager: Shutdown hook called

一些信息/背景:

我在生产环境中工作(请参阅下面的集群配置)。我无法升级我的 spark 版本。我没有 spark UI 或 yarn UI 来监控我的工作。我只能检索纱线日志。

Spark 版本:2.2

集群配置:

21 个计算节点(工作者) 每个 8 个内核 每个节点 64 GB RAM

当前 Spark 配置:

-master: 纱线

-执行器-内存:42G

-executor-cores: 5

-驱动内存:42G

-num-executors: 32

-spark.sql.broadcastTimeout=3600

-spark.kryoserializer.buffer.max=512

-spark.yarn.executor.memoryOverhead=2400

-spark.driver.maxResultSize=500m

-spark.memory.storageFraction=0.3

-spark.memory.fraction=0.9

-spark.hadoop.fs.permissions.umask-mode=007

作业是如何执行的:

我们使用 IntelliJ 构建一个工件(jar),然后将其发送到服务器。然后执行一个 bash 脚本。这个脚本:

导出一些环境变量(SPARK_HOME、HADOOP_CONF_DIR、PATH 和 SPARK_LOCAL_DIRS)

使用上面 spark 配置中定义的所有参数启动 spark-submit 命令

检索应用程序的纱线日志

【问题讨论】:

SOMEHOST/SOMEADDRESS:SOMEPORT - 看起来不对,可能是 spark 集群配置不正确! 您好@avloss 感谢您的快速回复。我自己修改了这些字段。这些不是正确的值:) 您检查了为什么SOMEHOST/SOMEADDRESS:SOMEPORT 没有响应吗?那是执行器机器吗?外部 shuffle 节点(您是否使用 shuffle 服务)? 嗨@Ali,快速观察/提问。真的有必要重新分区这么多次吗? uh repartition(1300,col("MMED"),col("DEBCRED"),col("NMTGP")) 上的第一个重新分区由 transform(uh) 使用,然后加入的第二个重新分区是正确的吗?另外,您为什么要对所有这些列进行排序col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV") 这真的是一个要求吗?最后一个问题,你为什么要uh_sorted.repartition(2600,col("PSP")) 这会再次打乱数据,从而取消之前的排序? 你好@_shay 由于机器上的纱线用户缓存正在使用这台机器的全部磁盘空间,因此节点已关闭。因此节点宕机了。我不知道我是否在使用外部随机播放服务(我没有为此设置任何内容) 【参考方案1】:

以下是针对您的案例的一些建议:

change 1:基于更大的生成数据集 1.2TB 重新分区。另外我此时删除了repartition(col("NO_NUM"), col("UHDIN"), col("HOURMV")),因为它将被下一个重新分区(“NO_NUM”)覆盖,因此它是多余的。

change 2:使用persist来保存我们刚刚分区的数据,以避免为同一个数据帧一遍又一遍地重新分区(请查看上一篇文章中的链接,了解如何这行得通)

change 3:删除uh_flag_comment.repartition(1300,col("NO_NUM")),因为它对我来说似乎是多余的。虽然 只有在 TransactionType().transform(uh) 导致重新洗牌时才有用,例如在内部进行 join 或 groupBy!这样的操作会修改我们在上一步设置的分区键repartition(2600, col("NO_NUM")

更改 4:使用 col("NO_NUM"), col("UHDIN"), col("HOURMV") 重新分区,因为这将是 orderBy 将使用的分区键,因此这两个应该相同

更改 5:orderBy 与 col("NO_NUM"), col("UHDIN"), col("HOURMV")

更改 6:将执行者数量增加到 40 个

val uh = uh_months
      .withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
        to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
      //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
      .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
      .drop("UHDIN_YYYYMMDD")
      .drop("january")
      .drop("DVA")
      .repartition(2600, col("NO_NUM"))//change 1: repartition based on the larger generated dataset also removed repartition(col("NO_NUM"), col("UHDIN"), col("HOURMV")) since it will be overriten from the next repartition()
      .persist() //change 2: save your keys (please check the links from the previous post on how this works)

    val uh_flag_comment = new TransactionType().transform(uh)

    //change 3: the previous repartition was redudant 
    val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
      .select(
        uh.col("*"),
        smallDF.col("PSP"),
        smallDF.col("minrel"),
        smallDF.col("Label"),
        smallDF.col("StartDate"))
      .withColumnRenamed("DVA_1", "DVA")
      .repartition(2600, col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV"))//change 4: this is the partition key that will be used by the orderBy therefore these two should be identical
      .persist()//optional, try to remove it as well

    // change 5: removed redudant repartition and addded the same partition information as above   
    val uh_final = uh_joined.orderBy(col("PSP), col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    return uh_final

祝你好运,如果您有任何问题,请告诉我

【讨论】:

你好@Alexandros 谢谢你的回答。但是,我需要编写由PSP 重新分区的uh_final 数据(我所做的是基于col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV") 重新分区并按相同的列排序)。这就是为什么我考虑通过PSP 重新分区,然后使用sortWithinPartitions 函数。修改后的结果还是一样的:所有节点开始一个接一个的宕机,在作业执行30分钟后。我认为节点上的纱线用户缓存正在被打乱的数据填充。 您好@Ali,那么您必须将 PSP 密钥与其他三个密钥一起包含,因此 repartition(2600, col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV")) 然后 orderBy(col("PSP"), col("NO_NUM"), col("UHDIN"), col("HOURMV")) 您不应该将密钥分开并单独使用它们,因为这样您就可以更改分区密钥并覆盖上一个。我会相应地更改代码 你好@Alexandros 这正是我所做的:) 我实际上是使用您修改@Alexandros 的代码启动了这项工作,就像我说的那样,它失败了,30 分钟后节点一个接一个地关闭,因为洗牌的数据填充了每个节点上的纱线用户缓存。 ok 尝试找出在 DAG 上失败的步骤或排除步骤(您可以从最后一个开始)。这把钥匙可能是问题所在,因为它会非常昂贵!您确定需要所有这些键进行排序吗?【参考方案2】:

我使用@Alexandros 发布的大部分答案成功地对数据进行了排序(然后运行整个代码)。

不过,我在集群中的配置方面做了一些更改:

我将 Executor 内存增加到 45 G(原来是 42 G) 我将 spark 参数 --executor-memory 更改为 45G 而不是 42G 我将executors的数量增加到40 我还在每个节点上增加了/applis/hadoop/yarn/local/usercache/MYUSER/ 的磁盘空间,添加了20-25G(这个文件夹在每个节点上的可用空间略低于50 G)。这是yarn usercache,Spark 在其中写入中间混洗数据块。由于我有一个 1.2T 的数据集,并且有 21 个节点,因此当数据分布在各个节点上时,每个节点上大约需要 60-65G 的磁盘空间。

我还使用了sortWithinPartition 函数(该函数运行良好,但经典排序函数失败)。另外,我只需要对每个分区进行排序,因为我是根据PSP进行分区的(如果数据集不是根据PSP排序的,也可以)。

下面是代码:

val uh = uh_months
  .withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
    to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
  //      .withColumn("DVA_1", to_date((unix_timestamp(col("DVA"), "ddMMMyyyy")).cast(TimestampType)))
  .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
  .drop("UHDIN_YYYYMMDD")
  .drop("january")
  .drop("DVA")
  .repartition(3000, col("NO_NUM"))
  .persist()

val uh_flag_comment = new TransactionType().transform(uh)

val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
  .select(
    uh_flag_comment.col("*"),
    kl_holdmand_pruned.col("PSP"),
    kl_holdmand_pruned.col("minrel"),
    kl_holdmand_pruned.col("TerroLabel"),
    kl_holdmand_pruned.col("TerroStartDate"))
  .withColumnRenamed("DVA_1", "DVA")

smallDF.unpersist()
uh.unpersist()

val uh_to_be_sorted = uh_joined.repartition(3000, col("PSP"))
val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

uh_final

【讨论】:

以上是关于Spark 2.2 排序失败,数据集庞大的主要内容,如果未能解决你的问题,请参考以下文章

将数据集写入 MS SQL 服务器失败

面对 Spark 上小数据集的大数据溢出

spark 函数

Spark - 在数据集的几列上应用 UDF 并形成新列

Spark 3.0 排序并应用于组 Scala/Java

大数据Spark MLlib机器学习