MessageQueue 实现详解(上)- Java 世界中的 Message
从 Android 2.3 开始,Java、native 层都可以把 Message 放到 Looper 线程处理,但是这两个世界的 Message 是完全不相关的。在本篇,我们先了解 Java 世界的 Message 是如何入队、出队的。
MessageQueue
的构造
在上一篇我们了解到,MessageQueue
实例是在 Looper
的构造函数中生成的:
1 | //frameworks/base/core/java/android/os/Looper.java |
再看看 MessageQueue
的构造函数:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
nativeInit()
是一个 native 方法,对应的实现是 android_os_MessageQueue_nativeInit
:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
在 native 层 new 了一个 NativeMessageQueue
后,直接返回了对象的指针。NativeMessageQueue
对象的指针存储在 mPtr
成员变量上。
我们继续看 NativeMessageQueue
:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
可以看到,NativeMessageQueue
的构造函数里,主要是生成了一个 Looper
C++ 实例(注意,这里的 Looper
是 C++ 里的 Looper
,不是我们常见的 Java 世界的 Looper
,为了避免混淆,我给他加上一个上标C++)。
LooperCallback
继承了 RefBase
,所以 nativeMessageQueue->incStrong(env)
调用的是 RefBase
的 incStrong()
。关于 RefBase
,不熟悉的读者可以看这个。
1 | //system/core/libutils/include/utils/Looper.h |
LooperC++ 的初始化
1 | //system/core/libutils/Looper.cpp |
eventfd
是 Linux 特有的一个系统调用,下面是它的原型和文档中的描述:
1 |
|
eventfd()
creates an “eventfd object” that can be used as an event wait/notify mechanism by user-space applications, and by the kernel to notify user-space applications of events. The object contains an unsigned 64-bit integer (uint64_t
) counter that is maintained by the kernel. This counter is initialized with the value specified in the argumentinitval
.
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
创建了一个 eventfd 对象。至于它的妙用,这里先卖个关子,后面我们再揭晓。
接下来是 Epoll 的初始化
1 | //system/core/libutils/Looper.cpp |
epoll_create, epoll_ctl
也是 Linux 特有的系统调用,不熟悉 Epoll 的同学,可以参考 Robert Love 的《Linux System Programming》。epoll_create
创建了一个 Epoll 实例,随后通过 epoll_ctl
把我们感兴趣的文件描述符添加进去。这里我们添加的是前面生成的 mWakeEventFd
。
mRequests
主要在C++世界使用,我们暂时不理会这部分。
到这里,MessageQueue
的初始化就完成了。前面我们说,Looper.prepare()
后,就可以调用 Looper.loop()
开始事件循环。而在 loop()
会,会不断从 MessageQueue
中取出 Message
:
1 | //frameworks/base/core/java/android/os/Looper.java |
下面我们就看看这个 messageQueue.next()
。
Message
的出队
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
可以看到,真正的实现在 NativeMessageQueue
中:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
把 env
和 pollObj
保存起来后,又调用了 mLooper
的 pollOnce
:
1 | //system/core/libutils/include/utils/Looper.h |
和前面一样,对于Java世界的人来说,我们暂时不关心 mResponses
。
下面看看 pollInner
(pollInner()
里许多 MessageQueue
没有使用到的代码都被我删除了)。
1 | //system/core/libutils/Looper.cpp |
前面我们初始化的 Epoll 实例,在这里就派上了用场。通过 Epoll,我们可以在实现一个带 timeout 的等待,并且随时可以被唤醒。
epoll_wait
的第四个参数 timeoutMillis
表示等待的时间:
- 如果大于 0,则最多等待
timeoutMillis
- 如果等于 0,则立即返回,即便没有任何事件发生(在
MessageQueue
的情况就是,即便mWakeEventFd
不可读,也会立即返回) - 如果小于 0,则无限等待
返回值 eventCount < 0
时表示出错,如果是收到信号(EINTR
)导致的出错返回,不算是错误。
接下来,如果 mWakeEventFd
可读,调用 awoken()
:
1 | //system/core/libutils/Looper.cpp |
和前面说的一样,被信号中断的错误(EINTR
)不算错误,可以重试。
awoken()
所做的,仅仅是消费掉这个“可读”事件。
pollInner
返回后,result
是 POLL_WAKE
或 POLL_TIMEOUT
(忽略出错时的 POLL_ERROR
)。如此一来,pollOnce
也会返回。
现在,我们继续从 MessageQueue.next()
往下走:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
第一次进来的时候,nextPollTimeoutMillis = 0
,在执行 nativePollOnce
的时候不会阻塞,这是因为此时也许已经有某个 Message
处于可执行状态。
接下来,从 mMessages
处取得队头元素(后面我们会看到,所有的 message
都放在以 mMessages
为对头的列表中)。在本篇,我们仅仅讨论 asynchronous = false
的情况,为 true 时仅 framework 内部使用,有兴趣的读者可以自行查看源码。
取得头结点后,有两种情况:
- 头结点的时间还没到,这个时候,把
nextPollTimeoutMillis
设置为剩余的时间 - 头结点的时间到了,返回对应的
msg
。
如果没有头结点,说明 messageQueue
为空。这种情况下,设置 nextPollTimeoutMillis = -1
后,在下一次循环中会无限等待(直到有 message
入队并唤醒线程)。
在 messageQueue
为空的情况下,并不会马上开始下一次循环,还有一些工作需要完成:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
在上一篇我们已经了解到,执行 messageQueue.quit()
后,会清空 mMessages
并设置 mQuitting = true
:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
quit()
清空 mMessages
后调用 nativeWake()
唤醒 Looper 线程。导致在上面的 next()
方法虽然从 nativePollOnce()
返回了,但 mMessage
为空。接着检查到 mQuitting
为 true
,便销毁 NativeMessageQueue
对象并返回 null
。最终,Looper.loop()
从无限循环中退出。
当然,如果 mQuitting
不为 true
,即便 mMessages
为空,也不表示正在退出。这种情况下,会继续在 nativePollOnce()
上等待。
关于 nativeWake()
的实现,我们在下一节讲解 message
的入队时再看,next()
函数到这里就结束了。
Message
的入队
我们知道,message
是通过 messageQueue.enqueueMessage()
入队的:
1 | //frameworks/base/core/java/android/os/MessageQueue.java |
Asynchronous message 相关的代码这里已经移除了。总体上,enqueueMessage
的实现还是比较清晰明了的。如果新入队的 message
可以执行了并且 mBlock
为 true
(true 表示 Looper 线程正在 Epoll_wait 上等待),就调用 nativeWake
唤醒 Looper 线程。
下面我们看看 nativeWake
:
1 | //frameworks/base/core/jni/android_os_MessageQueue.cpp |
实际上执行的是 Looper
C++ 的 wake
函数:
1 | //system/core/libutils/Looper.cpp |
通过往 mWakeEventFd
写入一个数,使得 mWakeEventFd
可读,于是,LooperJava 线程从 nativePollOnce()
醒来。
这里值得注意的是,在 MessageQueue.next()
中调用 nativePollOnce
时是不能持有锁的,否则,在 nextPollTimeoutMillis = -1
时,将会导致死锁。即使 nextPollTimeoutMillis
不为 -1,也会导致极大的性能下降。
此外,即便我们在进 nativePollOnce
准备等待消息前,刚好有某个 message
入队并且执行了 nativeWake()
,nativePollOnce()
也可以正常返回。epoll_wait
会因为 mWakeEventFd
已经有数据可读而立即返回。
题外话——使用 IO multiplexing 时的小技巧
举个具体的例子,比较说,某个 Web 服务器,使用单线程的网络 IO。在这个线程中用 poll
管理着所有的 socket。
当客户连接后,我们希望从客户端读取数据,于是,我们感兴趣的事件是 POLL_IN
。客户发生完数据后,poll
返回可读条件。服务器接着读取数据,然后继续在 poll
上等待。
服务器处理完请求后,便需要往客户端写入数据。但这个时候,执行 IO 的线程还阻塞在 poll
上。并且,由于客户在等待数据,POLL_IN
事件不会再发生。此时,我们需要怎么让 IO 线程醒过来呢(这样我们才能把响应交给它发送出去)?
解决的办法跟 MessageQueue
的实现一样。大部分情况下,为了移植性(还有维护性,比较 eventfd
还是不怎么常见),我们会创建一个 pipe
,并且让 poll
在 pipe 的读端等待。当我们需要唤醒 IO 线程时,就往管道里写一个数据,poll
就会因为管道可读而返回了。这里管道起的作用,跟 Looper
C++ 的 mWakeEventFd
是一样的。