Java中消息隊(duì)列任務(wù)的平滑關(guān)閉詳解
前言
消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削鋒等問(wèn)題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。目前使用較多的消息隊(duì)列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
消息隊(duì)列應(yīng)用場(chǎng)景
消息隊(duì)列在實(shí)際應(yīng)用中常用的使用場(chǎng)景:異步處理,應(yīng)用解耦,流量削鋒和消息通訊四個(gè)場(chǎng)景。
本文主要給大家介紹的是關(guān)于Java中消息隊(duì)列任務(wù)平滑關(guān)閉的相關(guān)內(nèi)容,分享出來(lái)供大家參考學(xué)習(xí),下面話(huà)不多說(shuō)了,來(lái)一起看看詳細(xì)的介紹吧。
1.問(wèn)題背景
對(duì)于消息隊(duì)列任務(wù)的監(jiān)聽(tīng),我們一般使用Java寫(xiě)一個(gè)獨(dú)立的程序,在Linux服務(wù)器上運(yùn)行。當(dāng)訂閱者程序啟動(dòng)后,會(huì)通過(guò)消息隊(duì)列客戶(hù)端接收消息,放入線(xiàn)程池中并發(fā)的處理。
那么問(wèn)題來(lái)了,當(dāng)我們修改程序后,需要重新啟動(dòng)時(shí),如何保證消息都能夠被處理呢?
一些開(kāi)源的消息隊(duì)列中間件,會(huì)提供ACK機(jī)制(消息確認(rèn)機(jī)制),當(dāng)訂閱者處理完消息后,會(huì)通知服務(wù)端刪除對(duì)應(yīng)消息,如果訂閱者出現(xiàn)異常,服務(wù)端未收到確認(rèn)消費(fèi),則會(huì)重試發(fā)送。
那如果消息隊(duì)列中間件沒(méi)有提供ACK機(jī)制,或者為了高吞度量的考慮關(guān)閉了ACK功能,如何最大可能保證消息都能夠被處理呢?
正常來(lái)說(shuō),訂閱者程序關(guān)閉后,消息會(huì)在隊(duì)列中堆積,等待訂閱者下次訂閱消費(fèi),所以未接收的消息是不會(huì)丟失的。可能出現(xiàn)的問(wèn)題就是在關(guān)閉的一瞬間,已經(jīng)從消息隊(duì)列中取出,但還沒(méi)有被處理的消息。
因此我們需要一套平滑關(guān)閉的機(jī)制,保證在重啟的時(shí)候,已接收的消息可以得到正常處理。
2.問(wèn)題分析
平滑關(guān)閉的思路如下:
- 在關(guān)閉程序時(shí),首先關(guān)閉消息訂閱,保證不再接收新的消息。
- 關(guān)閉線(xiàn)程池,等待線(xiàn)程池中的消息處理完畢。
- 程序退出。
關(guān)閉消息訂閱:消息隊(duì)列的客戶(hù)端都會(huì)提供關(guān)閉連接的方法,具體可以自行查看API。
關(guān)閉線(xiàn)程池:Java的ThreadPoolExecutor線(xiàn)程池提供shutdown()和shutdownNow()兩個(gè)方法,區(qū)別是前者會(huì)等待線(xiàn)程池中的消息都處理完畢,后者會(huì)直接停止所有線(xiàn)程并返回未處理完的線(xiàn)程List。因?yàn)槲覀冃枰褂?code>shutdown()方法進(jìn)行關(guān)閉,并通過(guò)isTerminated()方法,判斷線(xiàn)程池是否已經(jīng)關(guān)閉。
那么問(wèn)題又來(lái)了,我們?nèi)绾瓮ㄖ匠绦?,需要?zhí)行關(guān)閉操作呢?
在Linux中,進(jìn)程的關(guān)閉是通過(guò)信號(hào)傳遞的,我們可以用kill -9 pid關(guān)閉進(jìn)程,除了-9之外,我們可以通過(guò) kill -l,查看kill命令的其它信號(hào)量。

這里提供兩種關(guān)閉方法:
- 程序中添加
Runtime.getRuntime().addShutdownHook鉤子方法,SIGTERM,SIGINT,SIGHUP三種信號(hào)都會(huì)觸發(fā)該方法(分別對(duì)應(yīng)kill -1/kill -2/kill -15,Ctrl+C也會(huì)觸發(fā)SIGINT信號(hào))。 - 程序中通過(guò)Signal類(lèi)注冊(cè)信號(hào)監(jiān)聽(tīng),比如USR2(對(duì)應(yīng)kill -12),在handle方法中執(zhí)行關(guān)閉操作。
補(bǔ)充說(shuō)明:addShutdownHook方法和handle方法中如果再調(diào)用System.exit,會(huì)造成deadlock,使進(jìn)程無(wú)法正常退出。
偽代碼分別如下
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
//關(guān)閉訂閱者
//關(guān)閉線(xiàn)程池
//退出
}
});
//注冊(cè)linux kill信號(hào)量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
//關(guān)閉訂閱者
//關(guān)閉線(xiàn)程池
//退出
}
});
模擬Demo
下面通過(guò)一個(gè)demo模擬相關(guān)邏輯操作
首先模擬一個(gè)生產(chǎn)者,每秒生產(chǎn)5個(gè)消息
然后模擬一個(gè)訂閱者,收到消息后,放入線(xiàn)程池進(jìn)行處理,線(xiàn)程池固定4個(gè)線(xiàn)程,每個(gè)線(xiàn)程處理時(shí)間1秒,這樣線(xiàn)程池每秒會(huì)積壓1個(gè)消息。
package com.lujianing.demo;
import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;
/**
* @author lujianing01@58.com
* @Description:
* @date 2016/11/14
*/
public class MsgClient {
//模擬消費(fèi)線(xiàn)程池 同時(shí)4個(gè)線(xiàn)程處理
private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
//模擬消息生產(chǎn)任務(wù)
private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
//用于判斷是否關(guān)閉訂閱
private static volatile boolean isClose = false;
public static void main(String[] args) throws InterruptedException {
//注冊(cè)鉤子方法
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
close();
}
});
BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
producer(queue);
consumer(queue);
}
//模擬消息隊(duì)列生產(chǎn)者
private static void producer(final BlockingQueue queue){
//每200毫秒向隊(duì)列中放入一個(gè)消息
SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
public void run() {
queue.offer("");
}
}, 0L, 200L, TimeUnit.MILLISECONDS);
}
//模擬消息隊(duì)列消費(fèi)者 生產(chǎn)者每秒生產(chǎn)5個(gè) 消費(fèi)者4個(gè)線(xiàn)程消費(fèi)1個(gè)1秒 每秒積壓1個(gè)
private static void consumer(final BlockingQueue queue) throws InterruptedException {
while (!isClose){
getPoolBacklogSize();
//從隊(duì)列中拿到消息
final String msg = (String)queue.take();
//放入線(xiàn)程池處理
if(!THREAD_POOL.isShutdown()) {
THREAD_POOL.execute(new Runnable() {
public void run() {
try {
//System.out.println(msg);
TimeUnit.MILLISECONDS.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
//查看線(xiàn)程池堆積消息個(gè)數(shù)
private static long getPoolBacklogSize(){
long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
return backlog;
}
private static void close(){
System.out.println("收到kill消息,執(zhí)行關(guān)閉操作");
//關(guān)閉訂閱消費(fèi)
isClose = true;
//關(guān)閉線(xiàn)程池,等待線(xiàn)程池積壓消息處理
THREAD_POOL.shutdown();
//判斷線(xiàn)程池是否關(guān)閉
while (!THREAD_POOL.isTerminated()) {
try {
//每200毫秒 判斷線(xiàn)程池積壓數(shù)量
getPoolBacklogSize();
TimeUnit.MILLISECONDS.sleep(200L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("訂閱者關(guān)閉,線(xiàn)程池處理完畢");
}
static {
String osName = System.getProperty("os.name").toLowerCase();
if(osName != null && osName.indexOf("window") == -1) {
//注冊(cè)linux kill信號(hào)量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
@Override
public void handle(Signal signal) {
close();
}
});
}
}
}

當(dāng)我們?cè)诜?wù)上運(yùn)行時(shí),通過(guò)控制臺(tái)可以看到相關(guān)的輸出信息,demo中輸出了線(xiàn)程池的積壓消息個(gè)數(shù)
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

另打開(kāi)一個(gè)終端,通過(guò)ps命令查看進(jìn)程號(hào),或者通過(guò)nohup啟動(dòng)Java進(jìn)程拿到進(jìn)程id
ps -fe|grep MsgClient

當(dāng)我們執(zhí)行kill -12 pid的時(shí)候 可以看到關(guān)閉業(yè)務(wù)邏輯

3.總結(jié)
其實(shí)不單單消息隊(duì)列任務(wù),在常見(jiàn)的RPC服務(wù)中也會(huì)見(jiàn)到類(lèi)似的功能,比如58的SCF,在源碼中,也會(huì)分別注冊(cè)了USR2信號(hào)量和addShutdownHook鉤子方法。
在重啟腳本中,首先會(huì)發(fā)送kill -12命令,RPC服務(wù)收到信號(hào)后會(huì)修改Server狀態(tài)為關(guān)閉。接著會(huì)發(fā)送kill -15命令,觸發(fā)鉤子方法,關(guān)閉所有的連接。
好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
淺談SpringBoot集成Quartz動(dòng)態(tài)定時(shí)任務(wù)
這篇文章主要介紹了SpringBoot集成Quartz動(dòng)態(tài)定時(shí)任務(wù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04
Java設(shè)計(jì)模式之迭代模式(Iterator模式)介紹
這篇文章主要介紹了Java設(shè)計(jì)模式之迭代模式(Iterator模式)介紹,本文用一個(gè)老師點(diǎn)名的現(xiàn)象描述了迭代模式的使用,需要的朋友可以參考下2015-03-03
springBoot啟動(dòng)時(shí)讓方法自動(dòng)執(zhí)行的幾種實(shí)現(xiàn)方式
這篇文章主要介紹了springBoot啟動(dòng)時(shí)讓方法自動(dòng)執(zhí)行的幾種實(shí)現(xiàn)方式,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
xxl-job如何濫用netty導(dǎo)致的問(wèn)題及解決方案
本篇文章講解xxl-job作為一款分布式任務(wù)調(diào)度系統(tǒng)是如何濫用netty的,導(dǎo)致了怎樣的后果以及如何修改源碼解決這些問(wèn)題,netty作為一種高性能的網(wǎng)絡(luò)編程框架,十分受大家喜愛(ài),今天就xxl-job濫用netty這一問(wèn)題給大家詳細(xì)下,感興趣的朋友一起看看吧2021-05-05
淺談java實(shí)現(xiàn)mongoDB的多條件查詢(xún)
這篇文章主要介紹了java實(shí)現(xiàn)mongoDB的多條件查詢(xún),具有一定參考價(jià)值,需要的朋友可以參考下。2017-09-09
JAVA中使用FileWriter寫(xiě)數(shù)據(jù)到文本文件步驟詳解
這篇文章主要介紹了JAVA中使用FileWriter寫(xiě)數(shù)據(jù)到文本文件步驟詳解,FileWriter類(lèi)提供了多種寫(xiě)入字符的方法,包括寫(xiě)入單個(gè)字符、寫(xiě)入字符數(shù)組和寫(xiě)入字符串等,它還提供了一些其他的方法,如刷新緩沖區(qū)、關(guān)閉文件等,需要的朋友可以參考下2023-10-10
微服務(wù)中使用Maven BOM來(lái)管理你的版本依賴(lài)詳解
這篇文章主要介紹了微服務(wù)中使用Maven BOM來(lái)管理你的版本依賴(lài),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
Spring boot使用spring retry重試機(jī)制的方法示例
這篇文章主要介紹了Spring boot使用spring retry重試機(jī)制的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01

