SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實戰(zhàn)
摘要
整合場景含 topic 工作模式(通過 routingKey 可滿足簡單/工作隊列/發(fā)布訂閱/路由等四種工作模式)和 confirm(消息確認)、return(消息返回)、basicAck(消息簽收)、basicNack(拒絕簽收)、DLX(Dead Letter Exchange死信隊列)實現(xiàn)延時/定時任務(wù)等。
整合
依賴與配置
以下內(nèi)容消費者同生產(chǎn)者
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
server.port=8090 spring.rabbitmq.host=192.168.168.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=zheng123 spring.rabbitmq.password=zheng123 spring.rabbitmq.virtual-host=/zheng spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.listener.direct.acknowledge-mode=manual
生產(chǎn)者配置消息隊列規(guī)則
下邊是兩種配置方式,本次整合示例中使用第一個配置
@Configuration public class TopicConfig { // 聲明隊列 @Bean public Queue topicQ1() { return new Queue("topic_sb_mq_q1"); } // 聲明隊列并綁定該隊列到死信交換機(返回值有兩種寫法,任選一種都可以) // 測試死信需要關(guān)閉原隊列的監(jiān)聽 @Bean public Queue topicQ2() { return QueueBuilder.durable("topic_sb_mq_q2") .withArgument("x-dead-letter-exchange", "topicExchange") .withArgument("x-dead-letter-routing-key", "changsha.f") .withArgument("x-message-ttl", 10000) .build(); Map<String,Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange","topicExchange"); arguments.put("x-dead-letter-routing-key","changsha.f"); arguments.put("x-message-ttl",10000); return new Queue("topic_sb_mq_q2",true,false,false,arguments); } //聲明exchange @Bean public TopicExchange setTopicExchange() { return new TopicExchange("topicExchange"); } //聲明binding,需要聲明一個routingKey @Bean public Binding bindTopicHebei1() { return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("changsha.*"); } @Bean public Binding bindTopicHebei2() { return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.beijing"); } }
@Configuration public class RabbitMqConfig { //定義交換機的名字 public static final String EXCHANGE_NAME = "boot_topic_exchange"; //定義隊列的名字 public static final String QUEUE_NAME = "boot_queue"; //1、聲明交換機 @Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //2、聲明隊列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3、隊列與交換機進行綁定 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ // topic模式兼容廣播模式,路由模式。with("#")則類似廣播模式匹配所有訂閱者;with("boot.1")則類似路由模式匹配指定訂閱者 return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } }
生產(chǎn)者發(fā)布消息
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/topicSend") public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException { // 定義 confirm 回調(diào) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // confirmed } else { // nack-ed } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{ // return message }); if(null == routingKey) { routingKey="changsha.kf"; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties)); return "message sended : routingKey >"+routingKey+";message > "+message; } }
消費者監(jiān)聽消息
@Component public class ConcumerReceiver { //topic 模式 //注意這個模式會有優(yōu)先匹配原則。例如發(fā)送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會再去匹配*.ITd @RabbitListener(queues="topic_sb_mq_q1") public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException { // 消息id long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // message.getBody() todosomething // 簽收消息 channel.basicAck(deliveryTag, true); } catch (Exception e) { // 拒絕簽收 // 第三個參數(shù):requeue:重回隊列。如果設(shè)置為true,則消息重新回到queue,broker會重新發(fā)送該消息給消費端 channel.basicNack(deliveryTag, true, true); } } @RabbitListener(queues="topic_sb_mq_q2") public void topicReceiveq2(String message) { System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message); } }
到此這篇關(guān)于SpringBoot整合RabbitMQ及生產(chǎn)全場景高級特性實戰(zhàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot整合RabbitMQ的5種模式實戰(zhàn)
- SpringBoot整合RabbitMQ實現(xiàn)消息確認機制
- RabbitMQ 3.9.7 鏡像模式集群與Springboot 2.5.5 整合
- Springboot整合Rabbitmq之Confirm和Return機制
- Springboot整合RabbitMq測試TTL的方法詳解
- 詳解SpringBoot整合RabbitMQ如何實現(xiàn)消息確認
- SpringBoot整合RabbitMQ實現(xiàn)交換機與隊列的綁定
- SpringBoot整合RabbitMQ實戰(zhàn)教程附死信交換機
- springboot整合消息隊列RabbitMQ
相關(guān)文章
JDK源碼之線程并發(fā)協(xié)調(diào)神器CountDownLatch和CyclicBarrier詳解
我一直認為程序是對于現(xiàn)實世界的邏輯描述,而在現(xiàn)實世界中很多事情都需要各方協(xié)調(diào)合作才能完成,就好比完成一個平臺的交付不可能只靠一個人,而需要研發(fā)、測試、產(chǎn)品以及項目經(jīng)理等不同角色人員進行通力合作才能完成最終的交付2022-02-02

Java編程思想中關(guān)于并發(fā)的總結(jié)