欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

docker啟動(dòng)rabbitmq以及使用方式詳解

 更新時(shí)間:2022年08月04日 11:34:00   作者:Maackia  
RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的消息隊(duì)列,下面這篇文章主要給大家介紹了關(guān)于docker啟動(dòng)rabbitmq以及使用的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下

搜索rabbitmq鏡像

docker search rabbitmq:management

下載鏡像

docker pull rabbitmq:management

啟動(dòng)容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

打印容器

docker logs rabbitmq

訪問(wèn)RabbitMQ Management

http://localhost:15672

賬戶密碼默認(rèn):guest

編寫(xiě)生產(chǎn)者類

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一個(gè)queue隊(duì)列
         * 1、隊(duì)列名稱 QUEUE_NAME
         * 2、隊(duì)列里面的消息是否持久化(默認(rèn)消息存儲(chǔ)在內(nèi)存中)
         * 3、該隊(duì)列是否只供一個(gè)Consumer消費(fèi) 是否共享 設(shè)置為true可以多個(gè)消費(fèi)者消費(fèi)
         * 4、是否自動(dòng)刪除 最后一個(gè)消費(fèi)者斷開(kāi)連接后 該隊(duì)列是否自動(dòng)刪除
         * 5、其他參數(shù)
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 發(fā)送一個(gè)消息
         * 1、發(fā)送到哪個(gè)exchange交換機(jī)
         * 2、路由的key
         * 3、其他的參數(shù)信息
         * 4、消息體
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

運(yùn)行該方法,可以看到控制臺(tái)的打印

name=hello的隊(duì)列收到Message

消費(fèi)者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

工作隊(duì)列

RabbitMqUtils工具類

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

啟動(dòng)2個(gè)工作線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        System.out.println("C1 消費(fèi)者啟動(dòng)等待消費(fèi)....");
        /**
         * 消費(fèi)者消費(fèi)消息
         * 1、消費(fèi)哪個(gè)隊(duì)列
         * 2、消費(fèi)成功后是否自動(dòng)應(yīng)答
         * 3、消費(fèi)的接口回調(diào)
         * 4、消費(fèi)未成功的接口回調(diào)
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        System.out.println("C2 消費(fèi)者啟動(dòng)等待消費(fèi)....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

啟動(dòng)工作線程

啟動(dòng)發(fā)送線程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //從控制臺(tái)接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("發(fā)送消息完成:"+message);
            }
        }
    }
}

啟動(dòng)發(fā)送線程,此時(shí)發(fā)送線程等待鍵盤(pán)輸入

發(fā)送4個(gè)消息

可以看到2個(gè)工作線程按照順序分別接收message。

消息應(yīng)答機(jī)制

rabbitmq將message發(fā)送給消費(fèi)者后,就會(huì)將該消息標(biāo)記為刪除。

但消費(fèi)者在處理message過(guò)程中宕機(jī),會(huì)導(dǎo)致消息的丟失。

因此需要設(shè)置手動(dòng)應(yīng)答。

生產(chǎn)者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("請(qǐng)輸入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生產(chǎn)者task02發(fā)出消息"+ message);
            }
        }
    }
}

消費(fèi)者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息處理時(shí)間較短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的標(biāo)記tag
             * 2、是否批量應(yīng)答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息處理時(shí)間較長(zhǎng)");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        //采用手動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具類SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模擬

work04等待30s后發(fā)出ack

在work04處理message時(shí)手動(dòng)停止線程,可以看到message:dd被rabbitmq交給了work03

不公平分發(fā)

上面的輪詢分發(fā),生產(chǎn)者依次向消費(fèi)者按順序發(fā)送消息,但當(dāng)消費(fèi)者A處理速度很快,而消費(fèi)者B處理速度很慢時(shí),這種分發(fā)策略顯然是不合理的。
不公平分發(fā):

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通過(guò)此配置,當(dāng)消費(fèi)者未處理完當(dāng)前消息,rabbitmq會(huì)優(yōu)先將該message分發(fā)給空閑消費(fèi)者。

總結(jié) 

到此這篇關(guān)于docker啟動(dòng)rabbitmq以及使用的文章就介紹到這了,更多相關(guān)docker啟動(dòng)rabbitmq及使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • docker-compose啟動(dòng)docker文件掛載失敗的解決

    docker-compose啟動(dòng)docker文件掛載失敗的解決

    這篇文章主要介紹了docker-compose啟動(dòng)docker文件掛載失敗的解決方案。具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Mac上使用Docker如何快速啟動(dòng)MySQL測(cè)試

    Mac上使用Docker如何快速啟動(dòng)MySQL測(cè)試

    本文主要討論如何使用Docker快速啟動(dòng) MySQL 測(cè)試,包括Mac環(huán)境。非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起看看吧
    2016-10-10
  • docker容器使用GPU方法實(shí)現(xiàn)

    docker容器使用GPU方法實(shí)現(xiàn)

    本文主要介紹了docker容器使用GPU方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案

    DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案

    這篇文章主要介紹了DockerCE之執(zhí)行docker info出現(xiàn)兩條警告信息及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-02-02
  • Docker容器進(jìn)入的4種方式(小結(jié))

    Docker容器進(jìn)入的4種方式(小結(jié))

    本文主要介紹了Docker容器進(jìn)入的4種方式(小結(jié)),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • 解決docker中mysql時(shí)間與系統(tǒng)時(shí)間不一致問(wèn)題

    解決docker中mysql時(shí)間與系統(tǒng)時(shí)間不一致問(wèn)題

    最近在Docker中裝mysql時(shí),發(fā)現(xiàn)數(shù)據(jù)庫(kù)時(shí)間與系統(tǒng)時(shí)間相差8個(gè)小時(shí)。查詢資料發(fā)現(xiàn),docker的默認(rèn)時(shí)區(qū)是0區(qū),其實(shí)這會(huì)對(duì)安裝的容器造成不少麻煩,比如執(zhí)行日志的記錄不準(zhǔn)確等
    2021-12-12
  • docker網(wǎng)絡(luò),docker-compose?network問(wèn)題

    docker網(wǎng)絡(luò),docker-compose?network問(wèn)題

    這篇文章主要介紹了docker網(wǎng)絡(luò),docker-compose?network問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺(tái)的教程

    docker環(huán)境搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺(tái)的教程

    這篇文章主要介紹了docker下搭建JMeter+Grafana+influxdb可視化性能監(jiān)控平臺(tái),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-07-07
  • docker使用Dockerfile構(gòu)建鏡像的實(shí)現(xiàn)示例

    docker使用Dockerfile構(gòu)建鏡像的實(shí)現(xiàn)示例

    本文主要介紹了docker使用Dockerfile構(gòu)建鏡像的實(shí)現(xiàn)示例,通過(guò)編寫(xiě) Dockerfile,您可以定義鏡像的基礎(chǔ)環(huán)境、安裝軟件包、復(fù)制文件、設(shè)置環(huán)境變量等操作,下面就來(lái)介紹一下
    2024-01-01
  • Docker網(wǎng)絡(luò)配置及部署SpringCloud項(xiàng)目詳解

    Docker網(wǎng)絡(luò)配置及部署SpringCloud項(xiàng)目詳解

    bridge模式是Docker默認(rèn)的網(wǎng)絡(luò)設(shè)置,此模式會(huì)為每一個(gè)容器分配Network Namespace、設(shè)置IP等,并將一個(gè)主機(jī)上的Docker容器連接到一個(gè)虛擬網(wǎng)橋上,下面這篇文章主要給大家介紹了關(guān)于Docker網(wǎng)絡(luò)配置及部署SpringCloud項(xiàng)目的相關(guān)資料,需要的朋友可以參考下
    2023-01-01

最新評(píng)論