RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解
RabbitMQ Work Queue工作隊(duì)列
工作隊(duì)列(又稱任務(wù)隊(duì)列)的主要思想是避免立即執(zhí)行資源密集型任務(wù),而不得不等待它完成。
相反我們安排任務(wù)在之后執(zhí)行。我們把任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個(gè)工作線程時(shí),這些工作線程將一起處理這些任務(wù)。
多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,同一條消息只會(huì)被一個(gè)消費(fèi)者處理。
但是對(duì)于工作隊(duì)列,可以提高消息的處理速度,避免隊(duì)列中的消息堆積。
我們以一個(gè)例子來(lái)解釋work queue工作隊(duì)列。在生產(chǎn)者的服務(wù)中添加測(cè)試方法,通過(guò)循環(huán)的方式,向名為simple.queue隊(duì)列中發(fā)送50條消息,代碼和詳細(xì)描述如下:
package cn.itcast.mq.spring; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage2WorkQueue() throws InterruptedException { String queueName="simple.queue";//隊(duì)列名稱 String message = "hello, message_";//發(fā)送的消息 for (int i=1;i<=50;i++){ rabbitTemplate.convertAndSend(queueName,message+i); Thread.sleep(20); } } }
在消費(fèi)者的服務(wù)模塊中,定義兩個(gè)消息監(jiān)聽(tīng),分別為listenSimpleQueue1和listenSimpleQueue2,讓它們都監(jiān)聽(tīng)simple.queue隊(duì)列,并且設(shè)置休眠時(shí)間,使得消費(fèi)者1每秒處理50條消息,消費(fèi)者2每秒處理10條消息。
package cn.itcast.mq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.time.LocalTime; @Component public class SpringRabbitListener { @RabbitListener(queues="simple.queue") public void listenSimpleQueue1(String msg) throws InterruptedException { System.out.println("消費(fèi)者1已經(jīng)接收到simple.queue的消息:[" + msg + "]"+ LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues="simple.queue") public void listenSimpleQueue2(String msg) throws InterruptedException { System.err.println("消費(fèi)者2已經(jīng)接收到simple.queue的消息:[" + msg + "]"+LocalTime.now()); Thread.sleep(200); } }
消費(fèi)者的application.yaml文件,設(shè)置消費(fèi)者每次只能獲取一條消息,生產(chǎn)者和消費(fèi)者的配置文件相似。
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 192.168.220.13* port: 5672 username: user password: ****** virtual-host: / Listener: simple: prefetch: 1 #每次只能獲取一條消息,處理完成才能獲取下一條消息 控制消費(fèi)者預(yù)取消息的上限
處理完成后,運(yùn)行項(xiàng)目,可以得到消費(fèi)者1和消費(fèi)者2都能消費(fèi)消息,并且可以根據(jù)休眠時(shí)間有序進(jìn)行工作。
到此這篇關(guān)于RabbitMQ實(shí)現(xiàn)Work Queue工作隊(duì)列的示例詳解的文章就介紹到這了,更多相關(guān)RabbitMQ Work Queue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java jdk1.8 使用stream流進(jìn)行l(wèi)ist 分組歸類操作
這篇文章主要介紹了java jdk1.8 使用stream流進(jìn)行l(wèi)ist 分組歸類操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10SpringBoot JPA懶加載失效的解決方案(親測(cè)有效)
這篇文章主要介紹了SpringBoot JPA懶加載失效的解決方案(親測(cè)有效),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08解決常見(jiàn)的Eclipse SVN插件報(bào)錯(cuò)方法詳解
本篇文章是對(duì)常見(jiàn)的Eclipse SVN插件報(bào)錯(cuò)方法進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下2013-05-05關(guān)于stream().sorted()以及java中常用的比較器排序
這篇文章主要介紹了關(guān)于stream().sorted()以及java中常用的比較器排序,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05在SpringBoot項(xiàng)目中利用maven的generate插件
今天小編就為大家分享一篇關(guān)于在SpringBoot項(xiàng)目中利用maven的generate插件,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-01-01Java實(shí)現(xiàn)線程的暫停和恢復(fù)的示例詳解
這幾天的項(xiàng)目中,客戶給了個(gè)需求,希望我可以開(kāi)啟一個(gè)任務(wù),想什么時(shí)候暫停就什么時(shí)候暫停,想什么時(shí)候開(kāi)始就什么時(shí)候開(kāi)始,所以本文小編給大家介紹了Java實(shí)現(xiàn)線程的暫停和恢復(fù)的示例,需要的朋友可以參考下2023-11-11