欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Docker啟動(dòng)RabbitMQ實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的詳細(xì)過(guò)程

 更新時(shí)間:2023年02月23日 10:08:26   作者:zoeil  
這篇文章主要介紹了Docker啟動(dòng)RabbitMQ,實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者,通過(guò)Docker拉取鏡像并啟動(dòng)RabbitMQ,本文結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

一、Docker拉取鏡像并啟動(dòng)RabbitMQ

拉取鏡像

docker pull rabbitmq:3.8.8-management

查看鏡像

docker images rabbitmq

 啟動(dòng)鏡像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虛擬機(jī)記得開(kāi)放5672端口或者關(guān)閉防火墻,在window通過(guò) 主機(jī)ip:15672 訪問(wèn)rabbitmq控制臺(tái)

 用戶名密碼默認(rèn)為guest

二、Hello World

(一)依賴導(dǎo)入

<!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個(gè)依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生產(chǎn)者

工作原理

  • Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問(wèn) RabbitMQ 都建立一個(gè) Connection,在消息量大的時(shí)候建立 TCP Connection 的開(kāi)銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個(gè) thread 創(chuàng)建單獨(dú)的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識(shí)別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級(jí)的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開(kāi)銷
  • Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最終被送到這里等待 consumer 取走

我們需要先獲取連接(Connection),然后通過(guò)連接獲取信道(Channel),這里我們演示簡(jiǎn)單例子,可以直接跳過(guò)交換機(jī)(Exchange)發(fā)送隊(duì)列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一個(gè)隊(duì)列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.隊(duì)列名稱
         * 2.隊(duì)列里面的消息是否持久化 默認(rèn)消息存儲(chǔ)在內(nèi)存中
         * 3.該隊(duì)列是否只供一個(gè)消費(fèi)者進(jìn)行消費(fèi) 是否進(jìn)行共享 true 可以多個(gè)消費(fèi)者消費(fèi)
         * 4.是否自動(dòng)刪除 最后一個(gè)消費(fèi)者端開(kāi)連接以后 該隊(duì)列是否自動(dòng)刪除 true 自動(dòng)刪除
         * 5.其他參數(shù)
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 發(fā)送一個(gè)消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.發(fā)送到哪個(gè)交換機(jī)
         * 2.路由的key是哪個(gè)
         * 3.其他的參數(shù)信息
         * 4.發(fā)送消息的消息體
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("發(fā)送成功");
    }
}

(三)消息消費(fèi)者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個(gè)連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機(jī)ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實(shí)現(xiàn)了自動(dòng) close 接口 自動(dòng)關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何進(jìn)行消費(fèi)的回調(diào)接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消費(fèi)的一個(gè)回調(diào)接口,如在消費(fèi)的時(shí)候隊(duì)列被刪除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消費(fèi)被中斷");
        };
        /*
         * 消費(fèi)者消費(fèi)消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消費(fèi)哪個(gè)隊(duì)列
         * 2.消費(fèi)成功之后是否要自動(dòng)應(yīng)答 true 代表自動(dòng)應(yīng)答 false 手動(dòng)應(yīng)答
         * 3.消費(fèi)者未成功消費(fèi)的回調(diào)
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、實(shí)現(xiàn)輪訓(xùn)分發(fā)消息

(一)抽取工具類

可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個(gè)工具類來(lái)調(diào)用,并使用單例模式-餓漢式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置ip地址
        factory.setHost("192.168.23.100");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        try {
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 獲取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("創(chuàng)建信道失敗,錯(cuò)誤信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)啟動(dòng)兩個(gè)工作線程

相當(dāng)于前面的消費(fèi)者,我們只需要寫一個(gè)類,通過(guò)ideal實(shí)現(xiàn)多線程啟動(dòng)即可模擬兩個(gè)線程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        // 啟動(dòng)兩次,第一次為C1, 第二次為C2
        System.out.println("C2消費(fèi)者等待消費(fèi)消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)啟動(dòng)發(fā)送線程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通過(guò)控制臺(tái)輸入充當(dāng)消息,使輪訓(xùn)演示更明顯
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

結(jié)果 

四、實(shí)現(xiàn)手動(dòng)應(yīng)答

(一)消息應(yīng)答概念

消費(fèi)者完成一個(gè)任務(wù)可能需要一段時(shí)間,如果其中一個(gè)消費(fèi)者處理一個(gè)長(zhǎng)的任務(wù)并僅只完成 了部分突然它掛掉了,會(huì)發(fā)生什么情況。RabbitMQ 一旦向消費(fèi)者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個(gè)消費(fèi)者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費(fèi)這的消息,因?yàn)樗鼰o(wú)法接收到。 為了保證消息在發(fā)送過(guò)程中不丟失,rabbitmq 引入消息應(yīng)答機(jī)制,消息應(yīng)答就是: 消費(fèi)者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

自動(dòng)應(yīng)答:消費(fèi)者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán),因?yàn)檫@種模式如果消息在接收到之前,消費(fèi)者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。

當(dāng)然另一方面這種模式消費(fèi)者那邊可以傳遞過(guò)載的消息, 沒(méi)有對(duì)傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費(fèi)者這邊由于接收太多還來(lái)不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費(fèi)者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費(fèi)者可以高效并 以某種速率能夠處理這些消息的情況下使用 。

手動(dòng)應(yīng)答:消費(fèi)者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除

(二)消息應(yīng)答的方法

  • Channel.basicAck(用于肯定確認(rèn))
  • RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
  • Channel.basicNack(用于否定確認(rèn))
  • Channel.basicReject(用于否定確認(rèn))
  • 與 Channel.basicNack 相比少一個(gè)參數(shù)Multiple
  • multiple 的 true 和 false 代表不同意思

        true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
        比如說(shuō) channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時(shí)
        5-8 的這些還未應(yīng)答的消息都會(huì)被確認(rèn)收到消息應(yīng)答
        false 同上面相比
        只會(huì)應(yīng)答 tag=8 的消息 5,6,7 這三個(gè)消息依然不會(huì)被確認(rèn)收到消息應(yīng)答

  • 不處理該消息了直接拒絕,可以將其丟棄了

(三)消息自動(dòng)重新入隊(duì) 

如果消費(fèi)者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對(duì)其重新排隊(duì)。如果此時(shí)其他消費(fèi)者可以處理,它將很快將其重新分發(fā)給另一個(gè)消費(fèi)者。這樣,即使某個(gè)消費(fèi)者偶爾死亡,也可以確 保不會(huì)丟失任何消息。

(四)消息手動(dòng)應(yīng)答代碼 

1、生產(chǎn)者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

2、睡眠工具類模擬業(yè)務(wù)執(zhí)行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消費(fèi)者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,業(yè)務(wù)時(shí)間短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模擬業(yè)務(wù)執(zhí)行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,業(yè)務(wù)時(shí)間長(zhǎng)");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模擬業(yè)務(wù)執(zhí)行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識(shí)
             * 2、是否啟動(dòng)批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費(fèi)的消息提前被確然刪除,后面業(yè)務(wù)消費(fèi)該消息
             *    時(shí)出現(xiàn)異常會(huì)導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費(fèi)者取消消費(fèi)接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01業(yè)務(wù)時(shí)間短,worker02業(yè)務(wù)時(shí)間長(zhǎng),我們提前終止worker02模擬出異常,可以看到消息dd會(huì)被放回隊(duì)列由worker01接收處理。

注意:這里需要先啟動(dòng)生產(chǎn)者聲明隊(duì)列ack,不然啟動(dòng)消費(fèi)者會(huì)報(bào)錯(cuò)

最后一個(gè)案例我們可以看到消息輪訓(xùn)+消息自動(dòng)重新入隊(duì)+手動(dòng)應(yīng)答。

到此這篇關(guān)于Docker啟動(dòng)RabbitMQ,實(shí)現(xiàn)生產(chǎn)者與消費(fèi)者的文章就介紹到這了,更多相關(guān)Docker啟動(dòng)RabbitMQ內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何使用Docker和cpolar在Linux服務(wù)器上搭建DashDot監(jiān)控面板

    如何使用Docker和cpolar在Linux服務(wù)器上搭建DashDot監(jiān)控面板

    本文主要介紹如何在Linux服務(wù)器上使用Docker和cpolar技術(shù)搭建DashDot監(jiān)控面板,實(shí)現(xiàn)實(shí)時(shí)服務(wù)器監(jiān)控,DashDot提供直觀的監(jiān)控界面和豐富的指標(biāo),通過(guò)cpolar可以實(shí)現(xiàn)公網(wǎng)訪問(wèn),方便用戶隨時(shí)了解服務(wù)器狀態(tài),文章詳細(xì)說(shuō)明了環(huán)境準(zhǔn)備、安裝Docker、配置DashDot和cpolar的步驟
    2024-09-09
  • 阿里云鏡像安裝docker報(bào)錯(cuò)的問(wèn)題及解決方案

    阿里云鏡像安裝docker報(bào)錯(cuò)的問(wèn)題及解決方案

    這篇文章主要介紹了阿里云鏡像安裝docker報(bào)錯(cuò)的問(wèn)題及解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-08-08
  • docker創(chuàng)建redis鏡像的方法

    docker創(chuàng)建redis鏡像的方法

    本篇文章主要介紹了docker創(chuàng)建redis鏡像的方法,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • 淺談Windows平臺(tái)上Docker安裝與使用

    淺談Windows平臺(tái)上Docker安裝與使用

    本篇文章主要介紹了淺談Windows平臺(tái)上Docker安裝與使用,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-12-12
  • docker-compose部署zk+kafka+storm集群的實(shí)現(xiàn)

    docker-compose部署zk+kafka+storm集群的實(shí)現(xiàn)

    這篇文章主要介紹了docker-compose部署zk+kafka+storm集群,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-10-10
  • linux 詳解useradd 命令基本用法

    linux 詳解useradd 命令基本用法

    這篇文章主要介紹了linux 詳解useradd 命令基本用法的相關(guān)資料,需要的朋友可以參考下
    2017-01-01
  • docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法

    docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法

    這篇文章主要介紹了docker 安裝、升級(jí)、修改數(shù)據(jù)目錄的操作方法,本文給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧
    2024-08-08
  • 第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解

    第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解

    今天小編就為大家分享一篇關(guān)于第一次構(gòu)建、運(yùn)行、發(fā)布、獲取docker鏡像的步驟詳解,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03
  • MySQL容器中docker-entrypoint-initdb.d目錄的使用

    MySQL容器中docker-entrypoint-initdb.d目錄的使用

    這篇文章主要介紹了MySQL容器中docker-entrypoint-initdb.d目錄的使用方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • docker-compose的安裝和使用詳解

    docker-compose的安裝和使用詳解

    這篇文章主要介紹了docker-compose的安裝和使用詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11

最新評(píng)論