SpringMVC基于阻塞隊(duì)列LinkedBlockingQueue的同步長輪詢功能實(shí)現(xiàn)詳解
引言
生產(chǎn)者不必是一個(gè)始終在執(zhí)行的線程,它可以是一個(gè)接口,接受客戶端的請求,向隊(duì)列中插入消息;消費(fèi)者也不必是一個(gè)始終在執(zhí)行的線程,它同樣也可以是一個(gè)接口,接受客戶端的請求,從隊(duì)列中取出屬于自己的消息;看到很多介紹生產(chǎn)者消息者實(shí)現(xiàn)的文章,實(shí)現(xiàn)場景都很簡單,現(xiàn)實(shí)應(yīng)用往往會比較復(fù)雜,有一些附加條件,本例中就需要根據(jù)消息中的 familyId 來判斷消息是不是下發(fā)給自己的。
應(yīng)用場景
本例的應(yīng)用場景是一個(gè)物聯(lián)網(wǎng)智能家居應(yīng)用,系統(tǒng)結(jié)構(gòu)圖如下:

主要實(shí)現(xiàn)的功能是用戶通過手機(jī)端APP發(fā)出設(shè)備控制的命令(如:開燈、關(guān)燈等)后,設(shè)備網(wǎng)關(guān)能夠?qū)崟r(shí)的獲取到控制命令,進(jìn)而控制設(shè)置的狀態(tài)。
為什么要使用長輪詢功能呢?
其實(shí)可選的方案有很多種:
1、長輪詢
2、長鏈接
3、Socket
4、WebSocket
5、MQTT
選擇長輪詢方案是因?yàn)槠鋵?shí)現(xiàn)的簡單性,實(shí)現(xiàn)起來與其它的接口基本沒有太大的差別。
同步與異步處理模式分析
同步服務(wù)模式:
同步服務(wù)為每個(gè)請求創(chuàng)建單一線程,由此線程完成整個(gè)請求的處理:接收消息,處理消息,返回?cái)?shù)據(jù);這種情況下服務(wù)器資源對所有入棧請求開放,服務(wù)器資源被所有入棧請求競爭使用,如果入棧請求過多就會導(dǎo)致服務(wù)器資源耗盡宕機(jī),或者導(dǎo)致競爭加劇,資源調(diào)度頻繁,服務(wù)器資源利用效率降低。
異步服務(wù)則可以分別設(shè)置兩個(gè)線程隊(duì)列,一個(gè)專門負(fù)責(zé)接收消息,另一個(gè)專門負(fù)責(zé)處理消息并返回?cái)?shù)據(jù),另有一些值守線程負(fù)責(zé)任務(wù)派發(fā)和超時(shí)監(jiān)控等工作。在這種情況下無論入棧請求有多少,服務(wù)器始終依照自己的能力處理請求,服務(wù)器資源消耗始終在一個(gè)可控的范圍。這種模式的一個(gè)問題就是這兩個(gè)線程隊(duì)列的大小如何根據(jù)機(jī)器負(fù)載情況動態(tài)調(diào)整。
異步服務(wù)模式:
這種情況下,雖然入棧請求以消息隊(duì)列的方式被異步處理但每個(gè)請求內(nèi)部卻是采用阻塞的方式訪問外部資源,如果外部資源訪問速度過慢,可能導(dǎo)致請求處理隊(duì)列中的所有線程均處于阻塞狀態(tài),此時(shí)CPU使用率雖然很低但是卻因?yàn)殛?duì)列中線程已滿而無法處理消息隊(duì)列中的新消息,此時(shí)若能調(diào)整線程隊(duì)列最大線程數(shù)將可提高CPU利用率。但另一個(gè)問題是如果線程數(shù)被調(diào)高之后所有線程的IO處理突然結(jié)束并且接下來每個(gè)線程都將進(jìn)行大量計(jì)算的話那么CPU可能出現(xiàn)過載。
在系統(tǒng)運(yùn)行的每個(gè)時(shí)間點(diǎn)上,當(dāng)時(shí)正在進(jìn)行IO的線程數(shù)量和正在進(jìn)行計(jì)算的線程數(shù)量是不斷變化著的,那么如何才能設(shè)計(jì)出一個(gè)可以根據(jù)系統(tǒng)當(dāng)時(shí)情況自動適應(yīng)負(fù)載變化的高度自適應(yīng)的系統(tǒng)呢?
在這方面采用反應(yīng)式計(jì)算模型確實(shí)能設(shè)計(jì)出適應(yīng)負(fù)載能力很強(qiáng)的系統(tǒng),系統(tǒng)利用率和吞吐量可以大幅提高,但這種系統(tǒng)仍然可能會出現(xiàn)系統(tǒng)局部負(fù)載過高的風(fēng)險(xiǎn)。采用反應(yīng)式計(jì)算模型,不僅系統(tǒng)中的入棧請求以消息隊(duì)列的方式得以異步化,而且系統(tǒng)中所有的IO任務(wù)也必需依照此法行之,這些IO任務(wù)的處理需要采用異步模型(如NIO)。另外要考慮的就是如何劃分異步IO消息并為其配置線程隊(duì)列了,比如是要將所有IO任務(wù)放入統(tǒng)一的隊(duì)列還是為某類IO任務(wù)設(shè)置單獨(dú)的隊(duì)列。
服務(wù)器資源雖然由系統(tǒng)分配但大多以線程為持有者被線程持有并使用,如線程堆棧,被線程持有的各類鎖等資源。
實(shí)現(xiàn)步驟
1、定義消息隊(duì)列
我這里是定義的靜態(tài)常量,你找個(gè)類把它放進(jìn)去就可以了。
/** * 存儲客戶端(用戶)提交的設(shè)置控制命令 */ public final static BlockingQueue<Equipment> EQUIPMENT_CONTROL = new LinkedBlockingQueue<Equipment>();
Equipment 是一個(gè)消息實(shí)體類,在本例中它最關(guān)鍵的屬性是 familyId,因?yàn)橐鶕?jù)它來判斷消息是下發(fā)給哪個(gè)家庭的,你發(fā)了一個(gè)關(guān)燈的命令結(jié)果我家的燈滅了這肯定是不行的。
2、實(shí)現(xiàn)生產(chǎn)者
生產(chǎn)者不必是一個(gè)始終在執(zhí)行的線程,它可以是一個(gè)接口,接受客戶端的請求;
/**
* 保存或更新設(shè)備接口
**/
@RequestMapping("/save.do")
@ResponseBody
public void save(HttpServletRequest request, HttpServletResponse response) throws Exception {
Personal personal = SecurityUtils.getPersonal(request);
Long personalId = personal.getId();
if (personalId == null) {
outFailureJson(response, BaseCodeMessage.personal_10001);
return;
}
//做你要做的事情
//向隊(duì)列中插入消息
ConstantDict.EQUIPMENT_CONTROL.put(entity);
//輸出響應(yīng)內(nèi)容
this.outResultJson(response, "success", "Equipment", entity);
}這里你要做的最關(guān)鍵的是:向隊(duì)列中插入消息。
Personal 是一個(gè)用戶信息實(shí)體類,通過 SecurityUtils.getPersonal(request); 方法根據(jù) Session 或 Cookie 來從緩存或數(shù)據(jù)庫中獲取當(dāng)前登錄用戶信息。
3、實(shí)現(xiàn)消費(fèi)者
此接口由網(wǎng)關(guān)調(diào)用。
這里要做的循環(huán)從隊(duì)列中取數(shù)據(jù),然后根據(jù) familyId 判斷消息是不是屬于自己的,是就退出循環(huán),不是就把剛剛?cè)〕龅南⒃俜呕厝ァ?/p>
/**
* 控制設(shè)置狀態(tài)接口_供網(wǎng)關(guān)調(diào)用 這是一個(gè)提供長輪詢的方法,網(wǎng)關(guān)通過長輪詢來即時(shí)獲得命令信息
*
* @author lipw
* @date 2017年8月30日下午3:31:59
* @param request
* @param response
* @throws Exception
*/
@RequestMapping("/ctrlgw.do")
@ResponseBody
public void controlgw(HttpServletRequest request, HttpServletResponse response) throws Exception {
Personal personal = SecurityUtils.getPersonal(request);
Long personalId = personal.getId();
if (personalId == null) {
outFailureJson(response, BaseCodeMessage.personal_10001);
return;
}
Long familyId = personal.getFamilyId();
if (familyId == null) {
outFailureJson(response, "2", "尚未分配家庭編號!");
return;
}
Equipment equipment = null;
while (true) {
equipment = ConstantDict.EQUIPMENT_CONTROL.poll(5000, TimeUnit.MILLISECONDS);
if (equipment != null) {
if (familyId.equals(equipment.getFamilyId())) {
System.out.println("從隊(duì)列取走一個(gè)元素,隊(duì)列剩余" + ConstantDict.EQUIPMENT_CONTROL.size() + "個(gè)元素");
break;
} else {
// 不屬于自己,再放回隊(duì)列
ConstantDict.EQUIPMENT_CONTROL.put(equipment);
}
}
Thread.sleep(100);
}
this.outResultJson(response, "success", "equipment", equipment);
}為什么要這樣設(shè)計(jì)呢?
因?yàn)槿绻皇峭ㄟ^ peek 方法來獲取,而不從 隊(duì)列 中移除,如果隊(duì)列頭部的消息不是屬于自己的,那就要一直循環(huán)下去卻得不到屬于自己的那一條消息。
使用 AJAX 模擬網(wǎng)關(guān)進(jìn)行測試
<div id="divCommand" style="width:98%; min-height:100px; border:1px solid #888;"></div>
<script src="${ctx}/static/script/jquery-1.10.2.min.js"></script>
<script type="text/javascript">
var successCount = 0;
function loadCommand(){
$.ajax({
url:"${ctx}/xxx/ctrlgw.do?token=xxx&t=" + Date.now(),
type:"get",
data:{},
dataType:"json",
success:function(data)
{
if(data != null && data!=""){
successCount++;
$("#divCommand").append(successCount + ", ");
}
loadCommand(); //成功后繼續(xù)回調(diào)
},error:function(data){
if(data != null && data!=""){
$("#divCommand").append(data.statusText);
}
if (data.statusText == "timeout"){
loadCommand(); //超時(shí)回調(diào)
}
}
});
}
$(document).ready(function(){
loadCommand();
});
</script>測試方法通過一個(gè)回調(diào)函數(shù),不斷的向服務(wù)器發(fā)出請求; 如果服務(wù)器隊(duì)列中有屬于自己的消息,會立即返回,沒有就會一直等待真到超時(shí),然后重新發(fā)起請求。
請求成功后會在 Div 中顯示成功的次數(shù),失敗了也會顯示失敗的狀態(tài)文本。

可以通過瀏覽器的開發(fā)者工具中的 Network 來查看每次請求所用的時(shí)間:

結(jié)束語
本例的實(shí)現(xiàn)方式是同步的,隊(duì)列沒有設(shè)置大小,生產(chǎn)者被阻塞的可能性很小,除非所有網(wǎng)關(guān)都與平臺斷開了連接不再處理消息;但消費(fèi)者的實(shí)現(xiàn)由于是同步的,會對服務(wù)器的性能有所影響,因?yàn)槊總€(gè)消費(fèi)者請求會占用一個(gè) Servlet 線程導(dǎo)致無法再去處理其它用戶請求。那么這個(gè)問題有沒有解決方案呢?當(dāng)然有!那就是采用異步處理模式 DeferredResult 。
到此這篇關(guān)于SpringMVC基于阻塞隊(duì)列LinkedBlockingQueue的同步長輪詢功能實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)SpringMVC的LinkedBlockingQueue輪詢內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Cloud @RefreshScope 原理及使用
這篇文章主要介紹了Spring Cloud @RefreshScope 原理及使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01
SpringBoot+Redis隊(duì)列實(shí)現(xiàn)Java版秒殺的示例代碼
本文主要介紹了SpringBoot+Redis隊(duì)列實(shí)現(xiàn)Java版秒殺的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
關(guān)于springboot 配置文件中屬性變量引用方式@@解析
這篇文章主要介紹了關(guān)于springboot 配置文件中屬性變量引用方式@@解析,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04
Spring Boot實(shí)戰(zhàn)之發(fā)送郵件示例代碼
本篇文章主要介紹了Spring Boot實(shí)戰(zhàn)之發(fā)送郵件示例代碼,具有一定的參考價(jià)值,有興趣的可以了解一下。2017-03-03
mybatis的ParamNameResolver參數(shù)名稱解析
這篇文章主要為大家介紹了mybatis的ParamNameResolver參數(shù)名稱解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
如何基于spring security實(shí)現(xiàn)在線用戶統(tǒng)計(jì)
這篇文章主要介紹了如何基于spring security實(shí)現(xiàn)在線用戶統(tǒng)計(jì),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
Java SSM整合開發(fā)統(tǒng)一結(jié)果封裝詳解
這篇文章主要介紹了Java SSM整合開發(fā)實(shí)現(xiàn)統(tǒng)一結(jié)果封裝,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08

