RabbitMQ消息中間件示例詳解
前言
RabbitMQ 是使用 Erlang 語言開發(fā)的消息中間件, 其遵循了高級(jí)消息隊(duì)列協(xié)議(Advanced Message Queuing Protocol, AMQP)。
與 Kafka 等消息隊(duì)列相比,RabbitMQ 最大的優(yōu)勢(shì)在于其較高的可靠性:
- 提供確認(rèn)(ACK)和重傳機(jī)制保證消息完成消費(fèi), 消費(fèi)者異常不會(huì)導(dǎo)致消息丟失
- 提供消息持久化機(jī)制, broker 崩潰不會(huì)導(dǎo)致消息丟失
- 集群模式下工作, 保證高可用
因?yàn)榫哂休^高可靠性和一致性, RabbitMQ 可以勝任訂單處理、秒殺等一致性要求較高的業(yè)務(wù)場(chǎng)景。
RabbitMQ 概念與機(jī)制
RabbitMQ 中的概念模型:
- Broker: 消息中間件實(shí)例, 可能是單個(gè)節(jié)點(diǎn)也可能是運(yùn)行在多節(jié)點(diǎn)集群上的邏輯實(shí)體
- 消息(Message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標(biāo)準(zhǔn)消息頭以及其它自定義消息頭,用于定義RabbitMQ對(duì)消息行為。消息體是字節(jié)流,包含消息內(nèi)容。
- 連接(Connection): 客戶端與 Broker 之間的 TCP連接
- 信道(Channel): Channel 是建立在 TCP 連接上的邏輯(虛擬)連接。多個(gè) Channel 復(fù)用同一個(gè) TCP 連接, 以避免建立 TCP 連接的巨大開銷。 RabbitMQ 官方要求每個(gè)線程使用獨(dú)立的 Channel, 禁止多個(gè)線程共用 Channel。
- 生產(chǎn)者(Publisher): 發(fā)送消息的客戶端線程
- 消費(fèi)者(Consumer): 處理消息的客戶端線程
- 交換機(jī)(Exchange): 交換機(jī)負(fù)責(zé)將消息投遞到相應(yīng)的隊(duì)列
- 隊(duì)列(Queue): 接收并保存交換機(jī)投遞的消息,直至被消費(fèi)者成功消費(fèi)。邏輯結(jié)構(gòu)遵循先進(jìn)先出FIFO。
- 綁定(Binding): 將隊(duì)列(Queue)注冊(cè)到交換機(jī)(Exchange)的路由表
- 虛擬主機(jī)(Vhost): 每個(gè)Broker下可建立多個(gè)vhost, 每個(gè) vhost 可建立獨(dú)立的 Exchange、Queue、綁定及權(quán)限系統(tǒng)。同一個(gè) Broker 下的 vhost 共享 Connection、Channel 和 用戶系統(tǒng),就是說可以使用同一個(gè)用戶身份使用同一個(gè) Channel 訪問不同 vhost。
交換機(jī)(Exchange)
生產(chǎn)者發(fā)送的消息會(huì)首先送到交換機(jī)(Exchange), 交換機(jī)根據(jù)自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊(duì)列中。
RabbitMQ中的四種標(biāo)準(zhǔn)交換機(jī):
direct: 如果消息的 routing-key 與隊(duì)列的 binding-key 完全相同,direct類型的交換機(jī)則會(huì)將消息投遞到該隊(duì)列中。
- 多個(gè)隊(duì)列可以使用相同的 binding-key 綁定到同一個(gè) direct 交換機(jī),direct 交換機(jī)會(huì)把消息投遞到所有 binding-key 與消息 routing-key 相同的隊(duì)列
topic: 允許隊(duì)列的 binding-key 中包含通配符*和#, topic 交換機(jī)會(huì)將消息投遞到 binding-key 與 routing-key 匹配的隊(duì)列中。
- 通配符按照關(guān)鍵字進(jìn)行匹配,如news.cn.a中的關(guān)鍵字是news、cn和a,即關(guān)鍵字按照.分割
- #通配符匹配0個(gè)或多個(gè)關(guān)鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
- *通配符匹配一個(gè)關(guān)鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a
fanout: fanout 交換機(jī)不進(jìn)行任何匹配, 將消息投遞到所有綁定的隊(duì)列
header: header 交換機(jī)根據(jù)消息頭進(jìn)行投遞,現(xiàn)在已較少使用
我們可以使用 RabbitMQ 的插件機(jī)制使用第三方交換機(jī)或自行開發(fā)交換機(jī)。如實(shí)現(xiàn)延時(shí)投遞的delayed-message-exchange。
消息頭中的delivery-mode可以設(shè)置為 persistent(持久化) 或者 transient(易失)。 Exchange 和 Queue 在處理持久化的消息時(shí)都會(huì)先將消息寫入磁盤中再進(jìn)行下一步處理, 即使 RabbitMQ 崩潰也不會(huì)丟失。
消費(fèi)者客戶端通常使用的channel.basicConsume使用推(push)模式投遞消息, 即當(dāng)有新消息時(shí) Broker 通過 channel 主動(dòng)向客戶端發(fā)送消息。客戶端也可以使用channel.basicGet從 Broker 拉取消息。
ACK機(jī)制
RabbitMQ 提供了確認(rèn)送達(dá)(acknowledge)機(jī)制保證消息被正確處理不會(huì)丟失。
確認(rèn)送達(dá)的回執(zhí)有三種:
- ACK: 消息已被成功處理
- NACK: 消息處理異常, 需要重新投遞
- REJECT: 消息非法, 丟棄消息
RabbitMQ 的 Queue 可以設(shè)置 no_ack=true, 則消息被投遞后即刪除不等待回執(zhí)。
channel.basicConsume 可以指定auto_ack模式,若auto_ack=true當(dāng)客戶端收到完整消息后即會(huì)自動(dòng)發(fā)出ACK回執(zhí),否則必須顯式的發(fā)出回執(zhí)。
Java 代碼示例
首先安裝并啟動(dòng)RabbitMQ實(shí)例, Mac用戶可以使用 Homebrew 進(jìn)行安裝:
brew install rabbitmq
啟動(dòng)服務(wù):
brew services start rabbitmq
或者使用官方docker鏡像:
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management
RabbitMQ官網(wǎng)提供了Ubuntu、RPM以及Windows等多種平臺(tái)安裝方式。
RabbitMQ默認(rèn)TCP端口為5672, Web控制臺(tái)默認(rèn)端口15672。
在Maven中添加依賴:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.1</version> </dependency>
編寫生產(chǎn)者:
package rabbit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @author finley */ public class RabbitProducer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hello"; byte[] msg = "hello world".getBytes(); AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder(); propsBuilder.deliveryMode(2); // persistent propsBuilder.priority(0); // normal propsBuilder.contentType("text/plain"); channel.basicPublish(exchangeName, routingKey, propsBuilder.build(), msg); } } }
編寫消費(fèi)者:
package rabbit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * @author finley */ public class RabbitConsumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) { String exchangeName = "test-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String queueName = channel.queueDeclare().getQueue(); String bindingKey = "hello"; channel.queueBind(queueName, exchangeName, bindingKey); while(true) { channel.basicConsume(queueName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); String bodyStr = new String(body, "UTF-8"); System.out.println("routingKey: " + routingKey + ", contentType: " + contentType + ", body: " + bodyStr); long deliveryTag = envelope.getDeliveryTag(); channel.basicAck(deliveryTag, false); } }); } } } }
RabbitMQ 的消息為字節(jié), 可以將 Java 對(duì)象序列化后作為消息體發(fā)送。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Java基于NIO實(shí)現(xiàn)群聊系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了Java基于NIO實(shí)現(xiàn)群聊系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11Java實(shí)現(xiàn)EasyCaptcha圖形驗(yàn)證碼的具體使用
Java圖形驗(yàn)證碼,支持gif、中文、算術(shù)等類型,可用于Java Web、JavaSE等項(xiàng)目,下面就跟隨小編一起來了解一下2021-08-08

java實(shí)現(xiàn)Socket通信之單線程服務(wù)

利用JavaFX工具構(gòu)建Reactive系統(tǒng)

一篇文章帶你搞定 springsecurity基于數(shù)據(jù)庫的認(rèn)證(springsecurity整合mybatis)