欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

關(guān)于Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)的7種通訊方式

 更新時(shí)間:2023年05月20日 10:14:10   作者:叫我二蛋  
這篇文章主要介紹了關(guān)于Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)的7種通訊方式,消息中間件是基于隊(duì)列與消息傳遞技術(shù),在網(wǎng)絡(luò)環(huán)境中為應(yīng)用系統(tǒng)提供同步或異步、可靠的消息傳輸?shù)闹涡攒浖到y(tǒng),需要的朋友可以參考下

環(huán)境說(shuō)明

  • RabbitMQ環(huán)境
  • Java版本:JDK1.8
  • Maven版本:apache-maven-3.6.3
  • 開發(fā)工具:IntelliJ IDEA

工程搭建

  1. 創(chuàng)建maven項(xiàng)目
  2. pom.xml文件引入RabbitMQ依賴
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.16.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

連接RabbitMQ

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConnections {
    public static final String RABBITMQ_HOST = "127.0.0.1";
    public static final int RABBITMQ_PORT = 5672;
    public static final String RABBITMQ_USERNAME = "guest";
    public static final String RABBITMQ_PASSWORD = "guest";
    public static final String RABBITMQ_VIRTUAL_HOST = "/";
    /**
     * 構(gòu)建RabbitMQ連接對(duì)象
     *
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.創(chuàng)建Connection工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置Rabbitmq連接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
        //3.返回連接對(duì)象
        return factory.newConnection();
    }
}

通訊模式

1.簡(jiǎn)單通訊

即一個(gè)生產(chǎn)者可以向一個(gè)隊(duì)列發(fā)送消息,一個(gè)消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。如下圖:

在這里插入圖片描述

    public final static String HELLO_QUEUE_NAME = "hello";
    @Test
    public void publish_hello() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
        //4.發(fā)布消息
        String msg = "hello,world";
        channel.basicPublish("", HELLO_QUEUE_NAME, null, msg.getBytes());
    }
	@Test
    public void consume_hello() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(HELLO_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received '" + message + "'");
        };
        channel.basicConsume(HELLO_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

2.工作隊(duì)列通訊

與簡(jiǎn)單通訊一樣,當(dāng)消費(fèi)能力不足或想要提高吞吐時(shí)可添加多個(gè)消費(fèi)者進(jìn)行處理業(yè)務(wù)。如下圖,隊(duì)列中的消息會(huì)逐條被C1和C2消費(fèi)。

在這里插入圖片描述

public final static String WORK_QUEUE_NAME = "work";
    @Test
    public void publish_work_queue() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.發(fā)布消息
        String msg = "hello,work queue";
        channel.basicPublish("", WORK_QUEUE_NAME, null, msg.getBytes());
    }
        @Test
    public void consume_work_queue1() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume1 Received '" + message + "'");
        };
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
    @Test
    public void consume_work_queue2() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(WORK_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume2 Received '" + message + "'");
        };
        channel.basicConsume(WORK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

3.發(fā)布/訂閱通訊

工作隊(duì)列背后的假設(shè)是,每個(gè)任務(wù)只交付給一個(gè)消費(fèi)者做同一件事。如果要交付給多個(gè)消費(fèi)者做不同的事,需要引入交換機(jī)實(shí)現(xiàn)一個(gè)完整的消息傳遞模型,這種模式被稱為“發(fā)布/訂閱”。如下圖,消息會(huì)發(fā)布到交換機(jī)中,交換機(jī)向綁定的隊(duì)列同時(shí)發(fā)送消息,最終C1和C2會(huì)同時(shí)消費(fèi)此條消息。

在這里插入圖片描述

public final static String PUB_EXCHANGE_NAME = "pub-ex";
    public final static String PUB1_QUEUE_NAME = "pub-que1";
    public final static String PUB2_QUEUE_NAME = "pub-que2";
    @Test
    public void publish_pub_sub() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建交換機(jī)
        channel.exchangeDeclare(PUB_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
        //5.綁定隊(duì)列
        channel.queueBind(PUB1_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
        channel.queueBind(PUB2_QUEUE_NAME, PUB_EXCHANGE_NAME, "");
        //6.發(fā)布消息
        String msg = "hello,pub/sub";
        channel.basicPublish(PUB_EXCHANGE_NAME, "", null, msg.getBytes());
    }
    @Test
    public void consume_pub_sub1() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(PUB1_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("pub_sub1 Received '" + message + "'");
        };
        channel.basicConsume(PUB1_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
    @Test
    public void consume_pub_sub2() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(PUB2_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("pub_sub2 Received '" + message + "'");
        };
        channel.basicConsume(PUB2_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

4.路由通訊

發(fā)布/訂閱模式是交換機(jī)將一條消息同時(shí)路由給多個(gè)隊(duì)列,“路由”模式可以將消息通過(guò)交換機(jī)指定到某個(gè)隊(duì)列中從而被消費(fèi)。如下圖,交換機(jī)將所有類型的日志路由到一個(gè)隊(duì)列中,將error類型的日志路由到另一個(gè)隊(duì)列中。

在這里插入圖片描述

    public final static String ROUT_EXCHANGE_NAME = "rout-ex";
    public final static String ROUTALL_QUEUE_NAME = "rout-queall";
    public final static String ROUTONE_QUEUE_NAME = "rout-queone";
    @Test
    public void publish_routing() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建交換機(jī)
        channel.exchangeDeclare(ROUT_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
        //5.綁定隊(duì)列
        channel.queueBind(ROUTALL_QUEUE_NAME, ROUT_EXCHANGE_NAME, "all");
        channel.queueBind(ROUTONE_QUEUE_NAME, ROUT_EXCHANGE_NAME, "one");
        //6.發(fā)布消息
        String msg1 = "hello,1-all";
        String msg2 = "hello,2-all";
        String msg3 = "hello,1-one";
        channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg1.getBytes());
        channel.basicPublish(ROUT_EXCHANGE_NAME, "all", null, msg2.getBytes());
        channel.basicPublish(ROUT_EXCHANGE_NAME, "one", null, msg3.getBytes());
    }
    @Test
    public void consume_routing_all() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(ROUTALL_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_all Received '" + message + "'");
        };
        channel.basicConsume(ROUTALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
    @Test
    public void consume_routing_one() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(ROUTONE_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_one Received '" + message + "'");
        };
        channel.basicConsume(ROUTONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

5.主題通訊

“路由”模式仍然有局限性——它不能基于多個(gè)標(biāo)準(zhǔn)進(jìn)行路由。主題可以帶來(lái)很大的靈活性,發(fā)送到主題交換的消息不能有任意的routing_key,它必須是一個(gè)用點(diǎn)分隔的單詞列表,routing_key有兩種重要的特殊情況:

  • *只能代替一個(gè)詞。
  • #可以替換零個(gè)或多個(gè)單詞。

在這里插入圖片描述

    public final static String TOPIC_EXCHANGE_NAME = "topic-ex";
    public final static String TOPICALL_QUEUE_NAME = "topic-queall";
    public final static String TOPICONE_QUEUE_NAME = "topic-queone";
    @Test
    public void publish_topic() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建交換機(jī)
        channel.exchangeDeclare(TOPIC_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //4.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
        channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
        //5.綁定隊(duì)列
        channel.queueBind(TOPICALL_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "*.all.*");
        channel.queueBind(TOPICONE_QUEUE_NAME, TOPIC_EXCHANGE_NAME, "#.one");
        //6.發(fā)布消息
        String msg1 = "hello.all.world";
        String msg2 = "hello.world.one";
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.all.world", null, msg1.getBytes());
        channel.basicPublish(TOPIC_EXCHANGE_NAME, "hello.world.one", null, msg2.getBytes());
    }
        @Test
    public void consume_topic_all() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(TOPICALL_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_topic_all Received '" + message + "'");
        };
        channel.basicConsume(TOPICALL_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
    @Test
    public void consume_topic_one() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(TOPICONE_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_topic_one Received '" + message + "'");
        };
        channel.basicConsume(TOPICONE_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

6.RPC通訊

RabbitMQ作為消息中間件可以達(dá)到應(yīng)用解耦效果,如果想達(dá)到RPC遠(yuǎn)程調(diào)用同步返回結(jié)果,RabbitMQ同樣支持,其原理如下:

  • 發(fā)布者發(fā)送消息時(shí)指定一個(gè)回調(diào)隊(duì)列和唯一id
  • 消費(fèi)者處理完成后將結(jié)果發(fā)送到回調(diào)隊(duì)列中
  • 發(fā)布者按照唯一id接收消息并處理

如下圖

在這里插入圖片描述

    public final static String RPC_QUEUE_NAME = "rpc-que";
    public final static String RPCCALLBACK_QUEUE_NAME = "rpc-callback-que";
    @Test
    public void publish_rpc() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        // replyTo回調(diào)隊(duì)列
        channel.queueDeclare(RPCCALLBACK_QUEUE_NAME, false, false, false, null);
        //4.發(fā)布消息
        String msg = "hello rpc";
        String correlationId = UUID.randomUUID().toString();
        /*AMQP 協(xié)議預(yù)先定義了一組與消息一起使用的14個(gè)屬性。除了以下屬性外,大多數(shù)屬性很少使用:
        deliveryMode:將消息標(biāo)記為持久(值為2)或瞬時(shí)(任何其他值)。
        contentType:用于描述編碼的mime類型。例如,對(duì)于常用的JSON編碼,最好將此屬性設(shè)置為:application/JSON。
        replyTo:通常用于命名回調(diào)隊(duì)列。
        correlationId:用于將RPC響應(yīng)與請(qǐng)求關(guān)聯(lián)。*/
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().replyTo(RPCCALLBACK_QUEUE_NAME).correlationId(correlationId).build();
        //5.回調(diào)響應(yīng)結(jié)果
        channel.basicPublish("", RPC_QUEUE_NAME, basicProperties, msg.getBytes());
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String recid = delivery.getProperties().getCorrelationId();
            if (correlationId.equalsIgnoreCase(recid)) System.out.println("rpc-callback-que   '" + message + "'");
        };
        channel.basicConsume(RPCCALLBACK_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }
        @Test
    public void consume_rpc() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_rpc Received '" + message + "'");
            String correlationId = delivery.getProperties().getCorrelationId();
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
            String replyTo = delivery.getProperties().getReplyTo();
            String callbackmsg = "rpc callback";
            channel.basicPublish("", replyTo, basicProperties, callbackmsg.getBytes());
        };
        channel.basicConsume(RPC_QUEUE_NAME, true, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

7.Publisher確認(rèn)通訊

Publisher確認(rèn)是RabbitMQ擴(kuò)展以實(shí)現(xiàn)可靠發(fā)布。當(dāng)在通道上啟用發(fā)布者確認(rèn)時(shí),代理將異步確認(rèn)客戶端發(fā)布的消息,這意味著它們已在服務(wù)器端得到處理。

    public final static String CONFIRM_EXCHANGE_NAME = "confirm-ex";
    public final static String CONFIRM_QUEUE_NAME = "confirm-que";
    @Test
    public void publish_confirm() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.開啟確認(rèn)選項(xiàng)
        channel.confirmSelect();
        //4.構(gòu)建交換機(jī)
        channel.exchangeDeclare(CONFIRM_EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //5.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
        //6.綁定隊(duì)列
        String right_routing_key = "confirm";
        String error_routing_key = "confirm_err";
        channel.queueBind(CONFIRM_QUEUE_NAME, CONFIRM_EXCHANGE_NAME, right_routing_key);
        //7.消息到達(dá)交換機(jī)確認(rèn)監(jiān)聽
        channel.addConfirmListener((sequenceNumber, multiple) -> {
            System.out.println("消息成功發(fā)送到交換機(jī)");
        }, (sequenceNumber, multiple) -> {
            System.err.println("消息未發(fā)送到交換機(jī),補(bǔ)償操作。");
        });
        //8.消息到達(dá)隊(duì)列確認(rèn)監(jiān)聽
        channel.addReturnListener((replyCode, replyText, exchange, routingKey, basicProperties, body) -> {
            System.err.format("消息 %s 未路由到指定隊(duì)列: %s, replyText: %s,replyCode: %d%n", body, routingKey, replyText, replyCode);
        });
        //設(shè)置消息持久化
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
        //7.發(fā)布消息
        String msg = "hello confirm";
        channel.basicPublish(CONFIRM_EXCHANGE_NAME, error_routing_key,true, basicProperties, msg.getBytes());
        System.in.read();
    }
	@Test
    public void consume_ack() throws IOException, TimeoutException {
        //1.獲取連接對(duì)象
        Connection connection = MQConnections.getConnection();
        //2.構(gòu)建Channl
        Channel channel = connection.createChannel();
        //3.構(gòu)建隊(duì)列,queueDeclare("隊(duì)列名稱","是否持久化隊(duì)列","是否只允許一個(gè)隊(duì)列消費(fèi)","長(zhǎng)時(shí)間未使用是否刪除","其他參數(shù)")
        channel.queueDeclare(CONFIRM_QUEUE_NAME, true, false, false, null);
        //4.監(jiān)聽消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("consume_routing_one Received '" + message + "'");
            //消息處理后手動(dòng)ACK
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        // ack為false
        channel.basicConsume(CONFIRM_QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
        System.in.read();
    }

代碼倉(cāng)庫(kù)

https://gitee.com/codeWBG/learn_rabbitmq

到此這篇關(guān)于關(guān)于Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)的7種通訊方式的文章就介紹到這了,更多相關(guān)Java整合RabbitMQ實(shí)現(xiàn)生產(chǎn)消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 詳解java線程的開始、暫停、繼續(xù)

    詳解java線程的開始、暫停、繼續(xù)

    本文將介紹通過(guò)線程讀取文件內(nèi)容,并且可以控制線程的開始、暫停、繼續(xù),來(lái)控制讀文件。具有一定的參考作用,下面跟著小編一起來(lái)看下吧
    2017-01-01
  • springboot使用事物注解方式代碼實(shí)例

    springboot使用事物注解方式代碼實(shí)例

    這篇文章主要介紹了springboot使用事物注解方式代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-11-11
  • Java常見內(nèi)存溢出異常分析與解決

    Java常見內(nèi)存溢出異常分析與解決

    本篇文章主要分析了JAVA程序內(nèi)存溢出問(wèn)題原因,較為詳細(xì)的說(shuō)明了java導(dǎo)致程序內(nèi)存溢出的原因與解決方法,感興趣的小伙伴們可以參考一下。
    2016-10-10
  • mybatis中association標(biāo)簽的使用解讀

    mybatis中association標(biāo)簽的使用解讀

    這篇文章主要介紹了mybatis中association標(biāo)簽的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • java實(shí)現(xiàn)批量生成二維碼

    java實(shí)現(xiàn)批量生成二維碼

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)批量生成二維碼的相關(guān)代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-05-05
  • 詳解SpringBoot定時(shí)任務(wù)功能

    詳解SpringBoot定時(shí)任務(wù)功能

    這篇文章主要介紹了SpringBoot定時(shí)任務(wù)功能詳細(xì)解析,這次的功能開發(fā)過(guò)程中也算是對(duì)其內(nèi)涵的進(jìn)一步了解,以后遇到定時(shí)任務(wù)的處理也更清晰,更有效率了,對(duì)SpringBoot定時(shí)任務(wù)相關(guān)知識(shí)感興趣的朋友一起看看吧
    2022-05-05
  • 詳解Spring中的@Scope注解

    詳解Spring中的@Scope注解

    這篇文章主要介紹了詳解Spring中的@Scope注解,@Scope注解是Spring IOC容器中的一個(gè)作用域,在Spring IOC容器中,他用來(lái)配置Bean實(shí)例的作用域?qū)ο?需要的朋友可以參考下
    2023-07-07
  • Java中的WeakHashMap概念原理以及簡(jiǎn)單案例

    Java中的WeakHashMap概念原理以及簡(jiǎn)單案例

    這篇文章主要介紹了Java中的WeakHashMap概念原理以及簡(jiǎn)單案例,WeakHashMap使用了軟引用結(jié)構(gòu),它的對(duì)象在垃圾回收時(shí)會(huì)被刪除,垃圾回收是優(yōu)先級(jí)非常低的線程,不能被顯示調(diào)用,當(dāng)內(nèi)存不足的時(shí)候會(huì)啟用,需要的朋友可以參考下
    2023-09-09
  • Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類

    Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類

    這篇文章主要介紹了Java實(shí)現(xiàn)的properties文件動(dòng)態(tài)修改并自動(dòng)保存工具類,可實(shí)現(xiàn)針對(duì)properties配置文件的相關(guān)修改與保存功能,需要的朋友可以參考下
    2017-11-11
  • spring mvc4的日期/數(shù)字格式化、枚舉轉(zhuǎn)換示例

    spring mvc4的日期/數(shù)字格式化、枚舉轉(zhuǎn)換示例

    本篇文章主要介紹了spring mvc4的日期/數(shù)字格式化、枚舉轉(zhuǎn)換示例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。
    2017-01-01

最新評(píng)論