Android基础进阶 - 消息机制 之Native层分析
Posted 音视频开发之旅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android基础进阶 - 消息机制 之Native层分析相关的知识,希望对你有一定的参考价值。
目录
- 基础知识简介
- Linux eventfd事件等待/响应机制
- Linux IO多路复用epoll
- android消息机制Native层分析
- nativeInit流程
- nativePollOnce流程
- nativeWake流程
- 资料
- 收获
上一篇中关于ThreadLocal的使用,遗漏了一个点:ThreadLocal的回收,使用不当会操作内存泄漏。通过上一篇的分析我们知道了ThreadLocalMap.Entry中的key时ThreadLocal的弱引用,而value需要在堆上分配的内存,当key的强引用断开之后,在gc的时候会被回收,就过时key为null,但是value还存在。如果处理不当,可能会引发内存泄露。
图片来自:彻底搞清楚ThreadLocal与弱引用
那么key为null了value何时被清除呐?
ThreadLocal的设计者考虑到这个问题,在每次调用ThreadLocal的set/get方法时会清除key为null的value。但是谁知道下次什么时候调用set/get,这是一种被动的清除方式。
除此之外ThreadLocal还提供了remove方法,可以有我们自动的进行清除。当ThreadLocal不再使用的时候,要记得调用remove。示例如下
public class ExampleUnitTest {
private static ThreadLocal<String> sStrThreadlocal = new ThreadLocal<>();
private static ThreadLocal<Integer> sIntegerThreadLocal = new ThreadLocal<>();
@Test
public void testThreadLocal(){
//在主线程给Threadlocal赋值,请取出输出
sStrThreadlocal.set("aaa");
String value = sStrThreadlocal.get();
sIntegerThreadLocal.set(1);
Integer intValue = sIntegerThreadLocal.get();
System.out.println("111 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//最后在输出下主线程的ThreadLocal值
value = sStrThreadlocal.get();
intValue = sIntegerThreadLocal.get();
System.out.println("444 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
//用完之后记得remove,虽然,ThreadLocalMap.Entry的key是ThreadLocal的弱引用,但是value占用的内存空间还在。
//被回收的场景有两个,再次调用set或者get方法是会坚持map中是否有key为空的Entry(由于key是弱引用,外部强引用依赖断开后,gc时就会回收该key,
// 回收后即可null),如果有则清除。
//但是不知道什么时候再次调用set或者get,这种被动的方式只能说是做了个保证,如果没有调用set/get,就可能引发内存泄露
//ThreadLocal提供了一个remove方法,由我们主动清除ThreadLocal对应的ThreadLocalMap.Entry中的堆内存
sStrThreadlocal.remove();
sIntegerThreadLocal.remove();
value = sStrThreadlocal.get();
intValue = sIntegerThreadLocal.get();
System.out.println("after remove 444 curThreadId="+Thread.currentThread()+" strthreadLocalValue="+value
+" intThreadLocalValue="+intValue);
}
}
对应的输出为
111 curThreadId=Thread[main,5,main] strthreadLocalValue=aaa intThreadLocalValue=1
444 curThreadId=Thread[main,5,main] strthreadLocalValue=aaa intThreadLocalValue=1
after remove 444 curThreadId=Thread[main,5,main] strthreadLocalValue=null intThreadLocalValue=null
下面开始本篇的分析学习
我们在前面两篇关于Android消息机制的分析学习中了解到MessageQueue中涉及到何Native的交互,比如nativeInit初始化Native层的MessageQueue;nativePollOnce阻塞;nativeWake唤醒。
问题是,为什么要这样设计?直接Java层不是一样可以管理维护消息队列以及消息的读取分发吗?nativePollOnce真正的意义是什么?这个问题一致困挠着我。通过反复查资料翻源码,终于了解了其流程。在消息机制Native层实现之前我们先了解下其中用到的一些Linux基础知识。否则的话,很容易被卡住。
一、 基础知识简介
1.1 Linux eventfd事件等待/响应机制
Linux常用的进程/线程间通信机制有管道、信号量、消息队列、信号、共享内存、socket等等,其中主要作为进程/线程间通知/等待的有管道pipe和socket。从Linux 2.6.27版本开始增加了eventfd,主要用于进程或者线程间的通信,eventfd用于进程/线程间通信,效率比pipe高.
1.2 Linux IO多路复用epoll
epoll是Linux中最高效的IO多路复用机制,可以同时监控多个描述符,当某个描述符就绪(读或写就绪),则立刻通知相应程序进行读或写操作
epoll有3个方法,epoll_create(), epoll_ctl(),epoll_wait()_
** epoll_create :创建epoll句柄**
int epoll_create(int size);
:用于创建一个epoll的句柄,size是指监听的描述符个数, 现在内核支持动态扩展,该值的意义仅仅是初次分配的fd个数,后面空间不够时会动态扩容。 当创建完epoll句柄后,占用一个fd值.
使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
** epoll_ctl:执行对需要监听的文件描述符(fd)的操作**
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
用于执行对需要监听的文件描述符(fd)的操作,比如EPOLL_CTL_ADD
fd:需要监听的文件描述符;
epoll_event:需要监听的事件
** epoll_wait:等待事件的到来**
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的到来,有三种情况会触发返回
1. 发生错误 返回值为负数
2. 等待超时(timeout)
3. 监测到某些文件句柄上有事件发生
** epoll和eventfd结合使用**
eventfd中无数据,线程触发epoll_wait()的等待,该线程处于阻塞,另外一个线程通过往eventfd中write数据,使描述符可读,epoll返回,这就达到了唤醒的目的。_
二、Android消息机制Native层简单分析
我们先来回顾下消息机制中Java层MessageQueue的往队列里加消息和从队列里面取消息的调用
//android.os.MessageQueue#enqueueMessage
boolean enqueueMessage(Message msg, long when) {
......
synchronized (this) {
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//如果消息链表为空,或者插入的Message比消息链表第一个消息要执行的更早,直接插入到头部
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
//否则在链表中找到合适位置插入
//通常情况下不需要唤醒事件队列,除非链表的头是一个同步屏障,并且该条消息是第一条异步消息
needWake = mBlocked && p.target == null && msg.isAsynchronous();
//具体实现如下,这个画张图来说明
//链表引入一个prev变量,该变量指向p也message(如果是for循环的内部第一次执行),然后把p进行向next移动,和需要插入的Message进行比较when
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}
//如果插入的是异步消息,并且消息链表第一条消息是同步屏障消息。
//或者消息链表中只有刚插入的这一个Message,并且mBlocked为true即,正在阻塞状态,收到一个消息后也进入唤醒
唤醒谁?MessageQueue.next中被阻塞的nativePollOnce
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
//android.os.MessageQueue#next
Message next() {
//native层NativeMessageQueue的指针
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
......
for (;;) {
//阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒
//nativePollOnce用于“等待”, 直到下一条消息可用为止. 如果在此调用期间花费的时间很长, 表明对应线程没有实际工作要做,或者Native层的message有耗时的操作在执行
nativePollOnce(ptr, nextPollTimeoutMillis);
}
可以看到在MessageQueue#next会调用课程阻塞的native方法nativePollOnce
,在MessageQueue#enqueueMessage 中如果需要唤醒会调用native方法nativeWake
。
问题是怎么阻塞的,怎么唤醒的,为什么要这样设计,直接在Java层完成处理不可以吗?
带着这样的困惑,开始Native层消息机制的分析学习。
2.1 MessageQueue Init流程
图片来自:Android消息机制2-Handler(Native层)
调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
//android.os.MessageQueue#MessageQueue
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
//调用Native方法初始化,返回值为native层的NativeMessageQueue指针地址
mPtr = nativeInit();
}
android_os_MessageQueue_nativeInit
//java native方法 nativeInit 的jni方法,返回类型long,即 mptr
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
//NativeMessageQueue是一个内部类
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
//返回为NativeMessageQueue指针地址
......
return reinterpret_cast<jlong>(nativeMessageQueue);
}
NativeMessageQueue构造方法
会进行Native层的Looper创建。Java层创建Looper然后再创建MessageQueue,而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper。
// core/jni/android_os_MessageQueue.cpp
//NativeMessageQueue构造方法
//Java层创建Looper然后再创建MessageQueue,
//而在Native层则刚刚相反,先创建NativeMessageQueue然后再创建Looper
//MessageQueue是在Java层与Native层有着紧密的联系,
//但是Native层的Looper与Java层的Looper没有任何关系
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
//Looper::getForThread()获取当前线程的Looper,相当于Java层的Looper.myLooper()
mLooper = Looper::getForThread();
if (mLooper == NULL) {
//如果为空, new一个native层的Looper
mLooper = new Looper(false);
//相当于java层的ThreadLocal.set() ?
Looper::setForThread(mLooper);
}
}
Natvie层的Looper的构造
MessageQueue是在Java层与Native层有着紧密的联系,但是Native层的Looper与Java层的Looper没有任何关系
// libutils/Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
//eventfd事件句柄,负责线程通信,替换了之前版本的pipe 构造唤醒事件fd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
//进行epoll句柄的创建和初始化
rebuildEpollLocked();
}
epoll句柄的创建、添加唤醒事件句柄到epoll
//libutils/Looper.cpp
void Looper::rebuildEpollLocked() {
......
//epoll_create1创建一个epoll句柄实例,并注册wake管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
......
//epoll事件结构体
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
//将唤醒事件句柄(request.fd),添加到epolled句柄(mEpollFd.get()),为epoll添加一个唤醒机制
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
......
}
}
Native层的init流程主要内容如下:
- NativeQueueMessage和Looper的初始化。
- 构建了epoll句柄,向epoll中添加epoll事件注册
2.2 消息读取流程
图片来自:Android消息机制2-Handler(Native层)
nativePollOnce
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//将Java层传过来的mPtr 转换为 nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用nativeMessageQueue的pollOnce
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
NativeMessageQueue :: pollOnce
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
......
//又调用了Natvie的Looper的pollOnce
mLooper->pollOnce(timeoutMillis);
......
}
Native层Looper::pollOnce
//libutils/Looper.cpp
/**
* timeoutMillis:超时时长
* outFd:发生事件的文件描述符
* outEvents:当前outFd上发生的事件,包含以下4类事件
EVENT_INPUT:可读
EVENT_OUTPUT:可写
EVENT_ERROR:错误
EVENT_HANGUP:中断
*outData:上下文数据
**/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
......
//关键实现在pollInner中
result = pollInner(timeoutMillis);
}
}
Looper::pollInner
先会调用epoll_wait进入阻塞专题,唤醒的场景 想epoll中添加的epollevent等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符,进行唤醒。
然后进性要处理的事件收集,然后在做处理。Natvie的消息的处理顺序如下
- 处理Native的Message,调用Native的Handler来处理该Message
- 处理Resposne数组,POLL_CALLBACK类型的事件
//libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
......
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时触发nativeWake()方法,会向eventfd写入字符
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
//已经唤醒,如果是EPOLLIN类型事件,读取并清空eventfd中的数据
if (epollEvents & EPOLLIN) {
awoken();
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
}
}
}
//上面是收集,Done中是处理的部分,很多设计都是采用这种设计,逻辑分离
Done: ;
//1. 先处理Native的Message,调用Native的Handler来处理该Message
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
......
if (messageEnvelope.uptime <= now) {
{
......
handler->handleMessage(message);
}
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
//2. 再处理Resposne数组,POLL_CALLBACK类型的事件,比如一些
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
}
}
return result;
}
消息获取流程主要如下
- 依此调用NativeMessageQueue和looper中的pollonce,最终调用到Looper的pollInner
- pollInner会调用epoll_wait进入阻塞,唤醒的条件是epoll中添加的事件响应了或者发生了超时等。_
- 先执行Native层的Message,再执行Native层的Resposne数组,POLL_CALLBACK类型的事件(比如一些按键事件等)_
- 返回到Java层后再调用Java层MessageQueue中的读取Message。
2.3 消息发送流程
图片来自:Android消息机制2-Handler(Native层)
nativeWake
//core/jni/android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用NativeMessageQueue的wake
nativeMessageQueue->wake();
}
NativeMessageQueue::wake
//core/jni/android_os_MessageQueue.cpp
void NativeMessageQueue::wake() {
//调用Native层Looper的wake
mLooper->wake();
}
Looper::wake
libutils/Looper.cpp
void Looper::wake() {
//write方法 向mWakeEventfd中写一些内容,会把epoll_wait唤醒,线程就不阻塞了继续发送
//Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
......
}
消息发送流程如下
- Java层的enqueueMessage想java层的MessageQueue添加消息
- 如果需要唤醒调用native方法nativeWake
- 依次调用NativeMessageQueue和Looper的wake方法
- 最终调用writ方法想eventfd中写入一些内容进行epoll_wait的唤醒。
Android消息机制的分析到这里就结束了,最后来张消息处理家族类的关系图
图片来自:图书:《深入理解Android 卷3》
三、资料
- Android源码
- 图书:《深入理解Android 卷3》
- Android消息机制2-Handler(Native层)
- Android Handler机制10之Native的实现
- Android 中 MessageQueue 的 nativePollOnce
- IO多路复用之epoll总结
- select/poll/epoll对比分析
- 深入分析 ThreadLocal 内存泄漏问题
- 彻底搞清楚ThreadLocal与弱引用
四、收获
- 了解了消息机制在Native层处理流程
- 通过IO多路复用的epoll和eventfd进行事件的等待和唤醒实现
- 每次获取消息时,先执行Native层的Message,再执行Natvice层addfd的response,之后执行Java层的MessageQueue中消息
感谢你的阅读
Android消息机制就到这篇了,下篇我们进入Binder的分析学习,欢迎关注公众号“音视频开发之旅”,一起学习成长。
欢迎交流
以上是关于Android基础进阶 - 消息机制 之Native层分析的主要内容,如果未能解决你的问题,请参考以下文章