Spark shuffle-write 和 shuffle-read 中对数据倾斜情况的处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark shuffle-write 和 shuffle-read 中对数据倾斜情况的处理相关的知识,希望对你有一定的参考价值。

参考技术A

主要想回答两个问题:

ShuffleMapTask中,指定此task运算真对上游RDD的那个partition,即map端的partition,writer.write操作的时候,根据RDD的partitioner生成新的partitionId,然后写入,完成shuffle-write,下游shuffle-read的时候,拉取相应得partition数据即可;

下面插入一段说一下Spark中netty block server的实现:

当reduce端读取数据的时候,ShuffleBlockFetcherIterator :: sendRequest 调用 NettyBlockTransferService :: fetchBlocks 调用OneForOneBlockFetcher::start 首先调用TransportClient :: sendRpcSync 发送OpenBlocks发送到上面提到的netty block server,然后发送ChunkFetchRequest,获取对应的chunk,这里面的chunk其实就是一个一个的block,一个(shuffleId, mapId, bucketId(reduceId))唯一确定一个block,也即下游RDD的一个partition;

shuffle-read其实是从上游executor以block为单位获取数据,这里就遇到了一个问题,如果数据分布不均匀,导致下游某个partition过大,即这个block过大,就会出现OOM,Netty会报错direct buffer out of memory;
上面说的OOM是Netty处理数据时堆外内存的OOM,如果限制使用堆外内存(为Executor增加配置-Dio.netty.noUnsafe=true,就可以让shuffle不使用堆外内存),会报堆内内存OOM,java.lang.OutOfMemoryError: Java heap space;

如何解决?
其实在对Block处理过程中,无论是Client端还是Server端,都是以ManagedBuffer来处理的,具体实现类有FileSegmentManagedBuffer,NettyManagedBuffer等,Server端收到请求之后,会将返回的Block封装在FileSegmentmanagedBuffer,这个类内部不cache数据,提供从文件中读取block data的方法,但是过rpc server时通过encoder会进行封装,从FIleChannel零拷贝写入SocketChannel,具体实现就是在MessageEncoder里面将FileSegmentBuffer converToNetty,其实生成时FileRegion,后面封装到MessageWithHeader也是FileRegion,写出到List<Object> out,Netty会调用FileRegion中的transferTo,将内容写到目标channel,写入是直接调用file.transfer,实现零拷贝;
所以是否可以尝试添加一个新的协议,在OneForOneBlockFetcher中,判断,如果一个block小于某值,比如100M,使用原来的方式fetch数据,否则,服务端收到请求之后返回数据流,客户端收到数据流之后,将数据写到本地文件,形成新的FileSegmentManagedBuffer,供后续处理,对比原来的实现,就是将客户端直接处理NettyManagedBuffer变成直接处理FileSegmentManagedBuffer;

差分数组

差分数组

引入:

首先,给出一个问题:
给出n个数,再给出Q个询问,每个询问给出le,ri,x,最后再询问某个数,或者某个区间和(离线询问)

类似这样的问题,我们可以利用差分数组来解决。

假如这里有一个数组shu[n+1],其中shu[i]存储的便是某个具体的数

由前缀和数组的定义:(sum[x] = sumlimits_{i=1}^{x} shu[i])

类似地,我们给出差分数组的定义:d[i] = shu[i] - shu[i-1];
容易知道:

  1. (shu[x] = sumlimits_{i=1}^{x} d[i])
  2. (shu[x] = sumlimits_{i=1}^{x} shu[i] = sumlimits_{i=1}^{x}sumlimits_{j=1}^{i}d[j] = sumlimits_{i=1}^{x}(x-i+1)*d[i])
    3.对区间l到r同时加上s,只需要令d[l]+=s并且d[r+1]-=s即可维护,因此维护代价为O(1)
    4.由1和2我们可以知道,通过差分数组能O(n)求出经过Q次维护的定点询问或区间询问(求出定点shu[n]就相当于求出所有定点,求出全区间就相当于所有区间了)

实战:

SP10500 HAYBALE - Haybale stacking
技术图片

思路:构建差分数组,经过K次差分数组维护,最后将结果sort即可

AC代码如下:

#include <stdio.h>
#include <stdlib.h>

int com(const void *a, const void *b)
{
    return *(int *)a - *(int *)b;
}
int main(void)
{
    int n, k;
    scanf("%d %d",&n,&k);
    int shu[n+2], d[n+2];
    for(int i = 1; i <= n; i++)
    {
        shu[i] = 0;
    }
    shu[0] = 0; //不要忘了边界
    for(int i = 1; i <= n; i++)
    {
        d[i] = shu[i] - shu[i-1]; //构建差分数组
    }
    int a, b;
    for(int i = 1; i <= k; i++)
    {
        scanf("%d %d",&a,&b);
        d[a]++;
        d[b+1]--;
    }
    for(int i = 1; i <= n; i++)
    {
        shu[i] = shu[i-1]+d[i];
    }
    qsort(shu+1, n, sizeof(int), com); //排序并输出中间的
    printf("%d
",shu[(n)/2+1]);
    return 0;
}

以上是关于Spark shuffle-write 和 shuffle-read 中对数据倾斜情况的处理的主要内容,如果未能解决你的问题,请参考以下文章

差分数组

test6

#define LEN sizeof(struct shu) 是啥意思

SHU 414 - 字符串进制转换

选择排序

SHU 413 - 添加好友