springboot中使用rabbitt的詳細(xì)方法
RabbitMQ的示例,涉及到Direct、Fanout、Topic和Headers交換機(jī)以及普通隊(duì)列、延遲隊(duì)列和死信隊(duì)列
在pom.xml文件中添加以下依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置RabbitMQ連接信息,在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
創(chuàng)建消息隊(duì)列
創(chuàng)建一個(gè)普通的Direct交換機(jī)和隊(duì)列:
@Configuration public class RabbitMQConfig { @Bean public Queue queue() { return new Queue("directQueue", false); } @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public Binding binding(Queue queue, DirectExchange directExchange) { return BindingBuilder.bind(queue).to(directExchange).with("directRoutingKey"); } }
創(chuàng)建一個(gè)Fanout交換機(jī)和隊(duì)列:
@Configuration public class RabbitMQConfig { @Bean public Queue queueA() { return new Queue("fanoutQueueA", false); } @Bean public Queue queueB() { return new Queue("fanoutQueueB", false); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueA).to(fanoutExchange); } @Bean public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueB).to(fanoutExchange); } }
創(chuàng)建一個(gè)Topic交換機(jī)和隊(duì)列:
@Configuration public class RabbitMQConfig { @Bean public Queue queueA() { return new Queue("topicQueueA", false); } @Bean public Queue queueB() { return new Queue("topicQueueB", false); } @Bean public TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean public Binding bindingA(Queue queueA, TopicExchange topicExchange) { return BindingBuilder.bind(queueA).to(topicExchange).with("topic.key.*"); } @Bean public Binding bindingB(Queue queueB, TopicExchange topicExchange) { return BindingBuilder.bind(queueB).to(topicExchange).with("topic.#"); } }
創(chuàng)建一個(gè)Headers交換機(jī)和隊(duì)列:
@Configuration public class RabbitMQConfig { @Bean public Queue queue() { return new Queue("headerQueue", false); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headerExchange"); } @Bean public Binding binding(Queue queue, HeadersExchange headersExchange) { Map<String, Object> headers = new HashMap<>(); headers.put("header1", "value1"); headers.put("header2", "value2"); return BindingBuilder.bind(queue).to(headersExchange).whereAll(headers).match(); } }
創(chuàng)建一個(gè)延遲隊(duì)列和死信隊(duì)列:
@Configuration public class RabbitMQConfig { @Bean public Queue delayQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 10000); // 消息過期時(shí)間為10秒 args.put("x-dead-letter-exchange", "deadLetterExchange"); args.put("x-dead-letter-routing-key", "deadLetterRoutingKey"); return new Queue("delayQueue", false, false, false, args); } @Bean public Queue deadLetterQueue() { return new Queue("deadLetterQueue", false); } @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("deadLetterExchange"); } @Bean public Binding binding(Queue delayQueue, DirectExchange directExchange) { return BindingBuilder.bind(delayQueue).to(directExchange).with("delayRoutingKey"); } @Bean public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("deadLetterRoutingKey"); } }
發(fā)送和接收消息
@Service public class RabbitMQService { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(String message) { rabbitTemplate.convertAndSend("directExchange", "directRoutingKey", message); } @RabbitListener(queues = "directQueue") public void receiveDirectMessage(String message) { System.out.println("Received message from directQueue: " + message); } public void sendFanoutMessage(String message) { rabbitTemplate.convertAndSend("fanoutExchange", "", message); } @RabbitListener(queues = "fanoutQueueA") public void receiveFanoutMessageA(String message) { System.out.println("Received message from fanoutQueueA: " + message); } @RabbitListener(queues = "fanoutQueueB") public void receiveFanoutMessageB(String message) { System.out.println("Received message from fanoutQueueB: " + message); } public void sendTopicMessage(String message) { rabbitTemplate.convertAndSend("topicExchange", "topic.key.message", message); } @RabbitListener(queues = "topicQueueA") public void receiveTopicMessageA(String message) { System.out.println("Received message from topicQueueA: " + message); } @RabbitListener(queues = "topicQueueB") public void receiveTopicMessageB(String message) { System.out.println("Received message from topicQueueB: " + message); } public void sendHeaderMessage(String message) { MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("header1", "value1"); messageProperties.setHeader("header2", "value2"); Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build(); rabbitTemplate.send("headerExchange", "", msg); } @RabbitListener(queues = "headerQueue") public void receiveHeaderMessage(Message message) { System.out.println("Received message from headerQueue: " + new String(message.getBody())); } public void sendDelayMessage(String message) { rabbitTemplate.convertAndSend("directExchange", "delayRoutingKey", message); } @RabbitListener(queues = "deadLetterQueue") public void receiveDeadLetterMessage(String message) { System.out.println("Received message from deadLetterQueue: " + message); } }
消息確認(rèn)模式
在RabbitMQ中,消息確認(rèn)模式有兩種:自動(dòng)確認(rèn)和手動(dòng)確認(rèn)。自動(dòng)確認(rèn)是指當(dāng)消息被成功接收時(shí),RabbitMQ會自動(dòng)確認(rèn)消息。手動(dòng)確認(rèn)是指當(dāng)消費(fèi)者成功處理消息后,顯式地向RabbitMQ發(fā)送確認(rèn)消息。 1 使用手動(dòng)確認(rèn)模式:
@Configuration public class RabbitMQConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
@Service public class RabbitMQService { @RabbitListener(queues = "directQueue") public void receiveDirectMessage(Message message, Channel channel) throws IOException { try { System.out.println("Received message from directQueue: " + new String(message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }
在上述代碼中,我們使用了MANUAL(手動(dòng)確認(rèn))模式,并在消息處理成功后使用channel.basicAck()方法顯式地確認(rèn)消息。如果發(fā)生任何異常,我們使用channel.basicNack()方法拒絕消息并重新加入隊(duì)列。
希望這個(gè)示例能夠幫助您了解如何在Spring Boot中使用RabbitMQ,并使用不同的交換機(jī)和隊(duì)列類型以及消息確認(rèn)模式。
到此這篇關(guān)于springboot中使用rabbitt的文章就介紹到這了,更多相關(guān)springboot使用rabbitt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Spring Controller 獲取請求參數(shù)的幾種方法詳解
這篇文章主要介紹了Java Spring Controller 獲取請求參數(shù)的幾種方法詳解的相關(guān)資料,這里提供了6種方法,需要的朋友可以參考下2016-12-12利用Java實(shí)現(xiàn)解析網(wǎng)頁中的內(nèi)容
這篇文章主要為大家詳細(xì)介紹了如何利用Java語言做一個(gè)解析指定網(wǎng)址的網(wǎng)頁內(nèi)容小應(yīng)用,文中的實(shí)現(xiàn)步驟講解詳細(xì),感興趣的可以嘗試下2022-10-10Java數(shù)組動(dòng)態(tài)增加容量過程解析
這篇文章主要介紹了Java數(shù)組動(dòng)態(tài)增加容量過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09shiro實(shí)現(xiàn)單點(diǎn)登錄(一個(gè)用戶同一時(shí)刻只能在一個(gè)地方登錄)
這篇文章主要介紹了shiro實(shí)現(xiàn)單點(diǎn)登錄(一個(gè)用戶同一時(shí)刻只能在一個(gè)地方登錄)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,感興趣的朋友一起學(xué)習(xí)吧2016-08-08Springboot 整合 Java DL4J 實(shí)現(xiàn)文物保護(hù)系統(tǒng)的詳細(xì)過程
在數(shù)字化時(shí)代,文物保護(hù)尤為關(guān)鍵,本文介紹如何利用SpringBoot和Deeplearning4j構(gòu)建一個(gè)圖像識別的文物保護(hù)系統(tǒng),系統(tǒng)采用卷積神經(jīng)網(wǎng)絡(luò)(CNN),能夠識別文物的損壞情況,本文介紹Springboot 整合 Java DL4J 實(shí)現(xiàn)文物保護(hù)系統(tǒng),感興趣的朋友一起看看吧2024-10-10關(guān)于Sentinel中冷啟動(dòng)限流原理WarmUpController
這篇文章主要介紹了關(guān)于Sentinel中冷啟動(dòng)限流原理WarmUpController,具有很好的參考價(jià)值,希望對大家有所幫助。2023-04-04Springboot整合Swagger2后訪問swagger-ui.html 404報(bào)錯(cuò)問題解決方案
這篇文章主要介紹了Springboot整合Swagger2后訪問swagger-ui.html 404報(bào)錯(cuò),本文給大家分享兩種解決方案,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06