Android 6.0 Message Mechanism Internals
Messages are stored in a message queue, and the message-loop thread spins in an endless loop around that queue until the thread exits. If the queue holds a message, the loop thread takes it out and dispatches it to the corresponding Handler; if the queue is empty, the loop thread goes idle and waits for the next message to arrive. When writing an Android application, heavy work is usually moved to a worker thread so that the UI main thread is not blocked and no ANR is triggered. There are two kinds of worker thread to choose from: a plain Thread with no message loop, and a thread that runs a message loop. A thread with a message loop can in turn be created in two ways: use the HandlerThread class that Android provides, which yields a thread object that already has a message loop, or start the loop yourself inside the thread's run() method by calling Looper.prepare() followed by Looper.loop().
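The queue-plus-loop idea described above can be sketched without any Android classes. The following is a minimal illustration in portable C++, not Android's implementation: SimpleMessage, SimpleLoop and runSimpleLoopDemo are hypothetical names, and a condition variable stands in for the epoll-based wait that the rest of this article examines.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a message: just an integer code.
struct SimpleMessage { int what; };

class SimpleLoop {
public:
    // Enqueue a message and wake the loop thread if it is idle.
    void send(SimpleMessage msg) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQueue.push_back(msg);
        }
        mCond.notify_one();
    }

    // Ask the loop to exit once the queue has drained.
    void quit() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQuitting = true;
        }
        mCond.notify_one();
    }

    // Sleeps while the queue is empty, dispatches otherwise, returns after quit().
    void loop(const std::function<void(const SimpleMessage&)>& handler) {
        for (;;) {
            SimpleMessage msg;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                mCond.wait(lock, [this] { return !mQueue.empty() || mQuitting; });
                if (mQueue.empty()) return;  // quitting and nothing left to handle
                msg = mQueue.front();
                mQueue.pop_front();
            }
            handler(msg);  // dispatch outside the lock
        }
    }

private:
    std::mutex mMutex;
    std::condition_variable mCond;
    std::deque<SimpleMessage> mQueue;
    bool mQuitting = false;
};

// Starts a loop thread, sends three messages, returns the codes in handling order.
std::vector<int> runSimpleLoopDemo() {
    SimpleLoop loop;
    std::vector<int> handled;
    std::thread worker([&] {
        loop.loop([&](const SimpleMessage& m) { handled.push_back(m.what); });
    });
    loop.send({1});
    loop.send({2});
    loop.send({3});
    loop.quit();
    worker.join();
    assert(handled.size() == 3);
    return handled;
}
```

Messages are handled on the worker thread in the order they were sent, and the worker sleeps rather than spins whenever the queue is empty.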
1. Using the Message Mechanism
A message setup usually consists of a message thread and a Handler. Here is a Handler from PowerManagerService:
mHandlerThread = new ServiceThread(TAG,
        Process.THREAD_PRIORITY_DISPLAY, false /*allowIo*/);
mHandlerThread.start();
mHandler = new PowerManagerHandler(mHandlerThread.getLooper());
ServiceThread here is a HandlerThread. When creating the Handler you must pass in the HandlerThread's Looper; otherwise the Handler defaults to the Looper of the current thread.
Each Handler looks roughly like this:
private final class PowerManagerHandler extends Handler {
    public PowerManagerHandler(Looper looper) {
        super(looper, null, true /*async*/);
    }

    @Override
    public void handleMessage(Message msg) {
        switch (msg.what) {
            case MSG_USER_ACTIVITY_TIMEOUT:
                handleUserActivityTimeout();
                break;
            case MSG_SANDMAN:
                handleSandman();
                break;
            case MSG_SCREEN_BRIGHTNESS_BOOST_TIMEOUT:
                handleScreenBrightnessBoostTimeout();
                break;
            case MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT:
                checkWakeLockAquireTooLong();
                Message m = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT);
                m.setAsynchronous(true);
                mHandler.sendMessageDelayed(m, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
                break;
        }
    }
}
2. How the Message Mechanism Works
Let's start with run(), the main function of HandlerThread:
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        // Assign mLooper, then notifyAll(): getLooper() returns mLooper.
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}
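The notifyAll() in run() only makes sense together with a waiting getLooper(). The sketch below shows that handshake in C++, assuming getLooper() blocks until mLooper has been assigned (its body is not shown in this article). FakeLooper, LooperThread and publishedLooperId are hypothetical names used for illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a Looper; only its identity matters here.
struct FakeLooper { int id; };

class LooperThread {
public:
    void start() { mThread = std::thread([this] { run(); }); }

    // Blocks until run() has published the looper.
    FakeLooper* getLooper() {
        std::unique_lock<std::mutex> lock(mMutex);
        mCond.wait(lock, [this] { return mLooper != nullptr; });
        return mLooper;
    }

    void join() { mThread.join(); }

private:
    void run() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mLooper = &mStorage;  // the "mLooper = Looper.myLooper()" step
        }
        mCond.notify_all();       // wake every thread waiting in getLooper()
        // A real HandlerThread would now enter Looper.loop().
    }

    FakeLooper mStorage{42};
    FakeLooper* mLooper = nullptr;
    std::mutex mMutex;
    std::condition_variable mCond;
    std::thread mThread;
};

// Starts the thread and returns the id of the looper it published.
int publishedLooperId() {
    LooperThread t;
    t.start();
    int id = t.getLooper()->id;
    t.join();
    assert(id == 42);
    return id;
}
```

The caller can ask for the looper immediately after start() without racing the new thread: it simply sleeps until the assignment has happened.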
Next, Looper's prepare function. It creates a new Looper object and stores it in a thread-local variable.
public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}
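The one-Looper-per-thread rule can be mirrored in C++ with a thread_local slot. This is a sketch under that assumption only; MiniLooper, threadSlot, prepareLooper, myLooper and secondPrepareFails are hypothetical names, not Android APIs.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for Looper.
struct MiniLooper { bool quitAllowed; };

// One slot per thread, playing the role of sThreadLocal.
inline std::unique_ptr<MiniLooper>& threadSlot() {
    thread_local std::unique_ptr<MiniLooper> slot;
    return slot;
}

// Creates this thread's looper; a second call on the same thread is an error.
void prepareLooper(bool quitAllowed = true) {
    if (threadSlot()) {
        throw std::runtime_error("Only one Looper may be created per thread");
    }
    threadSlot() = std::make_unique<MiniLooper>(MiniLooper{quitAllowed});
}

// Returns this thread's looper, or nullptr if prepareLooper() was never called.
MiniLooper* myLooper() { return threadSlot().get(); }

// Returns true if a second prepareLooper() on the calling thread is rejected.
bool secondPrepareFails() {
    if (!myLooper()) prepareLooper();
    try {
        prepareLooper();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```

Because the slot is per thread, another thread calling prepareLooper() gets its own independent looper.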
The Looper constructor creates the MessageQueue:
private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}
Now the MessageQueue constructor. nativeInit is a native method, and its return value is stored in mPtr, a long field that holds a native pointer.
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
The native function creates a NativeMessageQueue object and returns its pointer.
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast<jlong>(nativeMessageQueue); }
The NativeMessageQueue constructor fetches the current thread's native Looper and creates a new one if none exists yet.
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
Now the native Looper constructor. It first calls eventfd to create an fd. eventfd is mainly used for notification between processes or threads; the linked blog post introducing eventfd gives more background.
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno); AutoMutex _l(mLock); rebuildEpollLocked(); }
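To make the role of mWakeEventFd concrete, here is a small Linux-only sketch of how an eventfd behaves: writes add to a 64-bit kernel counter, and one read returns the sum and resets it. eventfdSum and emptyReadWouldBlock are hypothetical helper names; only eventfd, read, write and close are real system calls.

```cpp
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <sys/eventfd.h>
#include <unistd.h>

// Writes the value 1 to a fresh non-blocking eventfd `times` times and returns
// what a single read delivers. Returns 0 if any system call fails.
uint64_t eventfdSum(int times) {
    int fd = eventfd(0, EFD_NONBLOCK);
    if (fd < 0) return 0;
    uint64_t inc = 1;
    for (int i = 0; i < times; i++) {
        if (write(fd, &inc, sizeof(inc)) != static_cast<ssize_t>(sizeof(inc))) {
            close(fd);
            return 0;
        }
    }
    uint64_t counter = 0;
    if (read(fd, &counter, sizeof(counter)) != static_cast<ssize_t>(sizeof(counter))) {
        counter = 0;
    }
    close(fd);
    return counter;
}

// Returns true if reading an eventfd whose counter is zero fails with EAGAIN.
bool emptyReadWouldBlock() {
    int fd = eventfd(0, EFD_NONBLOCK);
    if (fd < 0) return false;
    uint64_t counter = 0;
    ssize_t n = read(fd, &counter, sizeof(counter));
    bool wouldBlock = (n < 0 && errno == EAGAIN);
    close(fd);
    return wouldBlock;
}
```

Several wake-ups issued before the looper thread runs therefore collapse into a single readable event, which is all a notification channel needs.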
2.1 Creating epoll in the native layer
Next, rebuildEpollLocked. It creates the epoll instance, adds mWakeEventFd to it, and also adds every fd held in mRequests.
void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d", errno); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d", request.fd, errno); } } }
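The registration step can be tried in isolation. The sketch below is Linux-only and builds an epoll set that contains a single eventfd, then waits on it. waitOnWakeFd is a hypothetical helper, and it uses epoll_create1(0) where the Looper code above calls epoll_create with a size hint.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

// Builds an epoll set holding one eventfd, optionally signals it, then waits.
// Returns epoll_wait's result (number of ready fds), or -1 on setup failure.
int waitOnWakeFd(bool signalFirst, int timeoutMillis) {
    int wakeFd = eventfd(0, EFD_NONBLOCK);
    int epollFd = epoll_create1(0);
    int count = -1;
    if (wakeFd >= 0 && epollFd >= 0) {
        struct epoll_event item;
        memset(&item, 0, sizeof(item));  // zero unused members of the data union
        item.events = EPOLLIN;
        item.data.fd = wakeFd;
        bool ok = epoll_ctl(epollFd, EPOLL_CTL_ADD, wakeFd, &item) == 0;
        if (ok && signalFirst) {
            uint64_t inc = 1;
            ok = write(wakeFd, &inc, sizeof(inc)) == static_cast<ssize_t>(sizeof(inc));
        }
        if (ok) {
            struct epoll_event ready[4];
            count = epoll_wait(epollFd, ready, 4, timeoutMillis);
        }
    }
    if (wakeFd >= 0) close(wakeFd);
    if (epollFd >= 0) close(epollFd);
    return count;
}
```

With no signal the wait ends by timeout and reports zero ready fds; with a prior write it returns at once and reports one.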
Back in HandlerThread's run function, we move on to Looper's loop function.
public void run() { mTid = Process.myTid(); Looper.prepare(); synchronized (this) { mLooper = Looper.myLooper(); notifyAll(); } Process.setThreadPriority(mPriority); onLooperPrepared(); Looper.loop(); mTid = -1; }
Here is Looper's loop function:
public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue; // the Looper's mQueue

    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();

    for (;;) {
        Message msg = queue.next(); // might block; the blocking happens in epoll_wait
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging; // optional logger installed by the caller
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycleUnchecked();
    }
}
MessageQueue's next function first calls nativePollOnce, and afterwards takes a Message off the message queue. Only the beginning of the function is shown:
Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mPtr; // the native pointer saved earlier
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis);
Now the native function nativePollOnce. It casts the saved pointer back to a NativeMessageQueue and calls its pollOnce function.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
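2.2 Blocking in epoll_wait in the native layer
In pollOnce, the while loop at the top only handles responses whose ident is >= 0, that is, fds registered without a callback. That case is rare here, so we can go straight to pollInner.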
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
pollInner mainly blocks in epoll_wait. The Java layer computes how long each wait may last and passes that timeout down to the native layer. epoll_wait returns only when mWakeEventFd or an fd registered earlier through addFd has an event, or when the timeout expires.
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = POLL_WAKE;
    mResponses.clear(); // clear mResponses
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // The thread blocks here in epoll_wait; the timeout comes from the Java layer.
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) { // the event that wakes this thread
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd); // an fd registered earlier via addFd
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex)); // queued in mResponses
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    // Native-layer messages only; the Java layer manages its own queue.
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    // This handles events on fds registered via addFd: walk mResponses and call each callback.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
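The timeout adjustment at the top of pollInner is easy to misread, so here it is as a pure function. This is a sketch: millisUntil is a hypothetical stand-in for toMillisecondTimeoutDelay, whose body is not shown in this article, and the round-up-to-the-next-millisecond behaviour is an assumption.

```cpp
#include <cassert>
#include <climits>
#include <cstdint>

// Hypothetical helper: milliseconds from nowNs until dueNs, rounded up.
// Returns 0 if already due and -1 if the delay does not fit in an int.
int millisUntil(int64_t nowNs, int64_t dueNs) {
    if (dueNs <= nowNs) return 0;
    int64_t delayNs = dueNs - nowNs;
    if (delayNs > INT64_MAX - 999999) return -1;
    int64_t ms = (delayNs + 999999) / 1000000;
    return ms > INT_MAX ? -1 : static_cast<int>(ms);
}

// Shrinks the caller's timeout when a native message is due sooner.
// timeoutMillis < 0 means "wait indefinitely", 0 means "do not block".
int adjustTimeout(int timeoutMillis, int64_t nowNs, int64_t nextMessageNs) {
    if (timeoutMillis != 0 && nextMessageNs != LLONG_MAX) {
        int messageTimeout = millisUntil(nowNs, nextMessageNs);
        if (messageTimeout >= 0
                && (timeoutMillis < 0 || messageTimeout < timeoutMillis)) {
            timeoutMillis = messageTimeout;
        }
    }
    return timeoutMillis;
}
```

The effective wait is the smaller of the Java-side timeout and the time until the next native message, and a zero timeout is never lengthened.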
Back in Looper's loop function: you can hook in your own logging for debugging, and the message is then dispatched by calling dispatchMessage on the Message's target.
    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging; // your own logger
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }

        msg.target.dispatchMessage(msg);

        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }

        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }

        msg.recycleUnchecked();
    }
}
2.3 Adding debug logging
First, the custom logging. A printer can be installed through Looper's setMessageLogging function:
public void setMessageLogging(@Nullable Printer printer) {
    mLogging = printer;
}

Printer is simply an interface:

public interface Printer {
    /**
     * Write a line of text to the output.  There is no need to terminate
     * the given string with a newline.
     */
    void println(String x);
}
2.4 Message dispatch in the Java layer
Now message dispatch. It begins with a call to Handler's obtainMessage:
Message msg = mHandler.obtainMessage(MSG_CHECK_WAKE_LOCK_ACQUIRE_TIMEOUT); msg.setAsynchronous(true); mHandler.sendMessageDelayed(msg, WAKE_LOCK_ACQUIRE_TOO_LONG_TIMEOUT);
obtainMessage calls Message's obtain function:
public final Message obtainMessage(int what) {
    return Message.obtain(this, what);
}
Message.obtain fetches a Message (recycled from the pool when one is available, otherwise newly created) and sets its target to the Handler.
public static Message obtain(Handler h, int what) {
    Message m = obtain(); // reuses a pooled Message or creates a new one
    m.target = h;
    m.what = what;
    return m;
}
Now recall the dispatch call we saw earlier:
msg.target.dispatchMessage(msg);
This ends up in Handler's dispatchMessage, which handles the message differently depending on how it was sent.
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg); // sent via post(): the message carries a Runnable
    } else {
        if (mCallback != null) { // a Callback was passed in when the Handler was constructed
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg); // otherwise handled by the subclass's own handleMessage
    }
}
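The three-level priority in dispatchMessage can be modelled in a few lines of C++. This is an illustrative sketch, not Android code: Msg, MiniHandler and the lastPath field are hypothetical, with lastPath existing only to record which branch ran.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical message: `callback` is set when the message was posted as a runnable.
struct Msg {
    int what = 0;
    std::function<void()> callback;
};

class MiniHandler {
public:
    // Optional handler-level callback; returning true means "consumed".
    std::function<bool(const Msg&)> mCallback;
    std::string lastPath;  // records which branch ran, for illustration only

    virtual ~MiniHandler() = default;

    // Fallback, normally overridden by a subclass.
    virtual void handleMessage(const Msg&) { lastPath = "handleMessage"; }

    void dispatchMessage(const Msg& msg) {
        if (msg.callback) {               // 1. the message's own runnable wins
            msg.callback();
            lastPath = "msg.callback";
            return;
        }
        if (mCallback && mCallback(msg)) {  // 2. then the handler-level callback
            lastPath = "mCallback";
            return;
        }
        handleMessage(msg);               // 3. finally the overridable method
    }
};
```

A handler-level callback that returns false lets the message fall through to handleMessage, which matches the early return being conditional in the Java code.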
2.5 Sending messages in the Java layer
Now sending messages in the Java layer. Handler's sendMessage, post and similar functions all end up calling the following function:
public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); }
Every Java-layer send then goes through Handler's enqueueMessage:
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
Finally, MessageQueue's enqueueMessage inserts the message into the queue and, if needed, calls the native nativeWake function.
boolean enqueueMessage(Message msg, long when) { if (msg.target == null) { throw new IllegalArgumentException("Message must have a target."); } if (msg.isInUse()) { throw new IllegalStateException(msg + " This message is already in use."); } synchronized (this) { if (mQuitting) { IllegalStateException e = new IllegalStateException( msg.target + " sending message to a Handler on a dead thread"); Log.w(TAG, e.getMessage(), e); msg.recycle(); return false; } msg.markInUse(); msg.when = when; Message p = mMessages; boolean needWake; if (p == null || when == 0 || when < p.when) { // New head, wake up the event queue if blocked. msg.next = p; mMessages = msg; needWake = mBlocked; } else { // Inserted within the middle of the queue. Usually we don't have to wake // up the event queue unless there is a barrier at the head of the queue // and the message is the earliest asynchronous message in the queue. needWake = mBlocked && p.target == null && msg.isAsynchronous(); Message prev; for (;;) { prev = p; p = p.next; if (p == null || when < p.when) { break; } if (needWake && p.isAsynchronous()) { needWake = false; } } msg.next = p; // invariant: p == prev.next prev.next = msg; } // We can assume mPtr != 0 because mQuitting is false. if (needWake) { nativeWake(mPtr); } } return true; }
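The core of enqueueMessage is an ordered insert plus a decision about whether to wake the consumer. The sketch below keeps just that part; TimedMsg and MiniQueue are hypothetical names, and the sync-barrier and asynchronous-message branch of the real code is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <vector>

struct TimedMsg { int what; int64_t when; };

class MiniQueue {
public:
    bool blocked = false;  // stands in for mBlocked: is the consumer asleep?

    // Inserts msg in `when` order; returns true if the consumer should be woken.
    bool enqueue(TimedMsg msg) {
        auto it = mMessages.begin();
        if (it == mMessages.end() || msg.when == 0 || msg.when < it->when) {
            mMessages.push_front(msg);  // new head: the wake-up time changed
            return blocked;
        }
        // Skip every message due at or before msg.when (FIFO among equals).
        while (it != mMessages.end() && it->when <= msg.when) ++it;
        mMessages.insert(it, msg);
        return false;  // barrier/asynchronous case omitted in this sketch
    }

    // Returns the `what` codes in queue order.
    std::vector<int> order() const {
        std::vector<int> out;
        for (const TimedMsg& m : mMessages) out.push_back(m.what);
        return out;
    }

private:
    std::list<TimedMsg> mMessages;
};
```

Only a message that becomes the new head can change when the consumer must wake, which is why mid-queue inserts normally skip nativeWake.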
This native method ends up calling Looper's wake function:
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->wake(); } void NativeMessageQueue::wake() { mLooper->wake(); }
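Looper's wake function simply writes a value to mWakeEventFd. The fd is used purely for notification, much like a pipe. The write wakes epoll_wait, so the thread stops blocking: it first delivers native-layer messages, then handles events on fds registered through addFd, and finally returns to process Java-layer messages.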
void Looper::wake() { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ wake", this); #endif uint64_t inc = 1; ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))); if (nWrite != sizeof(uint64_t)) { if (errno != EAGAIN) { ALOGW("Could not write wake signal, errno=%d", errno); } } }
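The wake side and its counterpart can be exercised together. The sketch below is Linux-only and assumes that awoken() drains the eventfd by reading it (awoken's body is not shown in this article). WakeFd is a hypothetical class, and poll is swapped in for epoll purely to keep the readiness check short.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/eventfd.h>
#include <unistd.h>

class WakeFd {
public:
    WakeFd() : mFd(eventfd(0, EFD_NONBLOCK)) {}
    ~WakeFd() { if (mFd >= 0) close(mFd); }
    WakeFd(const WakeFd&) = delete;
    WakeFd& operator=(const WakeFd&) = delete;

    bool valid() const { return mFd >= 0; }

    // Adds 1 to the counter; returns false if the write did not go through.
    bool wake() {
        uint64_t inc = 1;
        return write(mFd, &inc, sizeof(inc)) == static_cast<ssize_t>(sizeof(inc));
    }

    // Drains the counter and returns how many wake() calls it had absorbed.
    uint64_t awoken() {
        uint64_t counter = 0;
        if (read(mFd, &counter, sizeof(counter)) != static_cast<ssize_t>(sizeof(counter))) {
            return 0;
        }
        return counter;
    }

    // True if a waiter would be woken right now (the fd is readable).
    bool readable(int timeoutMillis) const {
        struct pollfd p;
        p.fd = mFd;
        p.events = POLLIN;
        p.revents = 0;
        return poll(&p, 1, timeoutMillis) == 1 && (p.revents & POLLIN) != 0;
    }

private:
    int mFd;
};
```

Two wake() calls leave the fd readable once, and a single awoken() clears it again, so repeated wake-ups never pile up as separate events.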
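2.6 Sending messages in the native layer
Messages can also be sent from the native layer, mainly by calling Looper's sendMessageAtTime. One of its parameters is a handler that acts as the callback; the message is stored in mMessageEnvelopes.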
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) { #if DEBUG_CALLBACKS ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d", this, uptime, handler.get(), message.what); #endif size_t i = 0; { // acquire lock AutoMutex _l(mLock); size_t messageCount = mMessageEnvelopes.size(); while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) { i += 1; } MessageEnvelope messageEnvelope(uptime, handler, message); mMessageEnvelopes.insertAt(messageEnvelope, i, 1); // Optimization: If the Looper is currently sending a message, then we can skip // the call to wake() because the next thing the Looper will do after processing // messages is to decide when the next wakeup time should be. In fact, it does // not even matter whether this code is running on the Looper thread. if (mSendingMessage) { return; } } // release lock // Wake the poll loop only when we enqueue a new message at the head. if (i == 0) { wake(); } }
In pollInner, after epoll_wait returns, the messages in mMessageEnvelopes are walked and handleMessage is called on the handler of every message that is due.
while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sp<MessageHandler> handler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } }
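The native queue boils down to two operations: an ordered insert and a "deliver everything that is due" pass. A minimal sketch follows, with Envelope, kNoMessage, insertEnvelope and deliverDue as hypothetical names; locking and the strong handler reference are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Envelope { int64_t uptime; int what; };

const int64_t kNoMessage = INT64_MAX;  // plays the role of LLONG_MAX

// Inserts e after all envelopes with uptime <= e.uptime and returns the index
// used. Index 0 means the head changed, so the poll loop should be woken.
size_t insertEnvelope(std::vector<Envelope>& queue, Envelope e) {
    size_t i = 0;
    while (i < queue.size() && e.uptime >= queue[i].uptime) i++;
    queue.insert(queue.begin() + static_cast<std::ptrdiff_t>(i), e);
    return i;
}

// Delivers every envelope due at `now` (appending its code to `delivered`) and
// returns the uptime of the next pending envelope, or kNoMessage if none.
int64_t deliverDue(std::vector<Envelope>& queue, int64_t now, std::vector<int>& delivered) {
    while (!queue.empty()) {
        if (queue.front().uptime > now) return queue.front().uptime;
        delivered.push_back(queue.front().what);
        queue.erase(queue.begin());
    }
    return kNoMessage;
}
```

The value returned by deliverDue corresponds to mNextMessageUptime, which the next poll uses to shorten its timeout.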
The file Looper_test.cpp shows many ways to use Looper. Let's take a look:
sp<StubMessageHandler> handler = new StubMessageHandler();
mLooper->sendMessageAtTime(now + ms2ns(100), handler, Message(MSG_TEST1));

StubMessageHandler derives from MessageHandler, so it must implement handleMessage:

class StubMessageHandler : public MessageHandler {
public:
    Vector<Message> messages;

    virtual void handleMessage(const Message& message) {
        messages.push(message);
    }
};
While we are here, the Message and MessageHandler classes:
struct Message { Message() : what(0) { } Message(int what) : what(what) { } /* The message type. (interpretation is left up to the handler) */ int what; }; /** * Interface for a Looper message handler. * * The Looper holds a strong reference to the message handler whenever it has * a message to deliver to it. Make sure to call Looper::removeMessages * to remove any pending messages destined for the handler so that the handler * can be destroyed. */ class MessageHandler : public virtual RefBase { protected: virtual ~MessageHandler() { } public: /** * Handles a message. */ virtual void handleMessage(const Message& message) = 0; };
2.7 addFd in the native layer
Through addFd in Looper.cpp we can also add an fd to the thread's epoll set and handle its data when it arrives. Here is the addFd function; note the callback parameter.
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
            events, callback.get(), data);
#endif

    if (!callback.get()) {
        if (! mAllowNonCallbacks) {
            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
            return -1;
        }

        if (ident < 0) {
            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
            return -1;
        }
    } else {
        ident = POLL_CALLBACK;
    }

    { // acquire lock
        AutoMutex _l(mLock);

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem); // add to epoll
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            mRequests.add(fd, request); // store in mRequests
        } else {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem); // update
            if (epollResult < 0) {
                if (errno == ENOENT) {
                    // Tolerate ENOENT because it means that an older file descriptor was
                    // closed before its callback was unregistered and meanwhile a new
                    // file descriptor with the same number has been created and is now
                    // being registered for the first time.  This error may occur naturally
                    // when a callback has the side-effect of closing the file descriptor
                    // before returning and unregistering itself.  Callback sequence number
                    // checks further ensure that the race is benign.
                    //
                    // Unfortunately due to kernel limitations we need to rebuild the epoll
                    // set from scratch because it may contain an old file handle that we are
                    // now unable to remove since its file descriptor is no longer valid.
                    // No such problem would have occurred if we were using the poll system
                    // call instead, but that approach carries others disadvantages.
#if DEBUG_CALLBACKS
                    ALOGD("%p ~ addFd - EPOLL_CTL_MOD failed due to file descriptor "
                            "being recycled, falling back on EPOLL_CTL_ADD, errno=%d",
                            this, errno);
#endif
                    epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
                    if (epollResult < 0) {
                        ALOGE("Error modifying or adding epoll events for fd %d, errno=%d",
                                fd, errno);
                        return -1;
                    }
                    scheduleEpollRebuildLocked();
                } else {
                    ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                    return -1;
                }
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}
In pollInner, the matching fd is first looked up in mRequests, and pushResponse then creates a Response that pairs the events with that Request.
} else { ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } }
After that, the Responses in mResponses are walked and the callback held in each one's request is invoked.
for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } }
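The register, wait, and call-back cycle can be reproduced in a small Linux-only sketch. FdWatcher is a hypothetical class that follows the same convention as the code above (a callback returning 0 unregisters its fd), but it ignores sequence numbers, locking, and callbacks that modify the watcher while it is polling.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <sys/epoll.h>
#include <unistd.h>

class FdWatcher {
public:
    // Callback returns 0 to unregister the fd, non-zero to keep watching.
    using Callback = std::function<int(int fd, uint32_t events)>;

    FdWatcher() : mEpollFd(epoll_create1(0)) {}
    ~FdWatcher() { if (mEpollFd >= 0) close(mEpollFd); }
    FdWatcher(const FdWatcher&) = delete;
    FdWatcher& operator=(const FdWatcher&) = delete;

    // Registers fd for input events.
    bool addFd(int fd, Callback cb) {
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = fd;
        if (epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, &item) != 0) return false;
        mRequests[fd] = std::move(cb);
        return true;
    }

    // Waits once and runs the callbacks of ready fds; returns how many ran.
    int pollOnce(int timeoutMillis) {
        epoll_event ready[8];
        int n = epoll_wait(mEpollFd, ready, 8, timeoutMillis);
        int invoked = 0;
        for (int i = 0; i < n; i++) {
            int fd = ready[i].data.fd;
            auto it = mRequests.find(fd);
            if (it == mRequests.end()) continue;  // no longer registered
            invoked++;
            if (it->second(fd, ready[i].events) == 0) {
                epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, nullptr);
                mRequests.erase(it);
            }
        }
        return invoked;
    }

    size_t watched() const { return mRequests.size(); }

private:
    int mEpollFd;
    std::map<int, Callback> mRequests;
};
```

A typical use is to register the read end of a pipe, write a byte to the other end, and call pollOnce: the callback runs once and, by returning 0, removes itself.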
Again, let's see how Looper_test.cpp uses this:
Pipe pipe; StubCallbackHandler handler(true); handler.setCallback(mLooper, pipe.receiveFd, Looper::EVENT_INPUT);
Here is the handler's setCallback function:
class CallbackHandler {
public:
    void setCallback(const sp<Looper>& looper, int fd, int events) {
        // Calls the looper's addFd and registers staticHandler as the callback.
        looper->addFd(fd, 0, events, staticHandler, this);
    }

protected:
    virtual ~CallbackHandler() { }

    virtual int handler(int fd, int events) = 0;

private:
    // This is the callback function.
    static int staticHandler(int fd, int events, void* data) {
        return static_cast<CallbackHandler*>(data)->handler(fd, events);
    }
};

class StubCallbackHandler : public CallbackHandler {
public:
    int nextResult;
    int callbackCount;

    int fd;
    int events;

    StubCallbackHandler(int nextResult) : nextResult(nextResult),
            callbackCount(0), fd(-1), events(-1) {
    }

protected:
    // Reached through the callback function above.
    virtual int handler(int fd, int events) {
        callbackCount += 1;
        this->fd = fd;
        this->events = events;
        return nextResult;
    }
};
Compare this with Looper's addFd: when a callback is supplied, a SimpleLooperCallback is created.
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) { return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data); }
Looper_callbackFunc here is a typedef:
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);
And here is SimpleLooperCallback:
class SimpleLooperCallback : public LooperCallback { protected: virtual ~SimpleLooperCallback(); public: SimpleLooperCallback(Looper_callbackFunc callback); virtual int handleEvent(int fd, int events, void* data); private: Looper_callbackFunc mCallback; };SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) : mCallback(callback) { } SimpleLooperCallback::~SimpleLooperCallback() { } int SimpleLooperCallback::handleEvent(int fd, int events, void* data) { return mCallback(fd, events, data); }
In the end callback->handleEvent(fd, events, data) is called, where callback is the SimpleLooperCallback and data is the this pointer of the CallbackHandler that was passed in earlier.
So staticHandler is what finally runs. Its data->handler call is really this->handler, and because handler is virtual, the call lands in StubCallbackHandler's handler function.
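The pattern at work here is the classic trampoline: a C-style function pointer plus a void* carrying the object, forwarding to a virtual method. A self-contained sketch follows; CallbackFunc, Base, Counting and invoke are hypothetical names that mirror the roles of Looper_callbackFunc, CallbackHandler, StubCallbackHandler and the Looper respectively.

```cpp
#include <cassert>

// Same shape as the Looper_callbackFunc typedef.
typedef int (*CallbackFunc)(int fd, int events, void* data);

class Base {
public:
    virtual ~Base() = default;

    // C-style entry point: recovers the object from `data` and forwards.
    static int trampoline(int fd, int events, void* data) {
        return static_cast<Base*>(data)->handler(fd, events);
    }

protected:
    virtual int handler(int fd, int events) = 0;
};

class Counting : public Base {
public:
    int calls = 0;
    int lastFd = -1;

protected:
    // Reached through Base::trampoline via virtual dispatch.
    int handler(int fd, int events) override {
        calls++;
        lastFd = fd;
        return events;
    }
};

// Stands in for an API that only knows the function pointer and the void*.
int invoke(CallbackFunc fn, int fd, int events, void* data) {
    return fn(fd, events, data);
}
```

The pointer handed over as data should be a Base*, since the trampoline casts the void* back to exactly that type.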
Of course it need not be this complicated. We can use the second addFd overload directly; all it takes is a class of our own that implements LooperCallback, which is much simpler.
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
2.8 addFd in the Java layer
I had always assumed that addFd was only available on the native Looper, but the Java layer exposes the same feature through JNI.
It is available through addOnFileDescriptorEventListener in MessageQueue:
public void addOnFileDescriptorEventListener(@NonNull FileDescriptor fd, @OnFileDescriptorEventListener.Events int events, @NonNull OnFileDescriptorEventListener listener) { if (fd == null) { throw new IllegalArgumentException("fd must not be null"); } if (listener == null) { throw new IllegalArgumentException("listener must not be null"); } synchronized (this) { updateOnFileDescriptorEventListenerLocked(fd, events, listener); } }
Here is the OnFileDescriptorEventListener callback interface:
public interface OnFileDescriptorEventListener { public static final int EVENT_INPUT = 1 << 0; public static final int EVENT_OUTPUT = 1 << 1; public static final int EVENT_ERROR = 1 << 2; /** @hide */ @Retention(RetentionPolicy.SOURCE) @IntDef(flag=true, value={EVENT_INPUT, EVENT_OUTPUT, EVENT_ERROR}) public @interface Events {} @Events int onFileDescriptorEvents(@NonNull FileDescriptor fd, @Events int events); }
Next, updateOnFileDescriptorEventListenerLocked is called:
private void updateOnFileDescriptorEventListenerLocked(FileDescriptor fd, int events,
        OnFileDescriptorEventListener listener) {
    final int fdNum = fd.getInt$();

    int index = -1;
    FileDescriptorRecord record = null;
    if (mFileDescriptorRecords != null) {
        index = mFileDescriptorRecords.indexOfKey(fdNum);
        if (index >= 0) {
            record = mFileDescriptorRecords.valueAt(index);
            if (record != null && record.mEvents == events) {
                return;
            }
        }
    }

    if (events != 0) {
        events |= OnFileDescriptorEventListener.EVENT_ERROR;
        if (record == null) {
            if (mFileDescriptorRecords == null) {
                mFileDescriptorRecords = new SparseArray<FileDescriptorRecord>();
            }
            // The fd is stored in a FileDescriptorRecord ...
            record = new FileDescriptorRecord(fd, events, listener);
            // ... which is then stored in mFileDescriptorRecords.
            mFileDescriptorRecords.put(fdNum, record);
        } else {
            record.mListener = listener;
            record.mEvents = events;
            record.mSeq += 1;
        }
        nativeSetFileDescriptorEvents(mPtr, fdNum, events); // call the native function
    } else if (record != null) {
        record.mEvents = 0;
        mFileDescriptorRecords.removeAt(index);
    }
}
The native side ends up calling NativeMessageQueue's setFileDescriptorEvents:
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr); nativeMessageQueue->setFileDescriptorEvents(fd, events); }
In setFileDescriptorEvents, the addFd being called is the second overload, so NativeMessageQueue must derive from LooperCallback.
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) { if (events) { int looperEvents = 0; if (events & CALLBACK_EVENT_INPUT) { looperEvents |= Looper::EVENT_INPUT; } if (events & CALLBACK_EVENT_OUTPUT) { looperEvents |= Looper::EVENT_OUTPUT; } mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, reinterpret_cast<void*>(events)); } else { mLooper->removeFd(fd); } }
Sure enough, it does, which means it has to implement the handleEvent function:
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
    void wake();
    void setFileDescriptorEvents(int fd, int events);

    virtual int handleEvent(int fd, int events, void* data);
    // (the excerpt ends here; the rest of the class is not shown)
handleEvent is the function the Looper calls after epoll_wait returns, whenever one of the fds we added has data (or another watched event) pending:
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
    int events = 0;
    if (looperEvents & Looper::EVENT_INPUT) {
        events |= CALLBACK_EVENT_INPUT;
    }
    if (looperEvents & Looper::EVENT_OUTPUT) {
        events |= CALLBACK_EVENT_OUTPUT;
    }
    if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
        events |= CALLBACK_EVENT_ERROR;
    }
    int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
    // Call back into the Java layer (MessageQueue.dispatchEvents).
    int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
            gMessageQueueClassInfo.dispatchEvents, fd, events);
    if (!newWatchedEvents) {
        return 0; // unregister the fd
    }
    if (newWatchedEvents != oldWatchedEvents) {
        setFileDescriptorEvents(fd, newWatchedEvents);
    }
    return 1;
}
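The flag translation in handleEvent is easy to misread, so here is a small sketch of it, written in Java only for illustration (the real logic is the C++ above). The class and method names are hypothetical, and the constant values are assumed to match the native Looper::EVENT_* and CALLBACK_EVENT_* definitions.

```java
/** Hypothetical Java rendering of the flag mapping done in handleEvent. */
final class EventTranslationSketch {
    // Assumed to mirror Looper::EVENT_* on the native side.
    static final int LOOPER_INPUT = 1 << 0;
    static final int LOOPER_OUTPUT = 1 << 1;
    static final int LOOPER_ERROR = 1 << 2;
    static final int LOOPER_HANGUP = 1 << 3;
    static final int LOOPER_INVALID = 1 << 4;

    // Assumed to mirror CALLBACK_EVENT_* (same values as the Java listener flags).
    static final int CALLBACK_INPUT = 1 << 0;
    static final int CALLBACK_OUTPUT = 1 << 1;
    static final int CALLBACK_ERROR = 1 << 2;

    /** Collapses the five Looper flags into the three flags the Java listener sees. */
    static int toCallbackEvents(int looperEvents) {
        int events = 0;
        if ((looperEvents & LOOPER_INPUT) != 0) {
            events |= CALLBACK_INPUT;
        }
        if ((looperEvents & LOOPER_OUTPUT) != 0) {
            events |= CALLBACK_OUTPUT;
        }
        if ((looperEvents & (LOOPER_ERROR | LOOPER_HANGUP | LOOPER_INVALID)) != 0) {
            events |= CALLBACK_ERROR; // hangup and invalid fd are reported as errors
        }
        return events;
    }

    /** Return value of handleEvent: 1 keeps the fd registered, 0 unregisters it. */
    static int handleEventResult(int newWatchedEvents) {
        return newWatchedEvents == 0 ? 0 : 1;
    }
}
```

The point of the sketch is that a hangup or an invalid fd never reaches the Java listener as a separate flag; both arrive as EVENT_ERROR.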
Finally, dispatchEvents in the Java MessageQueue is the method that the JNI layer calls back into; it in turn invokes the listener that was registered earlier:
// Called from native code.
private int dispatchEvents(int fd, int events) {
    // Get the file descriptor record and any state that might change.
    final FileDescriptorRecord record;
    final int oldWatchedEvents;
    final OnFileDescriptorEventListener listener;
    final int seq;
    synchronized (this) {
        record = mFileDescriptorRecords.get(fd); // look up the FileDescriptorRecord by fd
        if (record == null) {
            return 0; // spurious, no listener registered
        }

        oldWatchedEvents = record.mEvents;
        events &= oldWatchedEvents; // filter events based on current watched set
        if (events == 0) {
            return oldWatchedEvents; // spurious, watched events changed
        }

        listener = record.mListener;
        seq = record.mSeq;
    }

    // Invoke the listener outside of the lock.
    int newWatchedEvents = listener.onFileDescriptorEvents( // listener callback
            record.mDescriptor, events);
    if (newWatchedEvents != 0) {
        newWatchedEvents |= OnFileDescriptorEventListener.EVENT_ERROR;
    }

    // Update the file descriptor record if the listener changed the set of
    // events to watch and the listener itself hasn't been updated since.
    if (newWatchedEvents != oldWatchedEvents) {
        synchronized (this) {
            int index = mFileDescriptorRecords.indexOfKey(fd);
            if (index >= 0 && mFileDescriptorRecords.valueAt(index) == record
                    && record.mSeq == seq) {
                record.mEvents = newWatchedEvents;
                if (newWatchedEvents == 0) {
                    mFileDescriptorRecords.removeAt(index);
                }
            }
        }
    }

    // Return the new set of events to watch for native code to take care of.
    return newWatchedEvents;
}
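The role of mSeq in dispatchEvents is easier to see in isolation. The sketch below is a simplified, Android-free model under stated assumptions: `DispatchSketch` and `Record` are hypothetical names, a `HashMap` replaces `SparseArray`, and the listener is reduced to an `IntUnaryOperator` that receives the triggered events and returns the new watch mask.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntUnaryOperator;

/** Hypothetical model of the dispatch-and-update logic shown above. */
final class DispatchSketch {
    static final int EVENT_ERROR = 1 << 2;

    /** Stand-in for FileDescriptorRecord. */
    static final class Record {
        int events;
        int seq;
        IntUnaryOperator listener;

        Record(int events, IntUnaryOperator listener) {
            this.events = events;
            this.listener = listener;
        }
    }

    final Map<Integer, Record> records = new HashMap<>();

    int dispatch(int fd, int events) {
        final Record record;
        final int oldWatched;
        final IntUnaryOperator listener;
        final int seq;
        synchronized (this) {
            record = records.get(fd);
            if (record == null) {
                return 0; // no listener registered
            }
            oldWatched = record.events;
            events &= oldWatched; // drop events that are no longer watched
            if (events == 0) {
                return oldWatched;
            }
            listener = record.listener;
            seq = record.seq; // snapshot taken before leaving the lock
        }

        // The listener runs outside the lock and may re-register itself.
        int newWatched = listener.applyAsInt(events);
        if (newWatched != 0) {
            newWatched |= EVENT_ERROR;
        }

        if (newWatched != oldWatched) {
            synchronized (this) {
                // Only apply the result if nobody replaced or updated the record meanwhile.
                if (records.get(fd) == record && record.seq == seq) {
                    record.events = newWatched;
                    if (newWatched == 0) {
                        records.remove(fd);
                    }
                }
            }
        }
        return newWatched;
    }
}
```

If the record's sequence number changed while the listener was running (for example because the listener re-registered the fd), the stale return value is not written back, which is the same guard the framework code applies with `record.mSeq == seq`.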