RabbitMQ簡單隊(duì)列實(shí)例及原理解析
這篇文章主要介紹了RabbitMQ簡單隊(duì)列實(shí)例及原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
RabbitMQ 簡述
RabbitMQ是一個(gè)消息代理:它接受并轉(zhuǎn)發(fā)消息。 您可以將其視為郵局:當(dāng)您將要把寄發(fā)的郵件投遞到郵箱中時(shí),您可以確信Postman 先生最終會(huì)將郵件發(fā)送給收件人。 在這個(gè)比喻中,RabbitMQ是一個(gè)郵箱,郵局和郵遞員,用來接受,存儲(chǔ)和轉(zhuǎn)發(fā)二進(jìn)制數(shù)據(jù)塊的消息。
隊(duì)列就像是在RabbitMQ中扮演郵箱的角色。 雖然消息經(jīng)過RabbitMQ和應(yīng)用程序,但它們只能存儲(chǔ)在隊(duì)列中。 隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制的限制,它本質(zhì)上是一個(gè)大的消息緩沖區(qū)。 許多生產(chǎn)者可以發(fā)送到一個(gè)隊(duì)列的消息,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。
producer即為生產(chǎn)者,用來產(chǎn)生消息發(fā)送給隊(duì)列。consumer是消費(fèi)者,需要去讀隊(duì)列內(nèi)的消息。producer,consumer和broker(rabbitMQ server)不必駐留在同一個(gè)主機(jī)上;確實(shí)在大多數(shù)應(yīng)用程序中它們是這樣分布的。
簡單隊(duì)列
簡單隊(duì)列是最簡單的一種模式,由生產(chǎn)者、隊(duì)列、消費(fèi)者組成。生產(chǎn)者將消息發(fā)送給隊(duì)列,消費(fèi)者從隊(duì)列中讀取消息完成消費(fèi)。
在下圖中,“P”是我們的生產(chǎn)者,“C”是我們的消費(fèi)者。 中間的框是隊(duì)列 - RabbitMQ代表消費(fèi)者的消息緩沖區(qū)。
java 方式
生產(chǎn)者
package com.anqi.mq.nat; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class MyProducer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創(chuàng)建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創(chuàng)建 Channel Channel channel = connection.createChannel(); //實(shí)際場(chǎng)景中,消息多為json格式的對(duì)象 String msg = "hello"; //4. 發(fā)送三條數(shù)據(jù) for (int i = 1; i <= 3 ; i++) { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Send message" + i +" : " + msg); } //5. 關(guān)閉連接 channel.close(); connection.close(); } }
/** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException; /** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; /** * Start a non-nolocal, non-exclusive consumer, with * a server-generated consumerTag. * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param callback an interface to the consumer object * @return the consumerTag generated by the server * @throws java.io.IOException if an error is encountered * @see com.rabbitmq.client.AMQP.Basic.Consume * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer) */ String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
消費(fèi)者
package com.anqi.mq.nat; import com.rabbitmq.client.*; import java.io.IOException; public class MyConsumer { private static final String QUEUE_NAME = "ITEM_QUEUE"; public static void main(String[] args) throws Exception { //1. 創(chuàng)建一個(gè) ConnectionFactory 并進(jìn)行設(shè)置 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); //2. 通過連接工廠來創(chuàng)建連接 Connection connection = factory.newConnection(); //3. 通過 Connection 來創(chuàng)建 Channel Channel channel = connection.createChannel(); //4. 聲明一個(gè)隊(duì)列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); /* true:表示自動(dòng)確認(rèn),只要消息從隊(duì)列中獲取,無論消費(fèi)者獲取到消息后是否成功消費(fèi),都會(huì)認(rèn)為消息已經(jīng)成功消費(fèi) false:表示手動(dòng)確認(rèn),消費(fèi)者獲取消息后,服務(wù)器會(huì)將該消息標(biāo)記為不可用狀態(tài),等待消費(fèi)者的反饋,如果消費(fèi)者一 直沒有反饋,那么該消息將一直處于不可用狀態(tài),并且服務(wù)器會(huì)認(rèn)為該消費(fèi)者已經(jīng)掛掉,不會(huì)再給其發(fā)送消息, 直到該消費(fèi)者反饋。 */ //5. 創(chuàng)建消費(fèi)者并接收消息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; //6. 設(shè)置 Channel 消費(fèi)者綁定隊(duì)列 channel.basicConsume(QUEUE_NAME, true, consumer); } }
Send message1 : hello Send message2 : hello Send message3 : hello [*] Waiting for messages. To exit press CTRL+C [x] Received 'hello' [x] Received 'hello' [x] Received 'hello'
當(dāng)我們啟動(dòng)生產(chǎn)者之后查看RabbitMQ管理后臺(tái)可以看到有一條消息正在等待被消費(fèi)。
當(dāng)我們啟動(dòng)消費(fèi)者之后再次查看,可以看到積壓的一條消息已經(jīng)被消費(fèi)。
總結(jié)
- 隊(duì)列聲明queueDeclare的參數(shù):第一個(gè)參數(shù)表示隊(duì)列名稱、第二個(gè)參數(shù)為是否持久化(true表示是,隊(duì)列將在服務(wù)器重啟時(shí)生存)、第三個(gè)參數(shù)為是否是獨(dú)占隊(duì)列(創(chuàng)建者可以使用的私有隊(duì)列,斷開后自動(dòng)刪除)、第四個(gè)參數(shù)為當(dāng)所有消費(fèi)者客戶端連接斷開時(shí)是否自動(dòng)刪除隊(duì)列、第五個(gè)參數(shù)為隊(duì)列的其他參數(shù)。
- basicConsume的第二個(gè)參數(shù)autoAck: 應(yīng)答模式,true:自動(dòng)應(yīng)答,即消費(fèi)者獲取到消息,該消息就會(huì)從隊(duì)列中刪除掉,false:手動(dòng)應(yīng)答,當(dāng)從隊(duì)列中取出消息后,需要程序員手動(dòng)調(diào)用方法應(yīng)答,如果沒有應(yīng)答,該消息還會(huì)再放進(jìn)隊(duì)列中,就會(huì)出現(xiàn)該消息一直沒有被消費(fèi)掉的現(xiàn)象。
- 這種簡單隊(duì)列的模式,系統(tǒng)會(huì)為每個(gè)隊(duì)列隱式地綁定一個(gè)默認(rèn)交換機(jī),交換機(jī)名稱為" (AMQP default)",類型為直連 direct,當(dāng)你手動(dòng)創(chuàng)建一個(gè)隊(duì)列時(shí),系統(tǒng)會(huì)自動(dòng)將這個(gè)隊(duì)列綁定到一個(gè)名稱為空的 Direct 類型的交換機(jī)上,綁定的路由鍵 routing key 與隊(duì)列名稱相同,相當(dāng)于channel.queueBind(queue:"QUEUE_NAME", exchange:"(AMQP default)“, routingKey:"QUEUE_NAME");雖然實(shí)例沒有顯式聲明交換機(jī),但是當(dāng)路由鍵和隊(duì)列名稱一樣時(shí),就會(huì)將消息發(fā)送到這個(gè)默認(rèn)的交換機(jī)中。這種方式比較簡單,但是無法滿足復(fù)雜的業(yè)務(wù)需求,所以通常在生產(chǎn)環(huán)境中很少使用這種方式。
- The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.默認(rèn)交換機(jī)隱式綁定到每個(gè)隊(duì)列,其中路由鍵等于隊(duì)列名稱。不可能顯式綁定到,或從缺省交換中解除綁定。它也不能被刪除。 ——引自 RabbitMQ 官方文檔
spring-amqp方式
引入 Maven 依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.5.RELEASE</version> </dependency>
spring 配置文件
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd"> <rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/" username="guest" password="guest"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:queue name="MY-QUEUE"/> </beans>
使用測(cè)試
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class Main { public static void main(String[] args) { ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml"); AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class); amqpTemplate.convertAndSend("MY-QUEUE", "Item"); String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE"); System.out.println(msg); } }
參考方法
/** * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange * with a specific routing key. * * @param exchange the name of the exchange * @param routingKey the routing key * @param message a message to send * @throws AmqpException if there is a problem */ void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException; /** * Receive a message if there is one from a specific queue and convert it to a Java * object. Returns immediately, possibly with a null value. * * @param queueName the name of the queue to poll * @return a message or null if there is none waiting * @throws AmqpException if there is a problem */ @Nullable Object receiveAndConvert(String queueName) throws AmqpException;
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- 淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
- PHP+RabbitMQ實(shí)現(xiàn)消息隊(duì)列的完整代碼
- springboot實(shí)現(xiàn)rabbitmq的隊(duì)列初始化和綁定
- 使用PHP訪問RabbitMQ消息隊(duì)列的方法示例
- Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- 詳解Python操作RabbitMQ服務(wù)器消息隊(duì)列的遠(yuǎn)程結(jié)果返回
- 利用Python學(xué)習(xí)RabbitMQ消息隊(duì)列
相關(guān)文章
javaweb頁面附件、圖片下載及打開(實(shí)現(xiàn)方法)
下面小編就為大家?guī)硪黄猨avaweb頁面附件、圖片下載及打開(實(shí)現(xiàn)方法)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-06-06Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(15)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07SpringBoot使用AOP實(shí)現(xiàn)防重復(fù)提交功能
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何使用AOP實(shí)現(xiàn)防重復(fù)提交功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-03-03Java之Pattern.compile函數(shù)用法詳解
這篇文章主要介紹了Java之Pattern.compile函數(shù)用法詳解,本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08如何基于SpringMVC實(shí)現(xiàn)斷點(diǎn)續(xù)傳(HTTP)
這篇文章主要介紹了如何基于SpringMVC實(shí)現(xiàn)斷點(diǎn)續(xù)傳(HTTP),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-01-01