MessageQueue 实现详解(上)- Java 世界中的 Message
从 Android 2.3 开始,Java、native 层都可以把 Message 放到 Looper 线程处理,但是这两个世界的 Message 是完全不相关的。在本篇,我们先了解 Java 世界的 Message 是如何入队、出队的。
MessageQueue 的构造
在上一篇我们了解到,MessageQueue 实例是在 Looper 的构造函数中生成的:
1 | //frameworks/base/core/java/android/os/Looper.java |
再看看 MessageQueue 的构造函数:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
nativeInit() 是一个 native 方法,对应的实现是 android_os_MessageQueue_nativeInit:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
在 native 层 new 了一个 NativeMessageQueue 后,直接返回了对象的指针。NativeMessageQueue 对象的指针存储在 mPtr 成员变量上。
我们继续看 NativeMessageQueue:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
可以看到,NativeMessageQueue 的构造函数里,主要是生成了一个 LooperC++ 实例(注意,这里的 Looper 是 C++ 里的 Looper,不是我们常见的 Java 世界的 Looper,为了避免混淆,我给他加上一个上标C++)。
LooperCallback 继承了 RefBase,所以 nativeMessageQueue->incStrong(env) 调用的是 RefBase 的 incStrong()。关于 RefBase,不熟悉的读者可以看这个。
1 | //system/core/libutils/include/utils/Looper.h |
LooperC++ 的初始化
1 | //system/core/libutils/Looper.cpp |
eventfd 是 Linux 特有的一个系统调用,下面是它的原型和文档中的描述:
1 |
|
eventfd()creates an “eventfd object” that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t) counter that is maintained by the kernel. This counter is initialized with the value specified in the argumentinitval.
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); 创建了一个 eventfd 对象。至于它的妙用,这里先卖个关子,后面我们再揭晓。
接下来是 Epoll 的初始化
1 | //system/core/libutils/Looper.cpp |
epoll_create, epoll_ctl 也是 Linux 特有的系统调用,不熟悉 Epoll 的同学,可以参考 Robert Love 的《Linux System Programming》。epoll_create 创建了一个 Epoll 实例,随后通过 epoll_ctl 把我们感兴趣的文件描述符添加进去。这里我们添加的是前面生成的 mWakeEventFd。
mRequests 主要在C++世界使用,我们暂时不理会这部分。
到这里,MessageQueue 的初始化就完成了。前面我们说,Looper.prepare() 后,就可以调用 Looper.loop() 开始事件循环。而在 loop() 会,会不断从 MessageQueue 中取出 Message:
1 | //frameworks/base/core/java/android/os/Looper.java |
下面我们就看看这个 messageQueue.next()。
Message 的出队
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
可以看到,真正的实现在 NativeMessageQueue 中:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
把 env 和 pollObj 保存起来后,又调用了 mLooper 的 pollOnce:
1 | //system/core/libutils/include/utils/Looper.h |
和前面一样,对于Java世界的人来说,我们暂时不关心 mResponses。
下面看看 pollInner(pollInner() 里许多 MessageQueue 没有使用到的代码都被我删除了)。
1 | //system/core/libutils/Looper.cpp |
前面我们初始化的 Epoll 实例,在这里就派上了用场。通过 Epoll,我们可以在实现一个带 timeout 的等待,并且随时可以被唤醒。
epoll_wait 的第四个参数 timeoutMillis 表示等待的时间:
- 如果大于 0,则最多等待
timeoutMillis - 如果等于 0,则立即返回,即便没有任何事件发生(在
MessageQueue的情况就是,即便mWakeEventFd不可读,也会立即返回) - 如果小于 0,则无限等待
返回值 eventCount < 0 时表示出错,如果是收到信号(EINTR)导致的出错返回,不算是错误。
接下来,如果 mWakeEventFd 可读,调用 awoken():
1 | //system/core/libutils/Looper.cpp |
和前面说的一样,被信号中断的错误(EINTR)不算错误,可以重试。
awoken() 所做的,仅仅是消费掉这个“可读”事件。
pollInner 返回后,result 是 POLL_WAKE 或 POLL_TIMEOUT(忽略出错时的 POLL_ERROR)。如此一来,pollOnce 也会返回。
现在,我们继续从 MessageQueue.next() 往下走:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
第一次进来的时候,nextPollTimeoutMillis = 0,在执行 nativePollOnce 的时候不会阻塞,这是因为此时也许已经有某个 Message 处于可执行状态。
接下来,从 mMessages 处取得队头元素(后面我们会看到,所有的 message 都放在以 mMessages 为对头的列表中)。在本篇,我们仅仅讨论 asynchronous = false 的情况,为 true 时仅 framework 内部使用,有兴趣的读者可以自行查看源码。
取得头结点后,有两种情况:
- 头结点的时间还没到,这个时候,把
nextPollTimeoutMillis设置为剩余的时间 - 头结点的时间到了,返回对应的
msg。
如果没有头结点,说明 messageQueue 为空。这种情况下,设置 nextPollTimeoutMillis = -1 后,在下一次循环中会无限等待(直到有 message 入队并唤醒线程)。
在 messageQueue 为空的情况下,并不会马上开始下一次循环,还有一些工作需要完成:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
在上一篇我们已经了解到,执行 messageQueue.quit() 后,会清空 mMessages 并设置 mQuitting = true:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
quit() 清空 mMessages 后调用 nativeWake() 唤醒 Looper 线程。导致在上面的 next() 方法虽然从 nativePollOnce() 返回了,但 mMessage 为空。接着检查到 mQuitting 为 true,便销毁 NativeMessageQueue 对象并返回 null。最终,Looper.loop() 从无限循环中退出。
当然,如果 mQuitting 不为 true,即便 mMessages 为空,也不表示正在退出。这种情况下,会继续在 nativePollOnce() 上等待。
关于 nativeWake() 的实现,我们在下一节讲解 message 的入队时再看,next() 函数到这里就结束了。
Message 的入队
我们知道,message 是通过 messageQueue.enqueueMessage() 入队的:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
Asynchronous message 相关的代码这里已经移除了。总体上,enqueueMessage 的实现还是比较清晰明了的。如果新入队的 message 可以执行了并且 mBlock 为 true(true 表示 Looper 线程正在 Epoll_wait 上等待),就调用 nativeWake 唤醒 Looper 线程。
下面我们看看 nativeWake:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
实际上执行的是 LooperC++ 的 wake 函数:
1 | //system/core/libutils/Looper.cpp |
通过往 mWakeEventFd 写入一个数,使得 mWakeEventFd 可读,于是,LooperJava 线程从 nativePollOnce() 醒来。
这里值得注意的是,在 MessageQueue.next() 中调用 nativePollOnce 时是不能持有锁的,否则,在 nextPollTimeoutMillis = -1 时,将会导致死锁。即使 nextPollTimeoutMillis 不为 -1,也会导致极大的性能下降。
此外,即便我们在进 nativePollOnce 准备等待消息前,刚好有某个 message 入队并且执行了 nativeWake(),nativePollOnce() 也可以正常返回。epoll_wait 会因为 mWakeEventFd 已经有数据可读而立即返回。
题外话——使用 IO multiplexing 时的小技巧
举个具体的例子,比较说,某个 Web 服务器,使用单线程的网络 IO。在这个线程中用 poll 管理着所有的 socket。
当客户连接后,我们希望从客户端读取数据,于是,我们感兴趣的事件是 POLL_IN。客户发生完数据后,poll 返回可读条件。服务器接着读取数据,然后继续在 poll 上等待。
服务器处理完请求后,便需要往客户端写入数据。但这个时候,执行 IO 的线程还阻塞在 poll 上。并且,由于客户在等待数据,POLL_IN 事件不会再发生。此时,我们需要怎么让 IO 线程醒过来呢(这样我们才能把响应交给它发送出去)?
解决的办法跟 MessageQueue 的实现一样。大部分情况下,为了移植性(还有维护性,比较 eventfd 还是不怎么常见),我们会创建一个 pipe,并且让 poll 在 pipe 的读端等待。当我们需要唤醒 IO 线程时,就往管道里写一个数据,poll 就会因为管道可读而返回了。这里管道起的作用,跟 LooperC++ 的 mWakeEventFd 是一样的。