Java消息隊(duì)列RabbitMQ入門(mén)詳解
主流中間件對(duì)比
ActiveMQ 是 Apache 出品,最流行的,能力強(qiáng)勁的開(kāi)源消息總線,并且它一 個(gè)完全支持 J M S 規(guī)范的消息中間件。
其豐富的 API 、多種集群構(gòu)建模式使得他成為業(yè)界老牌消息中間件,在中 小型企業(yè)中應(yīng)用廣泛!
MQ 衡量指標(biāo):服務(wù)性能、數(shù)據(jù)存儲(chǔ)、集群架構(gòu)
Kafka
RocketMQ是阿里開(kāi)源的消息中間件,目前也已經(jīng)孵化為Apache頂級(jí)項(xiàng)目, 它是純java開(kāi)發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng) 應(yīng)用的特點(diǎn)。
RocketMQ思路起源于Kafka,它對(duì)消息的可靠傳輸及事務(wù) 性做了優(yōu)化, 目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消息推 送、日志流式處理、binglog分發(fā)等場(chǎng)景
RabbitMQ是使用Erlang語(yǔ)言開(kāi)發(fā)的開(kāi)源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議 來(lái)實(shí)現(xiàn)。
AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布 /訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi), 對(duì)數(shù)據(jù)_致 性、穩(wěn)定性和可靠性要求很髙的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。
結(jié)論:
- activiMq老牌消息中間件,api全面,但是吞吐量不大
- Kafaka吞吐量大,但是數(shù)據(jù)無(wú)法保證不丟失,主要面向大數(shù)據(jù)
- rokectMQ:吞吐量大,保證數(shù)據(jù)不丟失,并且支持分布式事物,但是商業(yè)版需要收費(fèi)
- rabbitMQ:吞吐量大,數(shù)據(jù)不易丟失
初識(shí)RabbitMQ
RabbitMQ是—個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議 在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang語(yǔ)言來(lái)編寫(xiě) 的,并且RabbitMQ是基于AMQP協(xié)議的。
哪些大廠在用RabbitMQ,為什幺?
- 滴滴、美團(tuán)、頭條、去哪兒、藝龍…
- 開(kāi)源、性能優(yōu)秀,穩(wěn)定性保障
- 提供可靠性消息投遞模式(confirm)、返回模式(return )
- 與SpringAMQP完美的整合、API豐富
- 集群模式豐富,表達(dá)式配置,HA模式,鏡像隊(duì)列模型
- 保證數(shù)據(jù)不丟失的前提做到高可靠性、可用性
RabbitMQ高性能的原因?
Erlang語(yǔ)言最初在于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得 RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能是非常優(yōu)秀的
Erlang的優(yōu)點(diǎn):Erlang有著和原生Socket—樣的延遲
什么是AMQP高級(jí)消息隊(duì)列協(xié)議?
- AMQP定義: 是具有現(xiàn)代特征的二進(jìn)制協(xié)議;
- 是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議;
- 是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì);
AMQP核心概念(重點(diǎn))
- Server:又稱Broker,接受客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù)
- Connection:連接:應(yīng)用程序與Broker的網(wǎng)絡(luò)連接
- Channel:網(wǎng)絡(luò)通道,幾乎所有的操作都在Channel中進(jìn)行,Channel是進(jìn)行消息讀寫(xiě)的通道;客戶端可建立多個(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù);
- Message:消息,服務(wù)器與應(yīng)用程序之間傳遞的數(shù)據(jù),由Properties和Body組成。Properties可以對(duì)消息進(jìn)行裝飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性;Body則就是消息體內(nèi)容;
- Virtual host:虛擬地址,用于進(jìn)行邏輯隔離,最上層的消息路由;一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或Queue;
- Exchange:交換機(jī),交換消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列;
- Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key;
- Routing key:一個(gè)路由規(guī)則,虛擬機(jī)可用它來(lái)確定如何路由一個(gè)特定消息
- Queue:也稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者
RabbitMQ安裝及使用
Centos安裝方式
注意:Erlang語(yǔ)言與RabbitMQ安裝版本必須匹配
RabbitMQ官網(wǎng)安裝
官網(wǎng)地址:https://www.rabbitmq.com/
提前準(zhǔn)備:安裝Linux必要依賴包
下載RabbitMQ必須安裝包
配置文件修改
服務(wù)的啟動(dòng):rabbitmq-server start &
服務(wù)的停止:rabbitmqctl stop_app
管理插件:rabbitmq-plugins enable rabbitmq_management
訪問(wèn)地址://ip:15672/
詳細(xì)步驟
- 準(zhǔn)備:
yum install \ build-essential openssl openssl-devel unixODBC unixODBC-devel \ make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz -y
- 下載:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
- 安裝:
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm rpm -ivh socat-1.7.3.2-1.1.el7.x86_64.rpm --nodeps --force rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
- 配置文件:
vi /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
- 比如修改密碼、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
- 服務(wù)啟動(dòng)
rabbitmq-server start &
- 服務(wù)停止
rabbitmqctl app_stop
- 查看服務(wù)是否成功:
yum install lsof lsof -i:5672
- 管理插件:
rabbitmq-plugins enable rabbitmq_management
- 訪問(wèn)地址:
http://192.168.147.146:15672/
Docker安裝方式
注意獲取鏡像的時(shí)候要獲取management版本的,不要獲取last版本的,management版本的才帶有管理界面
1.查詢鏡像
docker search rabbitmq:management
2.獲取鏡像
docker pull rabbitmq:management
3.運(yùn)行鏡像
- 方式一:
默認(rèn)guest用戶,密碼也是guest docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management
- 方式二:
設(shè)置用戶名和密碼
docker run -d –name my-rabbitmq -p 5672:5672 -p 15672:15672 -v /data:/var/lib/rabbitmq –hostname my-rabbitmq-host -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin –restart=always rabbitmq:management
參數(shù)說(shuō)明:
- -d:后臺(tái)運(yùn)行容器
- -name:指定容器名
- -p:指定服務(wù)運(yùn)行的端口(5672:應(yīng)用訪問(wèn)端口;15672:控制臺(tái)Web端口號(hào))
- -v:映射目錄或文件,啟動(dòng)了一個(gè)數(shù)據(jù)卷容器,數(shù)據(jù)卷路徑為:/var/lib/rabbitmq,再將此數(shù)據(jù)卷映射到住宿主機(jī)的/data目錄
- –hostname:主機(jī)名(RabbitMQ的一個(gè)重要注意事項(xiàng)是它根據(jù)所謂的 “節(jié)點(diǎn)名稱” 存儲(chǔ)數(shù)據(jù),默認(rèn)為主機(jī)名)
- -e:指定環(huán)境變量;(RABBITMQ_DEFAULT_VHOST:默認(rèn)虛擬機(jī)名;RABBITMQ_DEFAULT_USER:默認(rèn)的用戶名;RABBITMQ_DEFAULT_PASS:默認(rèn)用戶名的密碼)
- –restart=always:當(dāng)Docker重啟時(shí),容器能自動(dòng)啟動(dòng)
- rabbitmq:management:鏡像名
注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字請(qǐng)記好,在之后的編程中要用到, 如果啟動(dòng)時(shí)沒(méi)指定,默認(rèn)值為/
4.進(jìn)入RabbitMQ管理平臺(tái)進(jìn)行相關(guān)操作
- 注1:容器啟動(dòng)后,可以通過(guò)docker logs 窗口ID/容器名字 查看日志 docker logs my-rabbitmq
- 注2:停止并刪除所有容器 docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
常用操作命令
命令行與管控臺(tái)-基礎(chǔ)操作
- rabbitmqctl stop_app:關(guān)閉應(yīng)用
- rabbitmqctl start_app:?jiǎn)?dòng)應(yīng)用
- rabbitmqctl status:節(jié)點(diǎn)狀態(tài)
- rabbitmqctl add_user username password:添加用戶
- rabbitmqctl list_users:列出所有用戶
- rabbitmqctl delete_user username:刪除用戶
- rabbitmqctl clear_permissions -p vhostpath username:清除用戶權(quán)限
- rabbitmqctl list_user_permissions username:列出用戶權(quán)限
- rabbitmqctl change_password username newpassword:修改密碼
- rabbitmqctl set_permissions -p vhostpath username “.” “.” “.*”
- rabbitmqctl add_vhost vhostpath:創(chuàng)建虛擬主機(jī)
- rabbitmqctl list_vhosts:列出所有虛擬主機(jī)
- rabbitmqctl list_permissions -p vhostpath:列出虛擬主機(jī)上所有權(quán)限
- rabbitmqctl delete_vhost vhostpath:刪除虛擬主機(jī)
- rabbitmqctl list_queues:查看所有隊(duì)列信息
- rabbitmqctl -p vhostpath purge_queue blue:清除隊(duì)列里的消息
命令行與管控臺(tái)-高級(jí)操作
- rabbitmqctl reset:移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用
- rabbitmqctl join_cluster [–ram]:組成集群命令
- rabbitmqctl cluster_status:查看集群狀態(tài)
- rabbitmqctl change_cluster_node_type disc | ram:修改集群節(jié)點(diǎn)的存儲(chǔ)形式
- rabbitmqctl forget_cluster_node {–offline} 忘記節(jié)點(diǎn) (摘除節(jié)點(diǎn))
- rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2…] (修改節(jié)點(diǎn)名稱)
RabbitMQ快速入門(mén)
極速入門(mén)-消息生產(chǎn)與消費(fèi)
- ConnectionFactory:獲取連接工廠
- Connection:一個(gè)鏈接
- Channel:數(shù)據(jù)通信通道,課發(fā)送和接收消息
- Queue:具體的消息存儲(chǔ)隊(duì)列
- Producer & Consumer:生產(chǎn)和消費(fèi)者
創(chuàng)建一個(gè)springboot項(xiàng)目: rabbitmq-api
導(dǎo)入pom依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
消費(fèi)端代碼
package com.xieminglu.rabbitmqapi.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import java.io.IOException; public class Consumer { public static void main(String[] args) throws Exception { //1 創(chuàng)建一個(gè)ConnectionFactory, 并進(jìn)行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通過(guò)連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); //3 通過(guò)connection創(chuàng)建一個(gè)Channel Channel channel = connection.createChannel(); //4 聲明(創(chuàng)建)一個(gè)隊(duì)列 String queueName = "test001"; // 參數(shù):隊(duì)列名稱、持久化與否、獨(dú)占與否、無(wú)消息隊(duì)列是否自動(dòng)刪除、消息參數(shù) // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) channel.queueDeclare(queueName, true, false, false, null); //5 創(chuàng)建消費(fèi)者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6 設(shè)置Channel // 參數(shù):隊(duì)列名稱、自動(dòng)簽收、消費(fèi)者回調(diào) // basicConsume(String queue, boolean autoAck, Consumer callback) channel.basicConsume(queueName, true, queueingConsumer); while(true){ //7 獲取消息(Delivery:傳送) QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消費(fèi)端: " + msg); //Envelope envelope = delivery.getEnvelope(); } } }
生產(chǎn)端
package com.xieminglu.rabbitmqapi.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args) throws Exception { //1 創(chuàng)建一個(gè)ConnectionFactory, 并進(jìn)行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.248.134"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通過(guò)連接工廠創(chuàng)建連接 Connection connection = connectionFactory.newConnection(); //3 通過(guò)connection創(chuàng)建一個(gè)Channel Channel channel = connection.createChannel(); //4 通過(guò)Channel發(fā)送數(shù)據(jù) for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", null, msg.getBytes()); } //5 記得要關(guān)閉相關(guān)的連接 channel.close(); connection.close(); } }
到此這篇關(guān)于Java消息隊(duì)列RabbitMQ入門(mén)詳解的文章就介紹到這了,更多相關(guān)RabbitMQ入門(mén)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot?中的?@HystrixCommand?注解原理及使用方法
通過(guò)使用 @HystrixCommand 注解,我們可以輕松地實(shí)現(xiàn)對(duì)方法的隔離和監(jiān)控,從而提高系統(tǒng)的可靠性和穩(wěn)定性,本文介紹了Spring Boot 中的@HystrixCommand注解是什么,其原理以及如何使用,感興趣的朋友跟隨小編一起看看吧2023-07-07jsp、servlet前后端交互對(duì)數(shù)據(jù)處理及展示的簡(jiǎn)單實(shí)現(xiàn)
Servlet和JSP是Java Web開(kāi)發(fā)中的兩個(gè)重要概念,在Servlet和JSP中前后端交互可以通過(guò)一些方式來(lái)實(shí)現(xiàn),這篇文章主要給大家介紹了關(guān)于jsp、servlet前后端交互對(duì)數(shù)據(jù)處理及展示的簡(jiǎn)單實(shí)現(xiàn),需要的朋友可以參考下2023-12-12Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(12)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07Mybatis實(shí)戰(zhàn)之TypeHandler高級(jí)進(jìn)階
本文主要介紹了自定義的枚舉TypeHandler的相關(guān)知識(shí),具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧2017-02-02解決mybatis-plus3.4.1分頁(yè)插件PaginationInterceptor和防止全表更新與刪除插件SqlE
這篇文章給大家介紹了在Spring.xml文件中配置mybatis-plus3.4.1分頁(yè)插件PaginationInterceptor和防止全表更新與刪除插件SqlExplainInterceptor過(guò)時(shí)失效問(wèn)題解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2020-12-12springboot下實(shí)現(xiàn)RedisTemplate?List?清空
我們經(jīng)常會(huì)使用Redis的List數(shù)據(jù)結(jié)構(gòu)來(lái)存儲(chǔ)一系列的元素,當(dāng)我們需要清空一個(gè)List時(shí),可以使用RedisTemplate來(lái)實(shí)現(xiàn),本文就來(lái)詳細(xì)的介紹一下如何實(shí)現(xiàn),感興趣的可以了解一下2024-01-01SpringBoot混合使用StringRedisTemplate和RedisTemplate的坑及解決
這篇文章主要介紹了SpringBoot混合使用StringRedisTemplate和RedisTemplate的坑及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12