一、什么是异步消息处理机制?

Android异步消息系统框架

Handler主要用于共享内存地址的两个线程通信,即同一进程的不同线程进行通信。

Handler主要角色: Handler、Message、Looper、MessageQueue

  • Message:消息分为硬件产生的消息(如按钮、触摸)和软件生成的消息;
  • MessageQueue:消息队列的主要功能向消息池投递消息(MessageQueue.enqueueMessage)和取走消息池的消息(MessageQueue.next);
  • Handler:消息辅助类,主要功能向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);
  • Looper:不断循环执行(Looper.loop),按分发机制将消息分发给目标处理者。

image-20241004215010790

为什么消息发出去又回来了呢? 同步与异步的区别会讲到

架构图

image-20241004215948664

同步与异步的区别

基本概念

同步的任务执行: 在执行程序时,如果没有收到执行结果,就一直等,继续往下执行,直到收到执行结果,才接着往下执行

异步的任务执行: 在执行程序时,如果遇到需要等待的任务,就另外开辟一个子线程去执行它,自己继续往下执行其他程序,子线程有结果时,会将结果发送给主线程

image-20241004213046139

实际例子

以扫本次分享会结束后的二维码问卷为例子,同步分为阻塞非阻塞异步就是非阻塞一种类型。

以下例子以我为主线程

同步阻塞: 我必须等到在座的各位都填完二维码才离开,才能做其他的事情。

同步非阻塞: 我不必须等到在座的各位填完二维码才离开,我可以去喝杯水再过来看看大家有没有填好,在我回来看之前我可以做其他的任何事情。只是轮询的去查看任务有没有完成。

异步非阻塞: 我讲完本次分享会的内容二维码摆在显示器上,合上电脑就走,做其他的事情,等大家填完了,手机的小米办公软件会收到各位填写完毕的通知。

总而言之就是; 同步阻塞就是任务没有完成,继续等待; 同步非阻塞任务没有完成,可以做其他事情,轮询看任务有没有完成;异步任务是否完成都能够做其他事情,子线程完成会通知任务是否完成。

二、为什么需要用异步消息处理机制?

UI线程安全

Android当中如果使用子线程更新UI会出现异常,导致出现崩溃。因为在 Android 中线程可以有好多个,如果每个线程都可以对 UI 进行访问(多线程并发访问),可能会导致 UI 控件处于不可预期的状态,这是线程不安全的。

什么时候子线程能够更新UI?

Android 每次刷新 UI 的时候,根布局 ViewRootImpl 会对 UI 操作做验证,这个验证是由 ViewRootImpl 中的 checkThread() 方法来完成:

1
2
3
4
5
6
7
8
9
public final class ViewRootImpl implements ViewParent, View.AttachInfo.Callbacks, 
ThreadedRenderer.DrawCallbacks {
void checkThread() {
if (mThread != Thread.currentThread()) {
throw new CalledFromWrongThreadException(
"Only the original thread that created a view hierarchy can touch its views.");
}
}
}

ViewRootImpl 的创建在 onResume 之后,对线程的异常检测处理在 ViewRootImpl 创建之后,也就是说如果在 onReusme 执行前其他线程访问 UI 的话是不会报错的。

ANR

主线程处理长时间耗时任务,那么app会出现无响应提示(超过5s会提示),也就是会报ANR异常


三、如何创建Handler?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class MainActivity extends Activity {
private Handler handler1;
private Handler handler2;
@Override
protected void onCreate(Bundle saveInstanceState) {
super.onCreate(saveInstanceState);
setContentView(R.layout.activity_main);
handler1 = new Handler();√
new Thread(new Runnable() {
@Override
public void run() {
handler2 = new Handler(); x
}
}).start();
}
}

在主线程中Looper和MessageQueue已经提前创建好了(Activity启动流程的ActivityThread#main()里面会创建),而子线程当中没有。

ActivityThread#main()

1
2
3
4
5
6
7
8
9
10
11
// 伪代码
public static void main(String[] strs) {
// 准备住Looper
Looper.prepareMainLooper();
// 创建ActivityThread实例
ActivityThread thread = new ActivityThread();
// 向AMS注册当前线程
thread.attach(false);
// 启动消息循环
Looper.loop();
}

无参构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public Handler() {
this(null, false);
}

public Handler(Callback callback, boolean async) {
//匿名类、内部类或本地类都必须申明为static,否则会警告可能出现内存泄露
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
//必须先执行Looper.prepare(),才能获取Looper对象,否则为null.
mLooper = Looper.myLooper(); //从当前线程的TLS中获取Looper对象
if (mLooper == null) {
throw new RuntimeException("Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue; //消息队列,来自Looper对象
mCallback = callback; //回调方法
mAsynchronous = async; //设置消息是否为异步处理方式
}
Looper#myLooper()
1
2
3
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
Looper#prepare()
1
2
3
4
5
6
7
8
private static void prepare(boolean quitAllowed) {
//每个线程只允许执行一次该方法,第二次执行时线程的TLS已有数据,则会抛出异常。
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//创建Looper对象,并保存到当前线程的TLS区域
sThreadLocal.set(new Looper(quitAllowed));
}

Looper 是如何与 Thread 关联的?

Looper 与 Thread 之间是通过 ThreadLocal 关联的,在 Looper.prepare() 方法中,有一个 ThreadLocal 类型的 sThreadLocal 静态字段,Looper 通过它的 get 和 set 方法来赋值和取值。 由于 ThreadLocal 是与线程绑定的,所以只要把 Looper 与 ThreadLocal 绑定了,那 Looper 和 Thread 也就关联上了。

image-20241004235927460

ThreadLocal#get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 从map中获取当前ThreadLocal的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T) e.value; // 获取值
return result;
}
}
// 如果没有找到值,则调用initialValue()进行初始化
return setInitialValue(t);
}
ThreadLocal#set()
1
2
3
4
5
6
7
8
9
10
public void set(T value) {
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
map.put(this, value); // 存储值
} else {
createMap(t, value); // 创建新的ThreadLocalMap并存储值
}
}

有参构造

在子线程当中使用到

1
2
3
4
5
6
7
8
9
10
public Handler(Looper looper) {
this(looper, null, false);
}

public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}

四、如何发送消息?

1
2
3
4
5
6
7
8
9
10
11
new Thread(new Runnable() {
@Override
public void run() {
Message message = new Message();
message.arg1 = 1;
Bundle bundle = new Bundle();
bundle.putString("data", "value");
message.setData(bundle);
handler.sendMessage(message);
}
})

消息发送到哪去?

消息调用链

image-20241005135132571

sendEmptyMessage
1
2
3
public final boolean sendEmptyMessage(int what) {
return sendEmptyMessageDelayed(what, 0);
}
sendEmptyMessageDelayed
1
2
3
4
5
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
Message msg = Message.obtain();
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}
sendMessageDelayed
1
2
3
4
5
6
public final boolean sendMessageDelayed(Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
sendMessageAtTime
1
2
3
4
5
6
7
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
sendMessageAtFrontOfQueue
1
2
3
4
5
6
7
public final boolean sendMessageAtFrontOfQueue(Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
return false;
}
return enqueueMessage(queue, msg, 0);
}

该方法通过设置消息的触发时间为0,从而使Message加入到消息队列的队头。

enqueueMessage
1
2
3
4
5
6
7
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}

MessageQueue#enqueueMessage()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
boolean enqueueMessage(Message msg, long when) {
// 每一个普通Message必须有一个target
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}

// 检查消息是否正在使用
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

synchronized (this) {
// 如果正在退出,回收消息并返回
if (mQuitting) {
msg.recycle();
return false;
}

// 标记消息为正在使用
msg.markInUse();
msg.when = when; // 设置消息的触发时间

Message p = mMessages; // 获取当前消息队列的头部
boolean needWake;

// 如果队列为空或新消息的触发时间早于当前队列中的第一个消息
if (p == null || when == 0 || when < p.when) {
// 新消息成为队头,唤醒事件队列(如果被阻塞)
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 将新消息按时间顺序插入到队列中
needWake = mBlocked && p.target == null && msg.isAsynchronous();

Message prev;
for (;;) {
prev = p;
p = p.next;

// 找到插入位置
if (p == null || when < p.when) {
prev.next = msg; // 插入新消息
msg.next = p; // 新消息指向后续消息
break;
}
}
}

// 如果需要唤醒事件队列,则通知它
if (needWake) {
nativeWake(mPtr);
}
}
return true; // 返回成功标志
}
  • 可以讲逻辑图插入进去

五、如何获取消息?

Looper#loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void loop() {
// 获取当前线程的 Looper 对象
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
// 获取消息队列
final MessageQueue queue = me.mQueue;

// 确保当前线程身份是本地进程的身份
Binder.clearCallingIdentity();

for (;;) { //进入loop的主循环方法
Message msg = queue.next(); //可能会阻塞
if (msg == null) { //没有消息,则退出循环
return;
}

//默认为null,可通过setMessageLogging()方法来指定输出,用于debug功能
Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
msg.target.dispatchMessage(msg); //用于分发Message
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}

//恢复调用者信息
final long newIdent = Binder.clearCallingIdentity();
msg.recycleUnchecked(); //将Message放入消息池
}
}

MessageQueue#next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
Message next() {
// 当消息循环已经退出并被丢弃,则直接返回
final long ptr = mPtr;
if (ptr == 0) {
return null; // 循环迭代的首次为-1
}

int pendingIdleHandlerCount = -1;
int nextPollTimeoutMillis = 0;

for (;;) {
// 如果有设置超时,调用 nativePollOnce(),阻塞直到有新消息到达
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
nativePollOnce(ptr, nextPollTimeoutMillis);
}

synchronized (this) {
// 尝试检索下一条消息,如果找到则返回
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;

if (msg != null && msg.target == null) {
// 被阻塞的 barrier,寻找下一个异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}

if (msg != null) {
if (now < msg.when) {
// 下一个消息还未准备好,设置超时
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 得到一条消息
mBlocked = false; // 取消阻塞状态
if (prevMsg != null) {
prevMsg.next = msg.next; // 从链表中移除消息
} else {
mMessages = msg.next; // 更新队列头
}
msg.next = null; // 清空当前消息的下一个指针
return msg; // 返回获取到的消息
}
} else {
// 队列中没有消息,设置超时为-1,表示一直等待
nextPollTimeoutMillis = -1;
}

//消息正在退出,返回null
if (mQuitting) {
dispose();
return null;
}
//当消息队列为空,或者是消息队列的第一个消息时
if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
//没有idle handlers 需要运行,则循环并等待。
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
//只有第一次循环时,会运行idle handlers,执行完成后,重置pendingIdleHandlerCount为0.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; //去掉handler的引用
boolean keep = false;
try {
keep = idler.queueIdle(); //idle时执行的方法
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
//重置idle handler个数为0,以保证不会再次重复运行
pendingIdleHandlerCount = 0;
//当调用一个空闲handler时,一个新message能够被分发,因此无需等待可以直接查询pending message.
nextPollTimeoutMillis = 0;
}
}

Handler#dispatchMessage

message.callback.run() > Handler.mCallback.handleMessage(msg) > Handler.handleMessage(msg)

这里解释了为什么,handleMessage()方法会收到消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
//当Message存在回调方法,回调msg.callback.run()方法;
handleCallback(msg);
} else {
if (mCallback != null) {
//当Handler存在Callback成员变量时,回调方法handleMessage();
if (mCallback.handleMessage(msg)) {
return;
}
}
//Handler自身的回调方法handleMessage()
handleMessage(msg);
}
}

Message#recycleUnchecked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//对于不再使用的消息,加入到消息池
void recycleUnchecked() {
//将消息标示位置为IN_USE,并清空消息所有的参数。
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) { //当消息池没有满时,将Message对象加入消息池
next = sPool;
sPool = this;
sPoolSize++; //消息池的可用大小进行加1操作
}
}
}

标准的异步消息处理线程的写法

1
2
3
4
5
6
7
8
9
10
11
12
class LooperThread extends Thread {
public Handler mHandler;
public void run() {
Looper.prepare();
mHander = new Hander() {
public void handleMessage(Message msg) {
// process incoming messages here
}
}
};
Looper.loop();
}

六、子线程间接操作UI的方式

Handler#post

1
2
3
4
5
6
7
8
public final boolean post(Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
private final Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}

View#post

Activity#runOnUiThread