[Android源码分析] - 异步通信Handler机制
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[Android源码分析] - 异步通信Handler机制相关的知识,希望对你有一定的参考价值。
参考技术A 一、问题:在android启动后会在新进程里创建一个主线程,也叫UI线程( 非线程安全 )这个线程主要负责监听屏幕点击事件与界面绘制。当Application需要进行耗时操作如网络请求等,如直接在主线程进行容易发生ANR错误。所以会创建子线程来执行耗时任务,当子线程执行完毕需要通知UI线程并修改界面时,不可以直接在子线程修改UI,怎么办?解决方法:Message Queue机制可以实现子线程与UI线程的通信。
该机制包括Handler、Message Queue、Looper。Handler可以把消息/ Runnable对象 发给Looper,由它把消息放入所属线程的消息队列中,然后Looper又会自动把消息队列里的消息/Runnable对象 广播 到所属线程里的Handler,由Handler处理接收到的消息或Runnable对象。
1、Handler
每次创建Handler对象时,它会自动绑定到创建它的线程上。如果是主线程则默认包含一个Message Queue,否则需要自己创建一个消息队列来存储。
Handler是多个线程通信的信使。比如在线程A中创建AHandler,给它绑定一个ALooper,同时创建属于A的消息队列AMessageQueue。然后在线程B中使用AHandler发送消息给ALooper,ALooper会把消息存入到AMessageQueue,然后再把AMessageQueue广播给A线程里的AHandler,它接收到消息会进行处理。从而实现通信。
2、Message Queue
在主线程里默认包含了一个消息队列不需要手动创建。在子线程里,使用Looper.prepare()方法后,会先检查子线程是否已有一个looper对象,如果有则无法创建,因为每个线程只能拥有一个消息队列。没有的话就为子线程创建一个消息队列。
Handler类包含Looper指针和MessageQueue指针,而Looper里包含实际MessageQueue与当前线程指针。
下面分别就UI线程和worker线程讲解handler创建过程:
首先,创建handler时,会自动检查当前线程是否包含looper对象,如果包含,则将handler内的消息队列指向looper内部的消息队列,否则,抛出异常请求执行looper.prepare()方法。
- 在 UI线程 中,系统自动创建了Looper 对象,所以,直接new一个handler即可使用该机制;
- 在 worker线程 中,如果直接创建handler会抛出运行时异常-即通过查‘线程-value’映射表发现当前线程无looper对象。所以需要先调用Looper.prepare()方法。在prepare方法里,利用ThreadLocal<Looper>对象为当前线程创建一个Looper(利用了一个Values类,即一个Map映射表,专为thread存储value,此处为当前thread存储一个looper对象)。然后继续创建handler, 让handler内部的消息队列指向该looper的消息队列(这个很重要,让handler指向looper里的消息队列,即二者共享同一个消息队列,然后handler向这个消息队列发送消息,looper从这个消息队列获取消息) 。然后looper循环消息队列即可。当获取到message消息,会找出message对象里的target,即原始发送handler,从而回调handler的handleMessage() 方法进行处理。
- handler与looper共享消息队列 ,所以handler发送消息只要入列,looper直接取消息即可。
- 线程与looper映射表 :一个线程最多可以映射一个looper对象。通过查表可知当前线程是否包含looper,如果已经包含则不再创建新looper。
5、基于这样的机制是怎样实现线程隔离的,即在线程中通信呢。
核心在于 每一个线程拥有自己的handler、message queue、looper体系 。而 每个线程的Handler是公开 的。B线程可以调用A线程的handler发送消息到A的共享消息队列去,然后A的looper会自动从共享消息队列取出消息进行处理。反之一样。
二、上面是基于子线程中利用主线程提供的Handler发送消息出去,然后主线程的Looper从消息队列中获取并处理。那么还有另外两种情况:
1、主线程发送消息到子线程中;
采用的方法和前面类似。要在子线程中实例化AHandler并设定处理消息的方法,同时由于子线程没有消息队列和Looper的轮询,所以要加上Looper.prepare(),Looper.loop()分别创建消息队列和开启轮询。然后在主线程中使用该AHandler去发送消息即可。
2、子线程A与子线程B之间的通信。
1、 Handler为什么能够实现不同线程的通信?核心点在哪?
不同线程之间,每个线程拥有自己的Handler、消息队列和Looper。Handler是公共的,线程可以通过使用目标线程的Handler对象来发送消息,这个消息会自动发送到所属线程的消息队列中去,线程自带的Looper对象会不断循环从里面取出消息并把消息发送给Handler,回调自身Handler的handlerMessage方法,从而实现了消息的线程间传递。
2、 Handler的核心是一种事件激活式(类似传递一个中断)的还是主要是用于传递大量数据的?重点在Message的内容,偏向于数据传输还是事件传输。
目前的理解,它所依赖的是消息队列,发送的自然是消息,即类似事件中断。
0、 Android消息处理机制(Handler、Looper、MessageQueue与Message)
1、 Handler、Looper源码阅读
2、 Android异步消息处理机制完全解析,带你从源码的角度彻底理解
谢谢!
wingjay
![](https://avatars0.githubusercontent.com/u/9619875?v=3&s=460)
你真的了解AsyncTask吗?AsyncTask源码分析
转载请注明出处:http://blog.csdn.net/yianemail/article/details/51611326
1,概述
Android UI是线程不安全的,如果想要在子线程很好的访问ui, 就要借助Android中的异步消息处理机制
http://blog.csdn.net/yianemail/article/details/50233373
通过Thread 执行耗时操作,通常利用Handler 发送消息给ui线程。
这种方式代码相对臃肿,并且不能对多任务执行很好的控制。
为了简化操作,Android 1.5提供了更加轻量级的AsyncTask异步工具类,使创建异步任务变得异常简单。
2,AsyncTask 的使用
看一个最典型的AsyncTask 简单使用场景
package com.listenread.luhuanju.helloword;
import android.os.AsyncTask;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//AsyncTask 开始执行异步任务
new MyAsyncTask().execute();
}
class MyAsyncTask extends AsyncTask<Void, Void, String> {
@Override
protected String doInBackground(Void... params) {
//Commen is request resurce from http and return to onPostExecute
//模拟数据返回
return "mStr";
}
@Override
protected void onPostExecute(String s) {
super.onPostExecute(s);
//获取数据
}
}
}
3,源码解析
既然代码入口
new MyAsyncTask().execute();
我们就从它的构造函数分析,
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
* 方法注释就说明了必须要在ui thread 进行AsyncTask调用
*/
public AsyncTask() {
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
来分析一下构造函数实例化的两个变量
mWorker 以及 mFuture
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
可以看到mWorker 是实现了参数是泛型的 Callable接口,看下Callable接口的声明
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
只有一个返回类型为泛型的 Call() 方法。
这里介绍一下Callable 这个接口,在java中,实现多线程的方法除了Runnable之外,还有Callable,Future 以及FutureTask(没错,就是一会要分析的这货),与Runnable不同的是,这几个类型都只能在线程池当中实现。Callable 与 Runnable 最关键的异点是Callable 可以有返回值。然而Runnable 是拿不到执行的返回值的。
这也就再一次解释了
public interface Callable<V> {
V call() throws Exception; //返回值泛型v
}
对比Runnable的接口声明
public interface Runnable {
public void run(); //无返回值
}
再来看一下mFuture 这货,可以看到把mWorker当作参数
public class FutureTask<V> implements RunnableFuture<V> {
可以看到FutureTask 实现了RunnableFuture< V>接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
而RunnableFuture 继承了Runnable 以及Future
终于到了Future了,继续看Future 的声明
public interface Future<V> {
/**
* ...忽略方法的注释声明
*/
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
看到这 我们大概明白了AsyncTask的一大优势所在,没错,就是可控。
总结一下FutureTask的作用,FutureTask作为一个可管理的异步任务,使得在线程池中执行的异步任务可以被更精准的控制。
分析完构造函数中 两个实例化的变量,在继续看execute()方法
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
execute()继续调用了executeOnExecutor(sDefaultExecutor, params) ,并且传递两个参数sDefaultExecutor以及params
我们看下sDefaultExecutor
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
所以我们只需要看下SerialExecutor 的实现即可;
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
SerialExecutor就是一个线程池, AsyncTask 的内部类,实现了Executor 接口
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
我们看下这句方法的注释就能明白execute()的作用就是在线程池中分配线程执行异步任务。那么execute()方法在AsyncTask 到底执行什么任务呢?没错,就是
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
我们发现了 mTasks.offer 这个方法,看下mTasks
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
简单说一下ArrayDeque
ArrayDeque是一个双端队列,它具有:
数组双端队列没有容量限制,使他们增长为必要支持使用。
不支持多线程并发访问。
它们要比堆栈Stack和LinkedList快。
看下它的offer()
/**
* Inserts the specified element at the end of this deque.
* 这句注释的意思就是说添加特定的元素(就是Runnable)到队尾
* <p>This method is equivalent to {@link #offerLast}.
*
* @param e the element to add
* @return <tt>true</tt> (as specified by {@link Queue#offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
return offerLast(e);
}
调用的 offerLast(e)
public boolean offerLast(E e) {
addLast(e);
return true;
}
继续addLast(e)
public void addLast(E e) {
if (e == null)
throw new NullPointerException("e == null");
elements[tail] = e;
if ( (tail = (tail + 1) & (elements.length - 1)) == head)
doubleCapacity();
}
/**
* Double the capacity of this deque. Call only when full, i.e.,
* when head and tail have wrapped around to become equal.
*/
private void doubleCapacity() {
// assert head == tail;
int p = head;
int n = elements.length;
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;
if (newCapacity < 0)
throw new IllegalStateException("Sorry, deque too big");
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
elements = a;
head = 0;
tail = n;
}
简单分析一下addLast(e) 的作用吧,如果传入参数为空,抛出异常
如果此时队列已满,就执行doubleCapacity();而这个方法就是把ArrayDeque扩容的,通过
...
int r = n - p; // number of elements to the right of p
int newCapacity = n << 1;//二进制,左移
...
Object[] a = new Object[newCapacity];
System.arraycopy(elements, p, a, 0, r);
System.arraycopy(elements, 0, a, r, p);
...
我们可知,满了就要扩容一倍的
说了这么多,我们重新回顾一下SerialExecutor
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
分析一下它的执行过程
第一次运行的时候,毫无疑问,局部变量mActive为null,执行scheduleNext()方法
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
又看到了 THREAD_POOL_EXECUTOR
public static final Executor THREAD_POOL_EXECUTOR
= new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
THREAD_POOL_EXECUTOR 也是一个线程池,通过构造函数初始化了
核心线程数,超时时长,所容纳的最大线程数,等一系类配置。在我的sdk版本好像对之前4.x做了优化,并没有固定写死,因为是这样的
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE = 1;
sDefaultExecutor 线程池内部维护一个任务队列(ArrayDeque),
execute()方法,将Runnable放入队尾,第一次执行 走scheduleNext(),此时 mTasks.poll() 取出队首任务,不为空则传入THREAD_POOL_EXECUTOR进行执行。
4,执行过程回顾
再来看一下 AsyncTask.execute()的执行过程
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING; //设置当前执行状态
onPreExecute(); //熟悉这个onPreExecute()方法吗?
mWorker.mParams = params;//外部参数赋值给mWorker局部变量
//最主要的逻辑就在这个方法了,提交mFuture执行,封装了worker,
exec.execute(mFuture);
return this;
}
mFuture 是需要mWorker 作为参数,而mWorker的执行
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
//提升线程的优先级
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
//看到了吗?这里的doInBackground 执行的就是你外部调用的 doInBackground()的逻辑。
Result result = doInBackground(mParams);
Binder.flushPendingCommands();
return postResult(result);
}
};
拿到结果调用postResult(result)
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
//getHandler()实际上就是 super(Looper.getMainLooper());,也就是Main 中的looper
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
AsyncTaskResult就是一个简单的携带参数的对象。
看到Messager 肯定会有Handler,于是找到
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
result.mTask.finish(result.mData[0]) 其实就是调用了AsyncTask类中的finish方法
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
调用了cancel()则执行onCancelled回调;正常执行的情况下调用我们的onPostExecute(result);最后将状态置为FINISHED。
再回到mFuture
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
执行 postResultIfNotInvoked(get());
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
根据 wasTaskInvoked状态判断是否执行postResult(result);
在mWorker 中我们已经知道 wasTaskInvoked.set(true)
所以该方法不会执行。
至此,AsyncTask源码分析完毕,相信大家对AsyncTask有了更深的理解~~~
以上是关于[Android源码分析] - 异步通信Handler机制的主要内容,如果未能解决你的问题,请参考以下文章
Android异步消息传递机制源码分析&&相关知识常被问的面试题
Android IntentService源码理解 及 HandlerThread构建消息循环机制分析