深入理解MessageQueue
Posted 栀夏暖阳
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解MessageQueue相关的知识,希望对你有一定的参考价值。
android 中有两个非常重要的知识点,分别是Binder机制和Handler机制。前者用于跨进程通讯,并且通过 ServiceManager 给上层应用提供了大量的服务,而后者用于进程内部通讯,以消息队列的形式驱动应用的运行。之前的文章已经多次分析了Binder相关的内容,复杂程度远高于Handler,之后还会继续分析Binder。说到Handler,做安卓开发的一定都不会陌生,一般用于切换线程。其涉及到的类还有Looper,MessageQueue,Message 等。其中MessageQueue是事件驱动的基础,本文会重点分析MessageQueue,其他内容会简单带过,可以参考生产者-消费者模式。
从Handler的入口开始分析:
Looper.prepare();
1.创建一个Looper,并且是线程私有的:sThreadLocal.set(new Looper(quitAllowed));
2.初始化Handler:mHandler = new Handler();
,在构造函数中会获取线程私有的Looper,如获取不到会报错。
3.开启无限循环:Looper.loop();
。
在loop方法中主要代码如下:
for (;;)
Message msg = queue.next(); // might block
if (msg == null)
// No message indicates that the message queue is quitting.
return;
msg.target.dispatchMessage(msg);
msg.recycleUnchecked();
- 从MessageQueue中获取待处理的Message(阻塞线程)
- 交给与之关联的Handler处理
- 回收Message,供Message.obtain()复用
其中msg中的target是在Handler发送消息的时候赋值的:
public boolean sendMessageAtTime(Message msg, long uptimeMillis)
MessageQueue queue = mQueue;
if (queue == null)
RuntimeException e = new RuntimeException this + " sendMessageAtTime() called with no mQueue");
return false;
return enqueueMessage(queue, msg, uptimeMillis);
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis)
msg.target = this;
if (mAsynchronous)
msg.setAsynchronous(true);
return queue.enqueueMessage(msg, uptimeMillis);
发送的消息最终入队列到了MessageQueue。
简单总结一下Handler消息机制的工作原理:
- 创建与线程绑定的Looper,同时会创建一个与之关联的MessageQueue用于存放消息
- 开启消息循环,从MessageQueue中获取待处理消息,若无消息会阻塞线程
- 通过Handler发送消息,此时会将Message入队列到MessageQueue中,并且唤醒等待的Looper
- Looper获取的消息会投递给对应的Handler处理
可以看到其中与MessageQueue相关的也就两个操作,一个是入队列(MessageQueue是链表结构),一个是出队列,这正是本文介绍的重点。
MessageQueue的创建:
MessageQueue(boolean quitAllowed)
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
nativeInit()方法实现为android_os_MessageQueue_nativeInit():
[android_os_MessageQueue.cpp]
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz)
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue)
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
nativeMessageQueue->incStrong(env);
return reinterpret_cast(nativeMessageQueue);
这里会创建一个native层的MessageQueue,并且将引用地址返回给Java层保存在mPtr变量中,通过这种方式将Java层的对象与Native层的对象关联在了一起。这种在Java层保存Native层对象引用地址来实现关联的方式,在Android源代码中会经常看到。
然后看一下Native层MessageQueue的构造方法:
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL)
mLooper = Looper::getForThread();
if (mLooper == NULL)
mLooper = new Looper(false);
Looper::setForThread(mLooper);
也创建了一个Looper,并且也是与线程绑定的,事实上这个Looper与Java层的Looper并没有多大关系,一个是处理Native层时间的,一个是处理Java层事件的。
Java层的Looper会通过调用MessageQueue的next方法获取下一个消息,先看主要部分,后面省略了一部分IdleHandler的处理逻辑,用于空闲的时候处理不紧急事件用的,有兴趣的自行分析:
Message next()
final long ptr = mPtr;
if (ptr == 0)
return null;
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;)
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this)
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null)
// Stalled by a barrier. Find the next asynchronous message in the queue.
do
prevMsg = msg;
msg = msg.next;
while (msg != null && !msg.isAsynchronous());
if (msg != null)
if (now < msg.when)
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
else
// Got a message.
mBlocked = false;
if (prevMsg != null)
prevMsg.next = msg.next;
else
mMessages = msg.next;
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
else
// No more messages.
nextPollTimeoutMillis = -1;
// Process the quit message now that all pending messages have been handled.
if (mQuitting)
dispose();
return null;
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
这里有必要提一下MessageQueue的数据结构,是一个单向链表,Message对象有个next字段保存列表中的下一个,MessageQueue中的mMessages保存链表的第一个元素。
循环体内首先调用nativePollOnce(ptr, nextPollTimeoutMillis)
,这是一个native方法,实际作用就是通过Native层的MessageQueue阻塞nextPollTimeoutMillis毫秒的时间。
1.如果nextPollTimeoutMillis=-1,一直阻塞不会超时。
2.如果nextPollTimeoutMillis=0,不会阻塞,立即返回。
3.如果nextPollTimeoutMillis>0,最长阻塞nextPollTimeoutMillis毫秒(超时),如果期间有程序唤醒会立即返回。
暂时知道这些就可以继续向下分析了,native方法后面会讲到。
如果msg.target为null,则找出第一个异步消息,什么时候msg.target是null呢?看下面代码:
private int postSyncBarrier(long when)
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this)
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0)
while (p != null && p.when <= when)
prev = p;
p = p.next;
if (prev != null) // invariant: p == prev.next
msg.next = p;
prev.next = msg;
else
msg.next = p;
mMessages = msg;
return token;
这个方法直接在MessageQueue中插入了一个Message,并且未设置target。它的作用是插入一个消息屏障,这个屏障之后的所有同步消息都不会被执行,即使时间已经到了也不会执行。
可以通过public void removeSyncBarrier(int token)
来移除这个屏障,参数是post方法的返回值。
这些方法是隐藏的或者是私有的,具体应用场景可以查看ViewRootImpl中的void scheduleTraversals()
方法,它在绘图之前会插入一个消息屏障,绘制之后移除。
回到之前的next方法,如果发现了一个消息屏障,会循环找出第一个异步消息(如果有异步消息的话),所有同步消息都将忽略(平常发送的一般都是同步消息),可以通过setAsynchronous(boolean async)设置为异步消息。
继续往下,如果有消息需要处理,先判断时间有没有到,如果没到的话设置一下阻塞时间nextPollTimeoutMillis,进入下次循环的时候会调用nativePollOnce(ptr, nextPollTimeoutMillis);
阻塞;
否则把消息返回给调用者,并且设置mBlocked = false代表目前没有阻塞。
如果阻塞了有两种方式唤醒,一种是超时了,一种是被主动唤醒了。根据生产消费模式,生产者有产品的时候一般情况下会唤醒消费者。那么MessageQueue入队列的时候应该会去唤醒,下面看一下MessageQueue入队列的方法,截取了主要逻辑:
boolean enqueueMessage(Message msg, long when)
synchronized (this)
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when)
msg.next = p;
mMessages = msg;
needWake = mBlocked;
else
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;)
prev = p;
p = p.next;
if (p == null || when < p.when)
break;
if (needWake && p.isAsynchronous())
needWake = false;
msg.next = p; // invariant: p == prev.next
prev.next = msg;
if (needWake)
nativeWake(mPtr);
return true;
上面的代码主要就是加入链表的时候按时间顺序从小到大排序,然后判断是否需要唤醒,如果需要唤醒则调用nativeWake(mPtr);
来唤醒之前等待的线程。
再总结一下MessageQueue获取消息和加入消息的逻辑:
获取消息:
1.首次进入循环nextPollTimeoutMillis=0,阻塞方法nativePollOnce(ptr, nextPollTimeoutMillis)会立即返回
2.读取列表中的消息,如果发现消息屏障,则跳过后面的同步消息,总之会通过当前时间,是否遇到屏障来返回符合条件的待处理消息
3.如果没有符合条件的消息,会处理一些不紧急的任务(IdleHandler),再次进入第一步
加入消息:
1.加入消息比较简单,按时间顺序插入到消息链表中,如果是第一个那么根据mBlocked判断是否需要唤醒线程,如果不是第一个一般情况下不需要唤醒(如果加入的消息是异步的需要另外判断)
到这里其实关于MessageQueue已经分析的差不多了,其中有两个native方法没有涉及到分别是nativePollOnce,nativeWake,其实之前结论已经给出了,两个方法都会传入mPtr,在native层对应的是NativeMessageQueue的引用地址。
感兴趣的可以继续往下看,先看一下nativePollOnce的实现:
[android_os_MessageQueue.cpp]
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis)
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
通过传进来的ptr获取NativeMessageQueue对象的指针,然后调用NativeMessageQueue对象的pollOnce方法:
[android_os_MessageQueue.cpp]
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis)
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj)
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
调用的是Looper的pollOnce方法,这个Native层的Looper是在初始化NativeMessageQueue的时候创建的。
[Looper.cpp]
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)
int result = 0;
for (;;)
while (mResponseIndex < mResponses.size()) const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0)
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
if (result != 0)
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
result = pollInner(timeoutMillis);
先是处理native层的Response,这个直接跳过,最终调用pollInner
int Looper::pollInner(int timeoutMillis)
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX)
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis))
timeoutMillis = messageTimeoutMillis;
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired)
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
// Check for poll error.
if (eventCount < 0)
if (errno == EINTR)
goto Done;
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
// Check for poll timeout.
if (eventCount == 0)
result = POLL_TIMEOUT;
goto Done;
// Handle all events.
for (int i = 0; i < eventCount; i++) int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) if (epollEvents & EPOLLIN) awoken(); else ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); else ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0)
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
else
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0)
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now)
// obtain handler
sp handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
// release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
else
mNextMessageUptime = messageEnvelope.uptime;
break;
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
for (size_t i = 0; i < mResponses.size(); i++) Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) int fd = response.request.fd; int events = response.events; void* data = response.request.data; int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0)
removeFd(fd, response.request.seq);
response.request.callback.clear();
result = POLL_CALLBACK;
return result;
这个方法有点长,首先会根据Native Message的信息计算此次需要等待的时间,再调用
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
来等待事件发生,其中是epoll是Linux下多路复用IO接口select/poll的增强版本,具体可以自行查阅相关文章,查考:Linux IO模式及 select、poll、epoll详解
如果epoll_wait返回了,那么可能是出错返回,可能是超时返回,可能是有事件返回,如果是前两种情况跳转到Done处。
如果有事件发生,会判断事件是否是mWakeEventFd(唤醒的时候写入的文件)做不同处理。在Done处会处理Native事件,还有Response。
总结一下就是,Java层的阻塞是通过native层的epoll监听文件描述符的写入事件来实现的。
最后看一下nativeWake:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr)
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
和之前一样,也是通过long类型的ptr获取NativeMessageQueue对象的指针,再调用wake方法:
void NativeMessageQueue::wake()
mLooper->wake();
调用的也是Looper的方法:
void Looper::wake()
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t))
if (errno != EAGAIN)
ALOGW("Could not write wake signal: %s", strerror(errno));
重点是write(mWakeEventFd, &inc, sizeof(uint64_t))
,写入了一个1,这个时候epoll就能监听到事件,也就被唤醒了
以上是关于深入理解MessageQueue的主要内容,如果未能解决你的问题,请参考以下文章
深入理解Android Handler机制(深入至native层)