docker啟動(dòng)rabbitmq以及使用方式詳解
搜索rabbitmq鏡像
docker search rabbitmq:management

下載鏡像
docker pull rabbitmq:management

啟動(dòng)容器
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management


打印容器
docker logs rabbitmq


訪問(wèn)RabbitMQ Management
http://localhost:15672
賬戶密碼默認(rèn):guest

編寫生產(chǎn)者類
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
/**
* 生成一個(gè)queue隊(duì)列
* 1、隊(duì)列名稱 QUEUE_NAME
* 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中)
* 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi)
* 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開連接后 該隊(duì)列是否自動(dòng)刪除
* 5、其他參數(shù)
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "Hello world!";
/**
* 發(fā)送一個(gè)消息
* 1、發(fā)送到哪個(gè)exchange交換機(jī)
* 2、路由的key
* 3、其他的參數(shù)信息
* 4、消息體
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" [x] Sent '"+message+"'");
channel.close();
connection.close();
}
}
運(yùn)行該方法,可以看到控制臺(tái)的打印

name=hello的隊(duì)列收到Message

消費(fèi)者
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setConnectionTimeout(600000);//milliseconds
factory.setRequestedHeartbeat(60);//seconds
factory.setHandshakeTimeout(6000);//milliseconds
factory.setRequestedChannelMax(5);
factory.setNetworkRecoveryInterval(500);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
System.out.println("Waiting for messages. ");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}


工作隊(duì)列
RabbitMqUtils工具類
package com.xun.rabbitmqdemo.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
啟動(dòng)2個(gè)工作線程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
public class Work01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)....");
/**
* 消費(fèi)者消費(fèi)消息
* 1、消費(fèi)哪個(gè)隊(duì)列
* 2、消費(fèi)成功后是否自動(dòng)應(yīng)答
* 3、消費(fèi)的接口回調(diào)
* 4、消費(fèi)未成功的接口回調(diào)
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
public class Work02 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String receivedMessage = new String(delivery.getBody());
System.out.println("接收消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)....");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
啟動(dòng)工作線程

啟動(dòng)發(fā)送線程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
try(Channel channel= RabbitMqUtils.getChannel();){
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//從控制臺(tái)接收消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息完成:"+message);
}
}
}
}
啟動(dòng)發(fā)送線程,此時(shí)發(fā)送線程等待鍵盤輸入

發(fā)送4個(gè)消息



可以看到2個(gè)工作線程按照順序分別接收message。
消息應(yīng)答機(jī)制
rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。
但消費(fèi)者在處理message過(guò)程中宕機(jī),會(huì)導(dǎo)致消息的丟失。
因此需要設(shè)置手動(dòng)應(yīng)答。
生產(chǎn)者
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task02 {
private static final String TASK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
try(Channel channel = RabbitMqUtils.getChannel()){
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
Scanner scanner = new Scanner(System.in);
System.out.println("請(qǐng)輸入信息");
while(scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
System.out.println("生產(chǎn)者task02發(fā)出消息"+ message);
}
}
}
}
消費(fèi)者
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
public class Work03 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Work03 等待接收消息處理時(shí)間較短");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(1);
System.out.println("接收到消息:"+message);
/**
* 1、消息的標(biāo)記tag
* 2、是否批量應(yīng)答
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
//采用手動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
public class Work04 {
private static final String ACK_QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("Work04 等待接收消息處理時(shí)間較長(zhǎng)");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message = new String(delivery.getBody());
SleepUtils.sleep(30);
System.out.println("接收到消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
};
//采用手動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
工具類SleepUtils
package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
public static void sleep(int second){
try{
Thread.sleep(1000*second);
}catch (InterruptedException _ignored){
Thread.currentThread().interrupt();
}
}
}
模擬



work04等待30s后發(fā)出ack

在work04處理message時(shí)手動(dòng)停止線程,可以看到message:dd被rabbitmq交給了work03



不公平分發(fā)
上面的輪詢分發(fā),生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息,但當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。
不公平分發(fā):
int prefetchCount = 1; channel.basicQos(prefetchCount);
通過(guò)此配置,當(dāng)消費(fèi)者未處理完當(dāng)前消息,rabbitmq會(huì)優(yōu)先將該message分發(fā)給空閑消費(fèi)者。

總結(jié)
到此這篇關(guān)于docker啟動(dòng)rabbitmq以及使用的文章就介紹到這了,更多相關(guān)docker啟動(dòng)rabbitmq及使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
docker-compose啟動(dòng)docker文件掛載失敗的解決
這篇文章主要介紹了docker-compose啟動(dòng)docker文件掛載失敗的解決方案。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
Mac上使用Docker如何快速啟動(dòng)MySQL測(cè)試
本文主要討論如何使用Docker快速啟動(dòng) MySQL 測(cè)試,包括Mac環(huán)境。非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧2016-10-10
DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案
這篇文章主要介紹了DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-02-02
解決docker中mysql時(shí)間與系統(tǒng)時(shí)間不一致問(wèn)題
最近在Docker中裝mysql時(shí),發(fā)現(xiàn)數(shù)據(jù)庫(kù)時(shí)間與系統(tǒng)時(shí)間相差8個(gè)小時(shí)。查詢資料發(fā)現(xiàn),docker的默認(rèn)時(shí)區(qū)是0區(qū),其實(shí)這會(huì)對(duì)安裝的容器造成不少麻煩,比如執(zhí)行日志的記錄不準(zhǔn)確等2021-12-12
docker網(wǎng)絡(luò),docker-compose?network問(wèn)題
這篇文章主要介紹了docker網(wǎng)絡(luò),docker-compose?network問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺(tái)的教程
這篇文章主要介紹了docker下搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺(tái),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07
docker使用Dockerfile構(gòu)建鏡像的實(shí)現(xiàn)示例
本文主要介紹了docker使用Dockerfile構(gòu)建鏡像的實(shí)現(xiàn)示例,通過(guò)編寫 Dockerfile,您可以定義鏡像的基礎(chǔ)環(huán)境、安裝軟件包、復(fù)制文件、設(shè)置環(huán)境變量等操作,下面就來(lái)介紹一下2024-01-01
Docker網(wǎng)絡(luò)配置及部署SpringCloud項(xiàng)目詳解
bridge模式是Docker默認(rèn)的網(wǎng)絡(luò)設(shè)置,此模式會(huì)為每一個(gè)容器分配Network Namespace、設(shè)置IP等,并將一個(gè)主機(jī)上的Docker容器連接到一個(gè)虛擬網(wǎng)橋上,下面這篇文章主要給大家介紹了關(guān)于Docker網(wǎng)絡(luò)配置及部署SpringCloud項(xiàng)目的相關(guān)資料,需要的朋友可以參考下2023-01-01

