欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

原生Java操作兔子隊列RabbitMQ

 更新時間:2023年05月22日 10:33:11   作者:獅子也瘋狂  
這篇文章主要介紹了原生Java操作兔子隊列RabbitMQ,MQ全稱為Message?Queue,即消息隊列,“消息隊列”是在消息的傳輸過程中保存消息的容器,需要的朋友可以參考下

一.前言

RabbitMQ 是一種快速、靈活、可靠的消息傳遞方式,可用于構(gòu)建分布式應(yīng)用程序、異步處理任務(wù)、實現(xiàn)消息隊列等。下面是 Java 原生操作 RabbitMQ 的一些好處和用途:

  1. 簡單易用:RabbitMQ 提供了豐富的 Java 客戶端庫,開發(fā)者可以輕松地使用 Java 代碼進行消息發(fā)送和接收,無需學習復雜的消息傳遞協(xié)議和 API。
  2. 可擴展性強:RabbitMQ 支持集群和分布式部署,可以輕松地實現(xiàn)橫向和縱向擴展,以適應(yīng)不同規(guī)模和負載的應(yīng)用需求。
  3. 可靠性高:RabbitMQ 提供了多種消息傳遞模式,包括持久化消息、確認機制、事務(wù)機制等,確保消息傳遞的可靠性和一致性。
  4. 異步處理能力:RabbitMQ 可以異步處理任務(wù),提高應(yīng)用程序的響應(yīng)速度和吞吐量,實現(xiàn)任務(wù)削峰、應(yīng)對高并發(fā)等需求。
  5. 可用于多種場景:RabbitMQ 可以用于構(gòu)建分布式應(yīng)用程序、實現(xiàn)消息隊列、異步處理任務(wù)、實現(xiàn)實時數(shù)據(jù)同步等場景,具有廣泛的應(yīng)用場景和發(fā)展前景。

二.原生Java操作RabbitMQ

Ⅰ. 簡單模式

1. 添加依賴

    <!--rabbitmq依賴-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.0</version>
    </dependency>

2. 編寫生產(chǎn)者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//生產(chǎn)者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.建立信道
        Channel channel = connection.createChannel();
//        4.創(chuàng)建隊列,若隊列已存在則使用該隊列
        /**
         * 參數(shù)1:隊列名
         * 參數(shù)2:是否持久化,true表示MQ重啟后隊列還存在
         * 參數(shù)3:是否私有化,false表示所有消費者都可以訪問,true表示只有第一次訪問她的消費者才能訪問
         * 參數(shù)4:是否自動刪除,true表示不再使用隊列時,自動刪除
         * 參數(shù)5:其他額外參數(shù)
         */
        channel.queueDeclare("simple_queue",false,false,false,null);
//        5.發(fā)送消息
        String message = "hello rabbitmq";
        /**
         * 參數(shù)1:交換機名,""表示默認交換機
         * 參數(shù)2:路由鍵,簡單模式就是隊列名
         * 參數(shù)3:其他額外參數(shù)
         * 參數(shù)4:要傳遞的消息字節(jié)數(shù)組
         */
        channel.basicPublish("","simple_queue",null,message.getBytes());
//        6.關(guān)閉信道和連接
        channel.close();
        connection.close();
        System.out.println("=====發(fā)送成功====");
    }
}

image-20230411151940196

3. 編寫消費者

因為消費者不知道生產(chǎn)者什么時候發(fā)送消息過來,所以消費者需要一直監(jiān)聽生產(chǎn)者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 消費者
 */
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
//        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息,消息為:"+message);
            }
        });
    }
}

image-20230411152552799

Ⅱ. 工作隊列模式

image-20230411154500986

與簡單模式相比,工作隊列模式(Work Queue)多了一些消費者,該模式也使用direct交換機,應(yīng)用于處理消息較多的情況。特點如下:

  • 一個隊列對應(yīng)多個消費者。
  • 一條消息只會被一個消費者消費。
  • 消息隊列默認采用輪詢的方式將消息平均發(fā)送給消費者。

其實就是 簡單模式plus版本。

1. 編寫生產(chǎn)者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
//        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.創(chuàng)建隊列,持久化隊列
        channel.queueDeclare("work_queue",true,false,false,null);
//        5.發(fā)送大量消息,參數(shù)3表示該消息為持久化消息,即除了保存到內(nèi)存還保存到磁盤
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
                    ("您好,這是今天的第"+i+"條消息").getBytes(StandardCharsets.UTF_8));
        }
//        6.關(guān)閉資源
        channel.close();
        connection.close();
    }
}

image-20230411155449300

2. 編寫消費者

這里使用創(chuàng)建了三個消費者,來接收生產(chǎn)者的消息

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        監(jiān)聽隊列,處理消息
        channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("消費者1消費消息,消息為:"+message);
            }
        });
    }
}

3. 實現(xiàn)

先把三個消費者運行起來,再運行生產(chǎn)者,得到的消息就會輪詢均分

image-20230411160256568

Ⅲ. 發(fā)布訂閱模式

在開發(fā)過程中,有一些消息需要不同消費者進行不同的處理,如電商網(wǎng)站的同一條促銷信息需要短信發(fā)送、郵件發(fā)送、站內(nèi)信發(fā)送等。此時可以使用發(fā)布訂閱模式(Publish/Subscribe)

特點:

  • 生產(chǎn)者將消息發(fā)送給交換機,交換機將消息轉(zhuǎn)發(fā)到綁定此交換機的每個隊列中。
  • 工作隊列模式的交換機只能將消息發(fā)送給一個隊列,發(fā)布訂閱模式的交換機能將消息發(fā)送給多個隊列。發(fā)布訂閱模式使用fanout交換機。

1. 編寫生產(chǎn)者

這里創(chuàng)建了三條隊列,一條是發(fā)送短信,一條是站內(nèi)信,一條是郵件隊列、

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import javax.swing.plaf.TreeUI;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.建立信道
        Channel channel = connection.createChannel();
//        4.創(chuàng)建交換機
        /**
         * 參數(shù)1:交換機名
         * 參數(shù)2:交換機類型
         * 參數(shù)3:交換機是否持久化
         */
        channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
//        5.創(chuàng)建隊列
        channel.queueDeclare("SEND_MAIL",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
        channel.queueDeclare("SEND_STATION",true,false,false,null);
//        6.交換機綁定隊列
        /**
         * 參數(shù)1:隊列名
         * 參數(shù)2:交換機名
         * 參數(shù)3:路由關(guān)鍵字,發(fā)布訂閱模式只需要寫""即可
         */
        channel.queueBind("SEND_MAIL","exchange_fanout","");
        channel.queueBind("SEND_MESSAGE","exchange_fanout","");
        channel.queueBind("SEND_STATION","exchange_fanout","");
//        7.發(fā)送消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("exchange_fanout","",null,
                    ("您好,尊敬的用戶,秒殺商品活動開始啦:"+i).getBytes(StandardCharsets.UTF_8));
        }
//        8.關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2. 編寫消費者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//短信消費者
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_MESSAGE",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送短信,消息為:"+message);
            }
        });
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//郵件消費者
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_MAIL",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送郵件,消息為:"+message);
            }
        });
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//站內(nèi)信消費者
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_STATION",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送站內(nèi)信,消息為:"+message);
            }
        });
    }
}

發(fā)布訂閱模式也允許多個消費者監(jiān)聽同一個隊列(工作模式),例如 兩個發(fā)送短信消費者監(jiān)聽同一個短信生產(chǎn)者,這樣短信生產(chǎn)者的消息將會被輪詢平分。

Ⅳ. 路由模式

image-20230411173014108

使用發(fā)布訂閱模式時,所有消息都會發(fā)送到綁定的隊列中,但很多時候,不是所有消息都無差別的發(fā)布到所有隊列中。比如電商網(wǎng)站

的促銷活動,雙十一大促可能會發(fā)布到所有隊列;而一些小的促銷活動為了節(jié)約成本,只發(fā)布到站內(nèi)信隊列。此時需要使用路由模式

(Routing)完成這一需求。意思就是只發(fā)給與綁定相同路由關(guān)鍵字的隊列。

特點:

  1. 每個隊列綁定路由關(guān)鍵字RoutingKey。
  2. 生產(chǎn)者將帶有RoutingKey的消息發(fā)送給交換機,交換機根據(jù)RoutingKey轉(zhuǎn)發(fā)到指定隊列。路由模式使用direct交換機。

1. 編寫生產(chǎn)者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列
 */
// 生產(chǎn)者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
        // 2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.創(chuàng)建交換機
        channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
        // 5.創(chuàng)建隊列
        channel.queueDeclare("SEND_MAIL2",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
        channel.queueDeclare("SEND_STATION2",true,false,false,null);
        // 6.交換機綁定隊列
        channel.queueBind("SEND_MAIL2","exchange_routing","import");
        channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","import");
        channel.queueBind("SEND_STATION2","exchange_routing","normal");
        // 7.發(fā)送消息
        channel.basicPublish("exchange_routing","import",null,
                "雙十一大促活動".getBytes());
        channel.basicPublish("exchange_routing","normal",null,
                "小心促銷活動".getBytes());
        // 8.關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2. 編寫消費者

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//發(fā)送郵件消費者
public class ConsumerMail {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_MAIL2",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送郵件,消息為:"+message);
            }
        });
    }
}
//發(fā)送信息消費者
public class ConsumerMessage {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_MESSAGE2",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送短信,消息為:"+message);
            }
        });
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//站內(nèi)信消費者
public class ConsumerStation {
    public static void main(String[] args) throws IOException, TimeoutException {
        //        1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
//        2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
//        3.創(chuàng)建信道
        Channel channel = connection.createChannel();
//        4.監(jiān)聽隊列
        /**
         * 參數(shù)1:監(jiān)聽的隊列名
         * 參數(shù)2:是否自動簽收,如果為false,則需要手動確認消息已收到,否則MQ會一直發(fā)送消息。
         * 參數(shù)3:Consumer的實現(xiàn)類,重寫該類方法表示接收到這個消息之后該如何消費消息
         */
        channel.basicConsume("SEND_STATION2",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("發(fā)送站內(nèi)信,消息為:"+message);
            }
        });
    }
}

Ⅴ. 通配符模式

image-20230411204903551

通配符模式(Topic)是在路由模式的基礎(chǔ)上,給隊列綁定帶通配符的路由關(guān)鍵字,只要消息的RoutingKey能實現(xiàn)通配符匹配,就會將消

息轉(zhuǎn)發(fā)到該隊列。通配符模式比路由模式更靈活,使用topic交換機。

通配符規(guī)則:

1 消息設(shè)置RoutingKey時,RoutingKey由多個單詞構(gòu)成,中間以 . 分割。

2 隊列設(shè)置RoutingKey時, # 可以匹配任意多個單詞, * 可以匹配任意一個單詞。

1. 編寫生產(chǎn)者

image-20230411211252594

package com.itbz.mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 發(fā)布訂閱者模式跟簡單和工作模式不一樣,不是使用默認的交換機,而是自己創(chuàng)建fanout交換機,生產(chǎn)者把消息發(fā)到交換機,由交換機轉(zhuǎn)發(fā)到與之綁定的隊列
 */
// 生產(chǎn)者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.創(chuàng)建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.79.50.65");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("lion");
        connectionFactory.setPassword("lion");
        connectionFactory.setVirtualHost("/");
        // 2.創(chuàng)建連接
        Connection connection = connectionFactory.newConnection();
        // 3.建立信道
        Channel channel = connection.createChannel();
        // 4.創(chuàng)建交換機
        channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
        // 5.創(chuàng)建隊列
        channel.queueDeclare("SEND_MAIL3",true,false,false,null);
        channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
        channel.queueDeclare("SEND_STATION3",true,false,false,null);
        // 6.交換機綁定隊列
        channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
        channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
        channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
        // 7.發(fā)送消息
            // 三個隊列都匹配上了
        channel.basicPublish("exchange_topic","mail.message.station",null,
                "雙十一大促活動".getBytes());
            // 只發(fā)給station
        channel.basicPublish("exchange_topic","station",null,
                "小心促銷活動".getBytes());
        // 8.關(guān)閉資源
        channel.close();
        connection.close();
    }
}

2. 編寫消費者

跟前面差不多

三.總結(jié)

這篇萬字長文總結(jié)了原生Java操作RabbitMQ的各種過程,希望對您有幫助哦!

到此這篇關(guān)于原生Java操作兔子隊列RabbitMQ的文章就介紹到這了,更多相關(guān)原生Java操作RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Netty分布式ByteBuf中PooledByteBufAllocator剖析

    Netty分布式ByteBuf中PooledByteBufAllocator剖析

    這篇文章主要為大家介紹了Netty分布式ByteBuf剖析PooledByteBufAllocator簡述,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03
  • JsonFormat與@DateTimeFormat注解實例解析

    JsonFormat與@DateTimeFormat注解實例解析

    這篇文章主要介紹了JsonFormat與@DateTimeFormat注解實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2019-12-12
  • java生成壓縮文件示例代碼

    java生成壓縮文件示例代碼

    在工作過程中,需要將一個文件夾生成壓縮文件,然后提供給用戶下載。寫了一個壓縮文件的工具類。該工具類支持單個文件和文件夾壓縮
    2013-11-11
  • java中的GC收集器詳情

    java中的GC收集器詳情

    這篇文章主要介紹了java中的GC收集器,GC(Garbage collection )指的是程序內(nèi)存管理分手動和自動,手動內(nèi)存管理,需要我們編程的時候顯式分配和釋放空間,但如果忘記釋放,會造成嚴重的內(nèi)存泄漏問題,下面文章內(nèi)容我們就來實例說明情況,需要的朋友可以參考一下
    2021-10-10
  • SpringCloud修改Feign日志記錄級別過程淺析

    SpringCloud修改Feign日志記錄級別過程淺析

    OpenFeign源于Netflix的Feign,是http通信的客戶端。屏蔽了網(wǎng)絡(luò)通信的細節(jié),直接面向接口的方式開發(fā),讓開發(fā)者感知不到網(wǎng)絡(luò)通信細節(jié)。所有遠程調(diào)用,都像調(diào)用本地方法一樣完成
    2023-02-02
  • 解決fastjson泛型轉(zhuǎn)換報錯的解決方法

    解決fastjson泛型轉(zhuǎn)換報錯的解決方法

    這篇文章主要介紹了解決fastjson泛型轉(zhuǎn)換報錯的解決方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • 談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)

    談?wù)凧ava利用原始HttpURLConnection發(fā)送POST數(shù)據(jù)

    這篇文章主要給大家介紹java利用原始httpUrlConnection發(fā)送post數(shù)據(jù),設(shè)計到httpUrlConnection類的相關(guān)知識,感興趣的朋友跟著小編一起學習吧
    2015-10-10
  • 全網(wǎng)最新Log4j?漏洞修復和臨時補救方法

    全網(wǎng)最新Log4j?漏洞修復和臨時補救方法

    Apache?Log4j?遠程代碼執(zhí)行漏洞,如何快速修復log4j2漏洞,本文給大家介紹下Log4j?漏洞修復和臨時補救方法,感興趣的朋友跟隨小編一起看看吧
    2021-12-12
  • 使用Feign調(diào)用時添加驗證信息token到請求頭方式

    使用Feign調(diào)用時添加驗證信息token到請求頭方式

    這篇文章主要介紹了使用Feign調(diào)用時添加驗證信息token到請求頭方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • java中Swing會奔跑的線程俠

    java中Swing會奔跑的線程俠

    本文通過代碼示例給大家詳細講解了java中Swing會奔跑的線程俠這個經(jīng)典的示例,有興趣的朋友學習下。
    2018-03-03

最新評論