SparkShuffer机制(三)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkShuffer机制(三)相关的知识,希望对你有一定的参考价值。

参考技术A Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,这期间涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等,所以说Shuffle是整个应用程序运行过程中非常昂贵的一个阶段,理解Spark Shuffle原理有助于优化Spark应用程序。

-上面是使用哪种 writer 的判断依据, 是否开启 mapSideCombine 这个判断,是因为有些算子会在 map 端先进行一次 combine, 减少传输数据。

-因为 BypassMergeSortShuffleWriter 会临时输出Reducer个(分区数目)小文件,所以分区数必须要小于一个阀值,默认是小于200。

-UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。

我们可以先考虑一个问题,假如我有 100亿条数据,但是我们的内存只有1M,但是我们磁盘很大, 我们现在要对这100亿条数据进行排序,是没法把所有的数据一次性的load进行内存进行排序的,这就涉及到一个外部排序的问题,我们的1M内存只能装进1亿条数据,每次都只能对这 1亿条数据进行排序,排好序后输出到磁盘,总共输出100个文件,最后怎么把这100个文件进行merge成一个全局有序的大文件。我们可以每个文件(有序的)都取一部分头部数据最为一个 buffer, 并且把这 100个 buffer放在一个堆里面,进行堆排序,比较方式就是对所有堆元素(buffer)的head元素进行比较大小, 然后不断的把每个堆顶的 buffer 的head 元素 pop 出来输出到最终文件中, 然后继续堆排序,继续输出。如果哪个buffer 空了,就去对应的文件中继续补充一部分数据。最终就得到一个全局有序的大文件。

如果你能想通我上面举的例子,就差不多搞清楚sortshufflewirter的实现原理了,因为解决的是同一个问题。

SortShuffleWriter 中的处理步骤就是

使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在内存中进行排序, 排序的 K 是(partitionId, hash(key)) 这样一个元组。

如果超过内存 limit, 我 spill 到一个文件中,这个文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根据 hash(key)进行比较排序

如果需要输出全局有序的文件的时候,就需要对之前所有的输出文件 和 当前内存中的数据结构中的数据进行 merge sort, 进行全局排序

和我们开始提的那个问题基本类似,不同的地方在于,需要对 Key 相同的元素进行 aggregation, 就是使用定义的 func 进行聚合, 比如你的算子是 reduceByKey(+), 这个func 就是加法运算, 如果两个key 相同, 就会先找到所有相同的key 进行 reduce(+) 操作,算出一个总结果 Result,然后输出数据(K,Result)元素。

SortShuffleWriter 中使用 ExternalSorter 来对内存中的数据进行排序,ExternalSorter内部维护了两个集合PartitionedAppendOnlyMap、PartitionedPairBuffer, 两者都是使用了 hash table 数据结构, 如果需要进行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某个Key,如果之前存储过相同key的K-V 元素,就需要进行 aggregation,然后再存入aggregation后的 K-V), 否则使用 PartitionedPairBuffer(只进行添K-V 元素),

触发条件:

android binder 机制三(匿名Service)

什么是匿名Service?凡是没有到ServiceManager上注冊的Service,都是匿名Service。

还是拿上一篇的样例来举例,看代码:

status_t MediaPlayer::setDataSource(int fd, int64_t offset, int64_t length)
{
    status_t err = UNKNOWN_ERROR;
    const sp<IMediaPlayerService>& service(getMediaPlayerService());
    if (service != 0) {
        sp<IMediaPlayer> player(service->create(this, mAudioSessionId));
        if ((NO_ERROR != doSetRetransmitEndpoint(player)) ||
            (NO_ERROR != player->setDataSource(fd, offset, length))) {
            player.clear();
        }
        err = attachNewPlayer(player);
    }
    return err;
}

在BpMediaPlayerService中。create的实现例如以下:

virtual sp<IMediaPlayer> create(
        const sp<IMediaPlayerClient>& client, int audioSessionId) {
    Parcel data, reply;
    data.writeInterfaceToken(IMediaPlayerService::getInterfaceDescriptor());
    data.writeStrongBinder(client->asBinder());
    data.writeInt32(audioSessionId);

    remote()->transact(CREATE, data, &reply);
    return interface_cast<IMediaPlayer>(reply.readStrongBinder());
}
直接跳到服务端MediaPlayerService,看create的真正实现:

sp<IMediaPlayer> MediaPlayerService::create(const sp<IMediaPlayerClient>& client,
        int audioSessionId)
{
    pid_t pid = IPCThreadState::self()->getCallingPid();
    int32_t connId = android_atomic_inc(&mNextConnId);

    sp<Client> c = new Client(
            this, pid, connId, client, audioSessionId,
            IPCThreadState::self()->getCallingUid());

    ALOGV("Create new client(%d) from pid %d, uid %d, ", connId, pid,
         IPCThreadState::self()->getCallingUid());
    /* add by Gary. start {{----------------------------------- */
    c->setScreen(mScreen);
    /* add by Gary. end   -----------------------------------}} */
    c->setSubGate(mGlobalSubGate);  // 2012-03-12, add the global interfaces to control the subtitle gate

    wp<Client> w = c;
    {
        Mutex::Autolock lock(mLock);
        mClients.add(w);
    }
    return c;
}

从代码中,我们能够看出,service->create(this, mAudioSessionId)是返回了一个參数为Client类型的BpMediaPlayer对象,当中Client为MediaPlayerService的私有内部类,其声明为:class Client : publicBnMediaPlayer。

这样,Binder通信的服务端和client就建立起来了。Client端BpMediaPlayer由MediaPlayer使用,Server端BnMediaPlayer由MediaPlayerService使用。

BpMediaPlayer是怎样获得BnMediaPlayer的handle值的呢?答案在MediaPlayerService返回这个binder的引用的时候,binder驱动保存了这个binder实体的各种数据。创建了节点,看以下的代码:

status_t BnMediaPlayerService::onTransact(
    uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
    switch (code) {
        case CREATE: {
            CHECK_INTERFACE(IMediaPlayerService, data, reply);
            sp<IMediaPlayerClient> client =
                interface_cast<IMediaPlayerClient>(data.readStrongBinder());
            int audioSessionId = data.readInt32();
            sp<IMediaPlayer> player = create(client, audioSessionId);
            reply->writeStrongBinder(player->asBinder());
            return NO_ERROR;
        } break;
	……
	}
}

答案就在这句话中:reply->writeStrongBinder(player->asBinder());

当这个reply写到Binder驱动时,驱动会特殊处理这样的IBinder类型的数据,为Bbinder建立一个handle。

通信的通路建立后,就能够进行通信了:player->setDataSource(fd, offset, length)

之后的实现,和上一篇讲的普通Service就一样了。



以上是关于SparkShuffer机制(三)的主要内容,如果未能解决你的问题,请参考以下文章

线程系列三线程的等待与唤醒机制

Java Web学习认证机制三巨头--sessioncookietoken

三:GC回收机制

解决! 华为鸿蒙安装Google Play,GMS三件套,设备未获得Play保护机制认证。(越过谷歌play保护机制认证,解除Google play保护机制弹窗)

Java之反射机制三:获取类的方法

JVM理论:(三/5)类加载机制