SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實(shí)戰(zhàn)
摘要
整合場景含 topic 工作模式(通過 routingKey 可滿足簡單/工作隊(duì)列/發(fā)布訂閱/路由等四種工作模式)和 confirm(消息確認(rèn))、return(消息返回)、basicAck(消息簽收)、basicNack(拒絕簽收)、DLX(Dead Letter Exchange死信隊(duì)列)實(shí)現(xiàn)延時(shí)/定時(shí)任務(wù)等。
整合
依賴與配置
以下內(nèi)容消費(fèi)者同生產(chǎn)者
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
server.port=8090 spring.rabbitmq.host=192.168.168.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=zheng123 spring.rabbitmq.password=zheng123 spring.rabbitmq.virtual-host=/zheng spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.listener.direct.acknowledge-mode=manual
生產(chǎn)者配置消息隊(duì)列規(guī)則
下邊是兩種配置方式,本次整合示例中使用第一個(gè)配置
@Configuration public class TopicConfig { // 聲明隊(duì)列 @Bean public Queue topicQ1() { return new Queue("topic_sb_mq_q1"); } // 聲明隊(duì)列并綁定該隊(duì)列到死信交換機(jī)(返回值有兩種寫法,任選一種都可以) // 測試死信需要關(guān)閉原隊(duì)列的監(jiān)聽 @Bean public Queue topicQ2() { return QueueBuilder.durable("topic_sb_mq_q2") .withArgument("x-dead-letter-exchange", "topicExchange") .withArgument("x-dead-letter-routing-key", "changsha.f") .withArgument("x-message-ttl", 10000) .build(); Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange","topicExchange"); arguments.put("x-dead-letter-routing-key","changsha.f"); arguments.put("x-message-ttl",10000); return new Queue("topic_sb_mq_q2",true,false,false,arguments); } //聲明exchange @Bean public TopicExchange setTopicExchange() { return new TopicExchange("topicExchange"); } //聲明binding,需要聲明一個(gè)routingKey @Bean public Binding bindTopicHebei1() { return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*"); } @Bean public Binding bindTopicHebei2() { return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing"); } }
@Configuration public class RabbitMqConfig { //定義交換機(jī)的名字 public static final String EXCHANGE_NAME = "boot_topic_exchange"; //定義隊(duì)列的名字 public static final String QUEUE_NAME = "boot_queue"; //1、聲明交換機(jī) @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2、聲明隊(duì)列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3、隊(duì)列與交換機(jī)進(jìn)行綁定 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ // topic模式兼容廣播模式,路由模式。with("#")則類似廣播模式匹配所有訂閱者;with("boot.1")則類似路由模式匹配指定訂閱者 return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
生產(chǎn)者發(fā)布消息
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/topicSend") public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException { // 定義 confirm 回調(diào) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // confirmed } else { // nack-ed } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{ // return message }); if(null == routingKey) { routingKey="changsha.kf"; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties)); return "message sended : routingKey >"+routingKey+";message > "+message; } }
消費(fèi)者監(jiān)聽消息
@Component public class ConcumerReceiver { //topic 模式 //注意這個(gè)模式會有優(yōu)先匹配原則。例如發(fā)送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會再去匹配*.ITd @RabbitListener(queues="topic_sb_mq_q1") public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException { // 消息id long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // message.getBody() todosomething // 簽收消息 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 拒絕簽收 // 第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費(fèi)端 channel.basicNack(deliveryTag, true, true); } } @RabbitListener(queues="topic_sb_mq_q2") public void topicReceiveq2(String message) { System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message); } }
到此這篇關(guān)于SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ的5種模式實(shí)戰(zhàn)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)消息確認(rèn)機(jī)制
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- Springboot整合Rabbitmq之Confirm和Return機(jī)制
- Springboot整合RabbitMq測試TTL的方法詳解
- 詳解SpringBoot整合RabbitMQ如何實(shí)現(xiàn)消息確認(rèn)
- SpringBoot整合RabbitMQ實(shí)現(xiàn)交換機(jī)與隊(duì)列的綁定
- SpringBoot整合RabbitMQ實(shí)戰(zhàn)教程附死信交換機(jī)
- springboot整合消息隊(duì)列RabbitMQ
相關(guān)文章
Sentinel的熔斷降級、資源規(guī)則詳解與實(shí)例
這篇文章主要介紹了Sentinel的熔斷降級、資源規(guī)則詳解與實(shí)例,Sentinel是阿里巴巴開源的一款流量控制和熔斷降級的框架,它主要用于保護(hù)分布式系統(tǒng)中的服務(wù)穩(wěn)定性,Sentinel通過對服務(wù)進(jìn)行流量控制和熔斷降級,可以有效地保護(hù)系統(tǒng)的穩(wěn)定性,需要的朋友可以參考下2023-09-09Maven實(shí)現(xiàn)項(xiàng)目構(gòu)建工具
本文主要介紹了Maven實(shí)現(xiàn)項(xiàng)目構(gòu)建工具,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07JDK源碼之線程并發(fā)協(xié)調(diào)神器CountDownLatch和CyclicBarrier詳解
我一直認(rèn)為程序是對于現(xiàn)實(shí)世界的邏輯描述,而在現(xiàn)實(shí)世界中很多事情都需要各方協(xié)調(diào)合作才能完成,就好比完成一個(gè)平臺的交付不可能只靠一個(gè)人,而需要研發(fā)、測試、產(chǎn)品以及項(xiàng)目經(jīng)理等不同角色人員進(jìn)行通力合作才能完成最終的交付2022-02-02

IDEA配置SpringBoot熱啟動(dòng),以及熱啟動(dòng)失效問題

Java編程思想中關(guān)于并發(fā)的總結(jié)