欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中消息隊列任務的平滑關閉詳解

 更新時間:2017年11月07日 10:11:16   作者:蛙牛  
對于消息隊列的監(jiān)聽,我們一般使用Java寫一個獨立的程序,在Linux服務器上運行。程序啟動后,通過消息隊列客戶端接收消息,放入一個線程池進行異步處理,并發(fā)的快速處理。這篇文章主要給大家介紹了關于Java中消息隊列任務的平滑關閉的相關資料,需要的朋友可以參考下。

前言

消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應用解耦,異步消息,流量削鋒等問題,實現(xiàn)高性能,高可用,可伸縮和最終一致性架構。目前使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

消息隊列應用場景

消息隊列在實際應用中常用的使用場景:異步處理,應用解耦,流量削鋒和消息通訊四個場景。

本文主要給大家介紹的是關于Java中消息隊列任務平滑關閉的相關內容,分享出來供大家參考學習,下面話不多說了,來一起看看詳細的介紹吧。

1.問題背景

對于消息隊列任務的監(jiān)聽,我們一般使用Java寫一個獨立的程序,在Linux服務器上運行。當訂閱者程序啟動后,會通過消息隊列客戶端接收消息,放入線程池中并發(fā)的處理。

那么問題來了,當我們修改程序后,需要重新啟動時,如何保證消息都能夠被處理呢?

一些開源的消息隊列中間件,會提供ACK機制(消息確認機制),當訂閱者處理完消息后,會通知服務端刪除對應消息,如果訂閱者出現(xiàn)異常,服務端未收到確認消費,則會重試發(fā)送。

那如果消息隊列中間件沒有提供ACK機制,或者為了高吞度量的考慮關閉了ACK功能,如何最大可能保證消息都能夠被處理呢?

正常來說,訂閱者程序關閉后,消息會在隊列中堆積,等待訂閱者下次訂閱消費,所以未接收的消息是不會丟失的??赡艹霈F(xiàn)的問題就是在關閉的一瞬間,已經(jīng)從消息隊列中取出,但還沒有被處理的消息。

因此我們需要一套平滑關閉的機制,保證在重啟的時候,已接收的消息可以得到正常處理。

2.問題分析

平滑關閉的思路如下:

  • 在關閉程序時,首先關閉消息訂閱,保證不再接收新的消息。
  • 關閉線程池,等待線程池中的消息處理完畢。
  • 程序退出。

關閉消息訂閱:消息隊列的客戶端都會提供關閉連接的方法,具體可以自行查看API。

關閉線程池:Java的ThreadPoolExecutor線程池提供shutdown()shutdownNow()兩個方法,區(qū)別是前者會等待線程池中的消息都處理完畢,后者會直接停止所有線程并返回未處理完的線程List。因為我們需要使用shutdown()方法進行關閉,并通過isTerminated()方法,判斷線程池是否已經(jīng)關閉。

那么問題又來了,我們如何通知到程序,需要執(zhí)行關閉操作呢?

在Linux中,進程的關閉是通過信號傳遞的,我們可以用kill -9 pid關閉進程,除了-9之外,我們可以通過 kill -l,查看kill命令的其它信號量。

這里提供兩種關閉方法:

  • 程序中添加Runtime.getRuntime().addShutdownHook鉤子方法,SIGTERM,SIGINT,SIGHUP三種信號都會觸發(fā)該方法(分別對應kill -1/kill -2/kill -15,Ctrl+C也會觸發(fā)SIGINT信號)。
  • 程序中通過Signal類注冊信號監(jiān)聽,比如USR2(對應kill -12),在handle方法中執(zhí)行關閉操作。

補充說明:addShutdownHook方法和handle方法中如果再調用System.exit,會造成deadlock,使進程無法正常退出。

偽代碼分別如下

Runtime.getRuntime().addShutdownHook(new Thread() {
 public void run() {
 //關閉訂閱者
 //關閉線程池
 //退出
 }
});
 //注冊linux kill信號量 kill -12
Signal sig = new Signal("USR2");
Signal.handle(sig, new SignalHandler() {
 @Override
 public void handle(Signal signal) {
 //關閉訂閱者
 //關閉線程池
 //退出
 }
});

模擬Demo

下面通過一個demo模擬相關邏輯操作

首先模擬一個生產者,每秒生產5個消息

然后模擬一個訂閱者,收到消息后,放入線程池進行處理,線程池固定4個線程,每個線程處理時間1秒,這樣線程池每秒會積壓1個消息。

package com.lujianing.demo;

import sun.misc.Signal;
import sun.misc.SignalHandler;
import java.util.concurrent.*;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/11/14
 */
public class MsgClient {

 //模擬消費線程池 同時4個線程處理
 private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
 
 //模擬消息生產任務
 private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
 
 //用于判斷是否關閉訂閱
 private static volatile boolean isClose = false;

 public static void main(String[] args) throws InterruptedException {
 
 //注冊鉤子方法
 Runtime.getRuntime().addShutdownHook(new Thread() {
  public void run() {
  close();
  }
 });

 BlockingQueue <String> queue = new ArrayBlockingQueue<String>(100);
 producer(queue);
 consumer(queue);

 }

 //模擬消息隊列生產者
 private static void producer(final BlockingQueue queue){
 //每200毫秒向隊列中放入一個消息
 SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(new Runnable() {
  public void run() {
  queue.offer("");
  }
 }, 0L, 200L, TimeUnit.MILLISECONDS);
 }

 //模擬消息隊列消費者 生產者每秒生產5個 消費者4個線程消費1個1秒 每秒積壓1個
 private static void consumer(final BlockingQueue queue) throws InterruptedException {
 while (!isClose){
  getPoolBacklogSize();
  //從隊列中拿到消息
  final String msg = (String)queue.take();
  //放入線程池處理
  if(!THREAD_POOL.isShutdown()) {
  THREAD_POOL.execute(new Runnable() {
   public void run() {
   try {
    //System.out.println(msg);
    TimeUnit.MILLISECONDS.sleep(1000L);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
   }
  });
  }
 }
 }

 //查看線程池堆積消息個數(shù)
 private static long getPoolBacklogSize(){
 long backlog = THREAD_POOL.getTaskCount()- THREAD_POOL.getCompletedTaskCount();
 System.out.println(String.format("[%s]THREAD_POOL backlog:%s",System.currentTimeMillis(),backlog));
 return backlog;
 }

 private static void close(){
 System.out.println("收到kill消息,執(zhí)行關閉操作");
 //關閉訂閱消費
 isClose = true;
 //關閉線程池,等待線程池積壓消息處理
 THREAD_POOL.shutdown();
 //判斷線程池是否關閉
 while (!THREAD_POOL.isTerminated()) {
  try {
  //每200毫秒 判斷線程池積壓數(shù)量
  getPoolBacklogSize();
  TimeUnit.MILLISECONDS.sleep(200L);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
 }
 System.out.println("訂閱者關閉,線程池處理完畢");
 }

 static {
 String osName = System.getProperty("os.name").toLowerCase();
 if(osName != null && osName.indexOf("window") == -1) {
  //注冊linux kill信號量 kill -12
  Signal sig = new Signal("USR2");
  Signal.handle(sig, new SignalHandler() {
  @Override
  public void handle(Signal signal) {
   close();
  }
  });
 }
 }

}

當我們在服務上運行時,通過控制臺可以看到相關的輸出信息,demo中輸出了線程池的積壓消息個數(shù)

java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.MsgClient

另打開一個終端,通過ps命令查看進程號,或者通過nohup啟動Java進程拿到進程id

ps -fe|grep MsgClient

當我們執(zhí)行kill -12 pid的時候 可以看到關閉業(yè)務邏輯

3.總結

其實不單單消息隊列任務,在常見的RPC服務中也會見到類似的功能,比如58的SCF,在源碼中,也會分別注冊了USR2信號量和addShutdownHook鉤子方法。

在重啟腳本中,首先會發(fā)送kill -12命令,RPC服務收到信號后會修改Server狀態(tài)為關閉。接著會發(fā)送kill -15命令,觸發(fā)鉤子方法,關閉所有的連接。

好了,以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。

相關文章

  • 淺談SpringBoot集成Quartz動態(tài)定時任務

    淺談SpringBoot集成Quartz動態(tài)定時任務

    這篇文章主要介紹了SpringBoot集成Quartz動態(tài)定時任務,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-04-04
  • Java設計模式之迭代模式(Iterator模式)介紹

    Java設計模式之迭代模式(Iterator模式)介紹

    這篇文章主要介紹了Java設計模式之迭代模式(Iterator模式)介紹,本文用一個老師點名的現(xiàn)象描述了迭代模式的使用,需要的朋友可以參考下
    2015-03-03
  • 深入理解Java中線程間的通信

    深入理解Java中線程間的通信

    一般來講,線程內部有自己私有的線程上下文,互不干擾。但是當我們需要多個線程之間相互協(xié)作的時候,就需要我們掌握Java線程的通信方式。本文將介紹Java線程之間的幾種通信原理,需要的可以參考一下
    2022-11-11
  • springBoot啟動時讓方法自動執(zhí)行的幾種實現(xiàn)方式

    springBoot啟動時讓方法自動執(zhí)行的幾種實現(xiàn)方式

    這篇文章主要介紹了springBoot啟動時讓方法自動執(zhí)行的幾種實現(xiàn)方式,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-03-03
  • xxl-job如何濫用netty導致的問題及解決方案

    xxl-job如何濫用netty導致的問題及解決方案

    本篇文章講解xxl-job作為一款分布式任務調度系統(tǒng)是如何濫用netty的,導致了怎樣的后果以及如何修改源碼解決這些問題,netty作為一種高性能的網(wǎng)絡編程框架,十分受大家喜愛,今天就xxl-job濫用netty這一問題給大家詳細下,感興趣的朋友一起看看吧
    2021-05-05
  • 淺談java實現(xiàn)mongoDB的多條件查詢

    淺談java實現(xiàn)mongoDB的多條件查詢

    這篇文章主要介紹了java實現(xiàn)mongoDB的多條件查詢,具有一定參考價值,需要的朋友可以參考下。
    2017-09-09
  • JAVA中使用FileWriter寫數(shù)據(jù)到文本文件步驟詳解

    JAVA中使用FileWriter寫數(shù)據(jù)到文本文件步驟詳解

    這篇文章主要介紹了JAVA中使用FileWriter寫數(shù)據(jù)到文本文件步驟詳解,FileWriter類提供了多種寫入字符的方法,包括寫入單個字符、寫入字符數(shù)組和寫入字符串等,它還提供了一些其他的方法,如刷新緩沖區(qū)、關閉文件等,需要的朋友可以參考下
    2023-10-10
  • 微服務中使用Maven BOM來管理你的版本依賴詳解

    微服務中使用Maven BOM來管理你的版本依賴詳解

    這篇文章主要介紹了微服務中使用Maven BOM來管理你的版本依賴,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-12-12
  • Java8 Predicate花樣用法詳解

    Java8 Predicate花樣用法詳解

    本文主要介紹了Java 8 Predicate花樣用法詳解,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • Spring boot使用spring retry重試機制的方法示例

    Spring boot使用spring retry重試機制的方法示例

    這篇文章主要介紹了Spring boot使用spring retry重試機制的方法示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-01-01

最新評論