欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Native層消息機(jī)制深入探究實例解析

 更新時間:2023年01月17日 11:23:27   作者:大胃粥  
這篇文章主要為大家介紹了Native層消息機(jī)制的深入探究及實例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

引言

在分析底層源碼時,時不時會碰到 Looper::wake() 或者 Looper::pollOnce() 這樣的代碼,之前大概知道是 Native 層的消息循環(huán)機(jī)制。為了以后我也能夠使用它,我決定還是徹底分析一遍源碼。

本文只涉及一個文件,路徑如下

system/core/libutils/Looper.cpp

Looper的創(chuàng)建

在 Java 層,有一個線程的子類 HandlerThread,它可以創(chuàng)建一個線程,并且使用消息機(jī)制,其中的關(guān)鍵兩步是 Looper::prepare() 和 Looper::loop()。

Looper::prepare() 會創(chuàng)建一個與線程綁定的 Looper 對象,這個綁定是通過 ThreadLocal實現(xiàn)的,而 Looper::loop() 會讓線程進(jìn)入無限循環(huán)來處理消息。

在 Native 層,也有一個 Looper 類,可以通過 Looper::prepare() 來創(chuàng)建 Looper 對象,代碼如下

sp<Looper> Looper::prepare(int opts) {
    // opts決定Looper::addFd()的參數(shù)callback是否可以為空
    bool allowNonCallbacks = opts & PREPARE_ALLOW_NON_CALLBACKS;
    // 通過pthread_getspecific()獲取線程本地存儲中的Looper對象
    sp<Looper> looper = Looper::getForThread();
    if (looper == nullptr) {
        looper = new Looper(allowNonCallbacks);
        // 通過pthread_setspecific()把Looper對象保存到線程本地存儲中
        Looper::setForThread(looper);
    }
    return looper;
}

Native 層的線程在首次調(diào)用 Looper::prepare() 時,會創(chuàng)建 Looper 對象,并通過 pthread_setspecific() 把它保存到線程本地存儲中,后面再獲取 Looper 對象時,通過 pthread_getspecific() 從線程本地存儲中獲取。

這不就是 Java 的 ThreadLocal 的功能嗎?

現(xiàn)在讓我們來看下 Looper 對象的創(chuàng)建過程

Looper::Looper(bool allowNonCallbacks)
    : mAllowNonCallbacks(allowNonCallbacks),
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(0),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    // 1. 創(chuàng)建 eventfd 對象,用于喚醒 Looper
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    AutoMutex _l(mLock);
    // 2. 創(chuàng)建 epoll 對象,并監(jiān)聽剛才創(chuàng)建的 eventfd 的輸入事件
    rebuildEpollLocked();
}

首先創(chuàng)建了一個eventfd 對象,由 mWakeEventFd 代表,它用于喚醒 Looper 。如何喚醒呢? 繼續(xù)往下看。

然后調(diào)用 rebuildEpollLocked() 創(chuàng)建一個 epoll 對象,并監(jiān)聽剛才創(chuàng)建的 mWakeEventFd 的 I/O 事件,代碼如下

void Looper::rebuildEpollLocked() {
    // ...
    // 創(chuàng)建epoll對象
    mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
    // 監(jiān)聽 mWakeEventFd 輸入事件
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd.get();
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
    for (size_t i = 0; i < mRequests.size(); i++) {
        // ...
    }
}

epoll 對象監(jiān)聽了 mWakeEventFd 的可讀事件。在后面的分析中,我們將看到,當(dāng) Looper 開始輪詢時,會調(diào)用 epoll_wait() 阻塞地等待事件,那么有人向 mWakeEventFd 寫入數(shù)據(jù)時,epoll_wait() 將返回,那么 Looper 就被喚醒。

發(fā)送消息與監(jiān)聽請求

Native 層的 Looper 可以處理兩種類型的事件,一種是消息( Message ),另一種是請求( Request )。下面我們來看看如何向 Looper 發(fā)送消息,如何讓 Looper 監(jiān)聽請求。

發(fā)送消息

通過 Looper::sendMessageXXX() 這一類函數(shù),可以向 Looper 發(fā)送消息

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
    size_t i = 0;
    { // acquire lock
        // 通過鎖,可以保存 mMessageEnvelopes 的線程安全
        AutoMutex _l(mLock);
        // 獲取消息要插入的位置
        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }
        // 1. 把信息保存到 mMessageEnvelopes 中
        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
        // mSendingMessage 表明 Looper 正在處理消息,因此不用喚醒Looper
        if (mSendingMessage) {
            return;
        }
    } // release lock
    // 2. 如有必要,就喚醒Looper
    if (i == 0) {
        wake();
    }
}

當(dāng) Looper 接收到消息時,它會把消息保存到 mMessageEnvelopes 容器中,并且如果有必要,那么會調(diào)用 Looper::wake() 喚醒 Looper 來處理消息。

前面我們大概地說明了下如何通過 mWakeEventFd 這個 eventfd 對象喚醒 Looper,現(xiàn)在讓我們來看下喚醒是如何實現(xiàn)的

void Looper::wake() {
    uint64_t inc = 1;
    // 向 mWakeFd 中寫入數(shù)據(jù)
    // TEMP_FAILURE_RETRY 是一個重試機(jī)制,確保不會因為系統(tǒng)中斷而導(dǎo)致數(shù)據(jù)沒有寫入
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    // 確保寫入的是 unsigned 64-bit 的數(shù)據(jù)
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                             mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}

原來就是向 mWakeEventFd 寫數(shù)據(jù)。在后面我們將看到,當(dāng) Looper 進(jìn)入輪詢時, 當(dāng)epoll_wait() 檢測到 mWakeEventFd 有數(shù)據(jù)可讀時,就會從阻塞中醒來,從而達(dá)到 mWakeEventFd 喚醒 Looper 的目的。

監(jiān)聽請求

現(xiàn)在我們來看下如何讓 Looper 監(jiān)聽請求,它是通過 Looper::addFd() 實現(xiàn)的

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    // 1. 確認(rèn) ident 的值
    if (!callback.get()) { // 回調(diào)為空
        if (! mAllowNonCallbacks) { // mAllowNonCallbacks是在創(chuàng)建Looper時初始化的
            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
            return -1;
        }
        if (ident < 0) { // 回調(diào)為空,ident的值不能小于0
            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
            return -1;
        }
    } else { // 回調(diào)不為空
        // POLL_CALLBACK值為-2,表示請求通過回調(diào)處理請求
        ident = POLL_CALLBACK;
    }
    { // acquire lock
        AutoMutex _l(mLock);
        // 2. 包裝成一個 Request
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);
        // 3. 用 epoll 對象監(jiān)聽 fd 的事件,并把請求保存到 mRequests 中
        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            mRequests.add(fd, request);
        } else {
            int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
            if (epollResult < 0) {
                // ...
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

Looper::addFd() 的實質(zhì)就是用 epoll 對象監(jiān)聽指定 fd 的 I/O 事件。為何我把這一過程稱之為 監(jiān)聽請求 呢 ? 因為這個函數(shù)把它的參數(shù)包裝成一個 Request 對象,并保存到 mRequests 容器 中。

Looper 進(jìn)行輪詢時,epoll_wait() 會阻塞,當(dāng) fd 指向的文件有 I/O 事件時,epoll_wait() 將會獲取到 fd 的可讀事件,因此 Looper 會被喚醒。

當(dāng) Looper 檢測到有請求到來時,一般是通過回調(diào)處理的,也就是這里的參數(shù) callback。當(dāng)然,也可以不設(shè)置回調(diào),當(dāng)有請求到來時,交給外部的調(diào)用者去處理,我們將會在后面看到。

根據(jù)以上知識,我們來理解 Looper::addFd() 的第一步。當(dāng) callback 參數(shù)為空時,ident 必須大于或等于0,否則為 POLL_CALLBACK,注意,這的值是 -2。 因此呢,當(dāng)我們檢測到一個請求的 ident 大于或等于0時,這個請求肯定不是通過回調(diào)處理的。這一點非常重要,我們將會在后面用到。

Looper 處理消息或請求

Native 層的 Looper 是通過 Looper::pollOnce() 或 Looper::pollAll() 來統(tǒng)一處理消息和請求的,我們挑 Looper::pollOnce() 這個函數(shù)來分析下

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) { // 無限循環(huán)
        // 1. 處理那些不是通過callback處理的請求
        while (mResponseIndex < mResponses.size()) {
            // ...
        }
        // 2. 處理pollInner()輪詢的結(jié)果
        if (result != 0) {
            // ...
        }
        // 3. epoll 等待并處理事件(如果有事件到來)
        result = pollInner(timeoutMillis);
    }
}

當(dāng)首次調(diào)用 Looper::pollOnce() 時,第一步和第二步肯定不會發(fā)生,那么我們先來看下第三步,這個函數(shù)比較長,我們一步步解析

int Looper::pollInner(int timeoutMillis) {
    // 省略計算 timeoutMillis 的代碼
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true;
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    // 1. 等待事件
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // ...
}

第一步,通過 epoll_wait() 阻塞地等待它監(jiān)聽的 fd 的 I/O 就緒, 此時 Looper 進(jìn)入休眠。

那么怎么喚醒 Looper 呢? 根據(jù)前面的分析,epoll 對象監(jiān)聽了 mWakeEventFd 以及 通過 Looper::addFd() 添加的 fd。 那么向這些被監(jiān)聽的 fd 寫入數(shù)據(jù),就可以喚醒 Looper。例如,Looper::wake() 就是通過向 mWakeEventFd 寫入數(shù)據(jù)來喚醒 Looper。

那么現(xiàn)在我們來看下 Looper 被喚醒后的處理流程

int Looper::pollInner(int timeoutMillis) {
    // ...
    // 1. 等待I/O就緒
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // ...
    // 2. 處理 epoll 事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) { 
            // 2.1 處理被消息喚醒的情況
            if (epollEvents & EPOLLIN) {
                // 清理eventfd中的數(shù)據(jù)
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else { 
            // 2.2 處理被請求喚醒的情況
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                // 根據(jù)epoll觸發(fā)的事件類型,填充events相應(yīng)的位
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                // 創(chuàng)建Response對象,保存兩個參數(shù),然后把Response對象保存到mResponses中
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }    
}

當(dāng) mWakeEventFd 的 I/O 就緒,就會走到2.1步,之后會讀取 mWakeEventFd 中的數(shù)據(jù),讀取的數(shù)據(jù)并沒有什么用,只是清理數(shù)據(jù)而已。而這一步,大部分情況 是由于消息的到來,而極少情況是并不是因為消息的到來,而是因為線程有緊急事情需要處理,所以必須要喚醒。

當(dāng)通過Looper::addFd() 添加的 fd 就緒時,就會走到 2.2 步,這一步一定是因為請求到來了。它會創(chuàng)建 Reponse 對象,并保存,代碼如下

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

這里我們要注意下,mResponses 容器表示待處理請求的集合,這些請求會在后面處理,讓我們接著往下看。

int Looper::pollInner(int timeoutMillis) {
    // ...
    // 1. 等待I/O就緒
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // ...
    // 2. 處理 epoll 事件
    for (int i = 0; i < eventCount; i++) {
        // ...
    }    
Done: ;
    mNextMessageUptime = LLONG_MAX;
    // 3. 處理消息
    while (mMessageEnvelopes.size() != 0) { // 循環(huán)處理消息
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        // 3.1 取出隊頭消息
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) { 
            // 3.2 隊頭消息處理的時間點小于當(dāng)前時間點,表示要立即處理消息
            {
                // 獲取handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                // 消息交給handler處理
                handler->handleMessage(message);
            } // release handler
            mLock.lock();
            mSendingMessage = false;
            // POLL_CALLBACK 表示消息被回調(diào)處理
            result = POLL_CALLBACK;
        } else {
            // 3.2 隊頭的消息處理的時間點大于當(dāng)前時間,表示還沒有到處理的時間點,就退出處理消息的循環(huán)
            // mNextMessageUptime 表示下一個消息要處理的時間點,當(dāng)通過break退出循環(huán)后,
            // 在外層的下一次循調(diào)用pollInner()時,會通過 mNextMessageUptime 計算 epoll_wait 的超時時間
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
    // Release lock.
    mLock.unlock();    
}

根據(jù)前面分析的消息發(fā)送的過程,消息保存在 mMessageEnvelopes 中。那么這里的第三步,很明顯是在處理消息。通過循環(huán),不斷取出消息,然后把消息的 messageEnvelope.uptime 與當(dāng)前時間進(jìn)行比較,如果小于當(dāng)前時間,就證明要立馬處理消息了,否則這些消息只能在下一次輪詢中再處理。

處理完了消息,現(xiàn)在來處理請求

int Looper::pollInner(int timeoutMillis) {
    // ...
    // 1. 等待事件
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    // ...
    // 2. 處理 epoll 事件
    for (int i = 0; i < eventCount; i++) {
        // ...
    }
Done: ;
    // 3. 處理消息
    while (mMessageEnvelopes.size() != 0) { // 循環(huán)處理消息
         // ...
    }
    // 4. 循環(huán)處理請求
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        // 檢測請求是否通過回調(diào)處理
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            // 表明消息被回調(diào)處理了
            result = POLL_CALLBACK;
        }
    }
    // 返回結(jié)果
    return result;
}

剛剛我們還提到,mResponse 中保存了待處理的請求。現(xiàn)在通過循環(huán),不斷取出請求來處理。處理請求有一個條件,那就是請求必須有回調(diào),否則不處理。 再回顧前面分析 監(jiān)聽請求 的代碼,當(dāng)Looper::addFd() 的參數(shù) callback 不為空時,Request.ident 的值為 POLL_CALLBACK,表明請求需要通過回調(diào)處理。

Looper::pollInner() 函數(shù)分析完畢,現(xiàn)在再回到 Looper::pollOnce()

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) { // 無限循環(huán)
        // 1. 處理那些不是通過callback處理的請求
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            // 從Looper::addFd()分析可知,只有當(dāng)callback為空的情況下,ident的值>=0,否則為POLL_CALLBACK(-2)
            // 因此,這里處理的是那些沒有通過callback處理的請求            
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                // 因為Looper無法通過callback處理,所以把這些元數(shù)據(jù)交給調(diào)用者處理
                if (outFd != nullptr) *outFd = fd;
                if (outEvents != nullptr) *outEvents = events;
                if (outData != nullptr) *outData = data;
                // 注意,這里返回的值大于0
                return ident;
            }
        }
        // 2. 處理pollInner()輪詢的結(jié)果
        // result的值有很多種,但是都為負(fù)數(shù)(注意,上面處理那些不是通過callback處理的請求,返回正值)
        // 1. POLL_WAKE(-1): 表示epoll_wait()是被eventfd喚醒的
        // 1. POLL_ERROR(-4): 表示epoll_wait()出錯
        // 2. POLL_TIMEOUT(-3) : 表示epoll_wait()超時
        // 3. POLL_CALLBACK(-2) : 表示消息或請求是通過回調(diào)處理的
        if (result != 0) {
            // 消息或事件無論是否被callback處理,這些傳入的參數(shù)都沒有意義,因此清空
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            // 注意,返回的是負(fù)值
            return result;
        }
        // 3. epoll 等待并處理事件(如果有事件到來)
        result = pollInner(timeoutMillis);
    }
}

從整體看,當(dāng) pollInner() 返回后,就會調(diào)用第一步和第二步來處理結(jié)果。

首先來看第一步,根據(jù)前面 監(jiān)聽請求 的分析,當(dāng) Looper::addFd() 的參數(shù) callback 為空時,Request.ident 的值才大于等于0。Looper::pollInner 只通過回調(diào)來處理請求,而對于那些沒有回調(diào)的請求呢?那就是在這里處理。而處理的方式是直接把元數(shù)據(jù)返回給調(diào)用者,那么意思就很明顯了,讓調(diào)用者自己處理。

再來看第二步,直接返回 Looper::pollInner() 的結(jié)果,并把參數(shù)清0。因為無論返回的什么結(jié)果,這些參數(shù)都沒有意義了,這一點請大家自己體會。

關(guān)于第一步和第二步,還有一點需要關(guān)注,第一步的返回值是正值,而第二步返回值是負(fù)值。

結(jié)束

本文對 Native 的 Looper 的主要函數(shù)進(jìn)行分析,揭開了 Native 層消息機(jī)制的核心,但是目前我并不能給一個很好例子來理解本文的內(nèi)容。需要大家在分析 Native 層源碼時慢慢體會。

可能有人會問,你為何不以 Java 層的消息機(jī)制為例來引出 Native 層的消息機(jī)制呢? 因為這樣廢話太多。

以上就是Native層消息機(jī)制深入探究實例解析的詳細(xì)內(nèi)容,更多關(guān)于Native層消息機(jī)制的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論