java中brew安裝rabbitmq以及簡單實例
什么是消息隊列mq
可以看我之前寫的這篇 消息隊列MQ
rabbitmq簡介
RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queue 高級消息隊列協(xié)議)協(xié)議實現(xiàn)的消息隊列,它是一種應(yīng)用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。
brew安裝rabbitmq
1、安裝
brew install rabbitmq
這是安裝的 log
Management UI: http://localhost:15672 Homebrew-specific docs: https://rabbitmq.com/install-homebrew.html To start rabbitmq now and restart at login: brew services start rabbitmq Or, if you don't want/need a background service you can just run: CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
2、打開 http://localhost:15672
默認的用戶名密碼都是 guest,登錄后可以在 Admin 那一列菜單內(nèi)添加自己的用戶
權(quán)限
- 超級管理員(administrator)
可登陸管理控制臺,可查看所有的信息,并且可以對用戶,策略(policy)進行操作。
- 監(jiān)控者(monitoring)
可登陸管理控制臺,同時可以查看rabbitmq節(jié)點的相關(guān)信息(進程數(shù),內(nèi)存使用情況,磁盤使用情況等)
- 策略制定者(policymaker)
可登陸管理控制臺, 同時可以對policy進行管理。但無法查看節(jié)點的相關(guān)信息(上圖紅框標識的部分)。
- 普通管理者(management)
僅可登陸管理控制臺,無法看到節(jié)點信息,也無法對策略進行管理。
- 其他
無法登陸管理控制臺,通常就是普通的生產(chǎn)者和消費者。
創(chuàng)建虛擬主機(Virtual Hosts)
為了讓各個用戶可以互不干擾的工作,RabbitMQ添加了虛擬主機(Virtual Hosts)的概念。
其實就是一個獨立的訪問路徑,不同用戶使用不同路徑,各自有自己的隊列、交換機,互相不會影響。
交換機類型
1. 直連交換機(Direct Exchange)
特點:
- 基于路由鍵(Routing Key)的完全匹配來路由消息。
應(yīng)用場景:
- 當消息需要明確指定到某個隊列時,使用直連交換機。
舉例說明:
- 假設(shè)有一個訂單處理系統(tǒng),其中包含“訂單處理”和“訂單審核”兩個隊列。
- 生產(chǎn)者發(fā)送一個消息時,指定路由鍵為“order.process”,那么只有綁定了該路由鍵的“訂單處理”隊列會收到消息。
- 如果指定路由鍵為“order.review”,則只有“訂單審核”隊列會收到消息。
2. 扇形交換機(Fanout Exchange)
特點:
- 將消息廣播到所有綁定到它的隊列,忽略路由鍵。
- 如果N個隊列綁定到某個扇型交換機上,當有消息發(fā)送給此扇型交換機時,交換機會將消息的發(fā)送給這所有的N個隊列
應(yīng)用場景:
- 當需要將消息發(fā)送給多個隊列,且每個隊列都需要接收到相同消息的副本時,使用扇出交換機。
舉例說明:
- 假設(shè)有一個實時新聞發(fā)布系統(tǒng),包含“新聞訂閱者A”和“新聞訂閱者B”兩個隊列。
- 生產(chǎn)者發(fā)布一條新聞消息到扇出交換機,那么這條消息將被同時發(fā)送到“新聞訂閱者A”和“新聞訂閱者B”兩個隊列,實現(xiàn)廣播效果。
3. 主題交換機(Topic Exchange)
特點:
- 基于路由鍵和通配符(*和#)的模糊匹配來路由消息。
應(yīng)用場景:
- 當需要根據(jù)消息的不同類型或?qū)傩詫⑵渎酚傻讲煌年犃袝r,使用主題交換機。
舉例說明:
- 假設(shè)有一個日志系統(tǒng),包含“error.log”、“info.log”和“debug.log”三個隊列。
- 生產(chǎn)者發(fā)送一條錯誤日志消息,路由鍵為“system.error”。
- 此時,綁定了“*.error”或“system.#”的隊列將接收到這條消息,因此“error.log”隊列會收到它,而“info.log”和“debug.log”則不會(除非它們也綁定了匹配的路由鍵)。
*
(星號) 用來表示一個單詞 (必須出現(xiàn)的)
#
(井號) 用來表示任意數(shù)量(零個或多個)單詞
通配的綁定鍵是跟隊列進行綁定的,舉個小例子
- 隊列Q1 綁定鍵為*.TT.* 隊列Q2綁定鍵為 TT.#
- 如果一條消息攜帶的路由鍵為 A.TT.B,那么隊列Q1將會收到;
- 如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊列Q2將會收到;
4. 頭部交換機(Headers Exchange)
特點:
- 基于消息的頭部信息進行匹配,而不是路由鍵。
應(yīng)用場景:
- 當需要根據(jù)消息的復雜屬性進行路由時,使用頭部交換機。
- 這種交換機類型使用較少,因為它需要更復雜的匹配邏輯。
舉例說明(假設(shè)場景):
- 假設(shè)有一個消息處理系統(tǒng),其中消息具有多個頭部屬性,如“priority”(優(yōu)先級)、“type”(類型)等。
- 生產(chǎn)者發(fā)送一條高優(yōu)先級的緊急消息,并設(shè)置相應(yīng)的頭部屬性。
- 此時,只有那些設(shè)置了匹配這些頭部屬性(如“priority=high”)的隊列才會接收到這條消息。
RabbitMQ的工作原理
組成部分說明:
- Broker:消息隊列服務(wù)進程,此進程包括兩個部分:Exchange和Queue
- Exchange:消息隊列交換機,按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個隊列,對消息進行過慮。
- Queue:消息隊列,存儲消息的隊列,消息到達隊列并轉(zhuǎn)發(fā)給指定的消費者
- Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送
- Consumer:消息消費者,即消費方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
生產(chǎn)者發(fā)送消息流程:
- 1、生產(chǎn)者和Broker建立TCP連接。
- 2、生產(chǎn)者和Broker建立通道。
- 3、生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進行轉(zhuǎn)發(fā)。
- 4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊列)
消費者接收消息流程:
- 1、消費者和Broker建立TCP連接
- 2、消費者和Broker建立通道
- 3、消費者監(jiān)聽指定的Queue(隊列)
- 4、當有消息到達Queue時Broker默認將消息推送給消費者。
- 5、消費者接收到消息。
- 6、ack回復
使用
基本配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.application.name=zy-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=xxx spring.rabbitmq.password=xxx #虛擬host 可以不設(shè)置,使用server默認host spring.rabbitmq.virtual-host: JCcccHost
直連交換機示例
創(chuàng)建一個配置文件
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //隊列 起名:TestDirectQueue @Bean public Queue TestDirectQueue() { // durable:是否持久化,默認是false,持久化隊列:會被存儲在磁盤上,當消息代理重啟時仍然存在,暫存隊列:當前連接有效 // exclusive:默認也是false,只能被當前創(chuàng)建的連接使用,而且當連接關(guān)閉后隊列即被刪除。此參考優(yōu)先級高于durable // autoDelete:是否自動刪除,當沒有生產(chǎn)者或者消費者使用此隊列,該隊列會自動刪除。 // return new Queue("TestDirectQueue",true,true,false); //一般設(shè)置一下隊列的持久化就好,其余兩個就是默認false return new Queue("TestDirectQueue",true); } //Direct交換機 起名:TestDirectExchange @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } //綁定 //將隊列和交換機綁定, 并設(shè)置用于匹配鍵:TestDirectRouting @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } @Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } }
寫個簡單的接口進行消息推送(根據(jù)需求也可以改為定時任務(wù)等等,具體看需求)
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController public class SendMessageController { //使用RabbitTemplate,這提供了接收/發(fā)送等等方法 @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendDirectMessage") public String sendDirectMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "test message, hello!"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageId",messageId); map.put("messageData",messageData); map.put("createTime",createTime); //將消息攜帶綁定鍵值:TestDirectRouting 發(fā)送到交換機TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map); return "ok"; } }
可以看到消息已經(jīng)推送到rabbitMq服務(wù)器上面了。接下來要消費這個消息了。
一般來說要開啟另一個服務(wù)來消費了,但是我們是練習就在當前服務(wù)進行消費。
然后是創(chuàng)建消息接收監(jiān)聽類
@Component //監(jiān)聽的隊列名稱 TestDirectQueue @RabbitListener(queues = "TestDirectQueue") public class DirectReceiver { @RabbitHandler public void process(Map testMessage) { System.out.println("DirectReceiver消費者收到消息 : " + testMessage.toString()); } }
運行項目,就把這個消息消費掉了。
扇形交換機、主題交換機
和上面的差不多,就先不寫了
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot DevTools之開發(fā)工具與熱部署機制詳解
本文將深入探討Spring Boot DevTools的核心功能、配置方法以及最佳實踐,幫助開發(fā)者顯著提升開發(fā)效率,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2025-04-04Java創(chuàng)建線程的七種方法總結(jié)(全網(wǎng)最全面)
線程是Java中的基本執(zhí)行單元,它允許程序在同一時間執(zhí)行多個任務(wù),下面這篇文章主要給大家總結(jié)介紹了關(guān)于Java創(chuàng)建線程的七種方法,文中通過實例代碼將這七種方法介紹的非常詳細,需要的朋友可以參考下2023-05-05mybatis insert 返回自增主鍵的實現(xiàn)示例
mybatis 在新增之后怎么也獲取不到自增主鍵,本文主要介紹了mybatis insert 返回自增主鍵的實現(xiàn)示例,具有一定的參考價值,感興趣的可以了解一下2024-06-06SecurityUtils.getSubject().getPrincipal()為null的問題
這篇文章主要介紹了SecurityUtils.getSubject().getPrincipal()為null的問題及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07Quarkus篇入門創(chuàng)建項目搭建debug環(huán)境
這篇文章主要為大家介紹了Quarkus篇入門創(chuàng)建項目搭建debug環(huán)境,先來一套hello?world,來搭建基本的運行及調(diào)試環(huán)境吧2022-02-02Java虛擬機內(nèi)存結(jié)構(gòu)及編碼實戰(zhàn)分享
這篇文章主要介紹了Java虛擬機內(nèi)存結(jié)構(gòu)及編碼實戰(zhàn)分享,文章圍繞詳細主題展開相關(guān)資料具有一定的參考價值,需要的小伙伴可以參考一下2022-04-04