springboot中使用rabbitt的詳細方法
RabbitMQ的示例,涉及到Direct、Fanout、Topic和Headers交換機以及普通隊列、延遲隊列和死信隊列
在pom.xml文件中添加以下依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置RabbitMQ連接信息,在application.properties文件中添加以下配置:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
創(chuàng)建消息隊列
創(chuàng)建一個普通的Direct交換機和隊列:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue() {
return new Queue("directQueue", false);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public Binding binding(Queue queue, DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with("directRoutingKey");
}
}創(chuàng)建一個Fanout交換機和隊列:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queueA() {
return new Queue("fanoutQueueA", false);
}
@Bean
public Queue queueB() {
return new Queue("fanoutQueueB", false);
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}創(chuàng)建一個Topic交換機和隊列:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queueA() {
return new Queue("topicQueueA", false);
}
@Bean
public Queue queueB() {
return new Queue("topicQueueB", false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindingA(Queue queueA, TopicExchange topicExchange) {
return BindingBuilder.bind(queueA).to(topicExchange).with("topic.key.*");
}
@Bean
public Binding bindingB(Queue queueB, TopicExchange topicExchange) {
return BindingBuilder.bind(queueB).to(topicExchange).with("topic.#");
}
}創(chuàng)建一個Headers交換機和隊列:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue queue() {
return new Queue("headerQueue", false);
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headerExchange");
}
@Bean
public Binding binding(Queue queue, HeadersExchange headersExchange) {
Map<String, Object> headers = new HashMap<>();
headers.put("header1", "value1");
headers.put("header2", "value2");
return BindingBuilder.bind(queue).to(headersExchange).whereAll(headers).match();
}
}創(chuàng)建一個延遲隊列和死信隊列:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 10000); // 消息過期時間為10秒
args.put("x-dead-letter-exchange", "deadLetterExchange");
args.put("x-dead-letter-routing-key", "deadLetterRoutingKey");
return new Queue("delayQueue", false, false, false, args);
}
@Bean
public Queue deadLetterQueue() {
return new Queue("deadLetterQueue", false);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("directExchange");
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("deadLetterExchange");
}
@Bean
public Binding binding(Queue delayQueue, DirectExchange directExchange) {
return BindingBuilder.bind(delayQueue).to(directExchange).with("delayRoutingKey");
}
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("deadLetterRoutingKey");
}
}發(fā)送和接收消息
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("directExchange", "directRoutingKey", message);
}
@RabbitListener(queues = "directQueue")
public void receiveDirectMessage(String message) {
System.out.println("Received message from directQueue: " + message);
}
public void sendFanoutMessage(String message) {
rabbitTemplate.convertAndSend("fanoutExchange", "", message);
}
@RabbitListener(queues = "fanoutQueueA")
public void receiveFanoutMessageA(String message) {
System.out.println("Received message from fanoutQueueA: " + message);
}
@RabbitListener(queues = "fanoutQueueB")
public void receiveFanoutMessageB(String message) {
System.out.println("Received message from fanoutQueueB: " + message);
}
public void sendTopicMessage(String message) {
rabbitTemplate.convertAndSend("topicExchange", "topic.key.message", message);
}
@RabbitListener(queues = "topicQueueA")
public void receiveTopicMessageA(String message) {
System.out.println("Received message from topicQueueA: " + message);
}
@RabbitListener(queues = "topicQueueB")
public void receiveTopicMessageB(String message) {
System.out.println("Received message from topicQueueB: " + message);
}
public void sendHeaderMessage(String message) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("header1", "value1");
messageProperties.setHeader("header2", "value2");
Message msg = MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build();
rabbitTemplate.send("headerExchange", "", msg);
}
@RabbitListener(queues = "headerQueue")
public void receiveHeaderMessage(Message message) {
System.out.println("Received message from headerQueue: " + new String(message.getBody()));
}
public void sendDelayMessage(String message) {
rabbitTemplate.convertAndSend("directExchange", "delayRoutingKey", message);
}
@RabbitListener(queues = "deadLetterQueue")
public void receiveDeadLetterMessage(String message) {
System.out.println("Received message from deadLetterQueue: " + message);
}
}消息確認模式
在RabbitMQ中,消息確認模式有兩種:自動確認和手動確認。自動確認是指當(dāng)消息被成功接收時,RabbitMQ會自動確認消息。手動確認是指當(dāng)消費者成功處理消息后,顯式地向RabbitMQ發(fā)送確認消息。 1 使用手動確認模式:
@Configuration
public class RabbitMQConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}@Service
public class RabbitMQService {
@RabbitListener(queues = "directQueue")
public void receiveDirectMessage(Message message, Channel channel) throws IOException {
try {
System.out.println("Received message from directQueue: " + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}在上述代碼中,我們使用了MANUAL(手動確認)模式,并在消息處理成功后使用channel.basicAck()方法顯式地確認消息。如果發(fā)生任何異常,我們使用channel.basicNack()方法拒絕消息并重新加入隊列。
希望這個示例能夠幫助您了解如何在Spring Boot中使用RabbitMQ,并使用不同的交換機和隊列類型以及消息確認模式。
到此這篇關(guān)于springboot中使用rabbitt的文章就介紹到這了,更多相關(guān)springboot使用rabbitt內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Spring Controller 獲取請求參數(shù)的幾種方法詳解
這篇文章主要介紹了Java Spring Controller 獲取請求參數(shù)的幾種方法詳解的相關(guān)資料,這里提供了6種方法,需要的朋友可以參考下2016-12-12
利用Java實現(xiàn)解析網(wǎng)頁中的內(nèi)容
這篇文章主要為大家詳細介紹了如何利用Java語言做一個解析指定網(wǎng)址的網(wǎng)頁內(nèi)容小應(yīng)用,文中的實現(xiàn)步驟講解詳細,感興趣的可以嘗試下2022-10-10
shiro實現(xiàn)單點登錄(一個用戶同一時刻只能在一個地方登錄)
這篇文章主要介紹了shiro實現(xiàn)單點登錄(一個用戶同一時刻只能在一個地方登錄)的相關(guān)資料,非常不錯,具有參考借鑒價值,感興趣的朋友一起學(xué)習(xí)吧2016-08-08
Springboot 整合 Java DL4J 實現(xiàn)文物保護系統(tǒng)的詳細過程
在數(shù)字化時代,文物保護尤為關(guān)鍵,本文介紹如何利用SpringBoot和Deeplearning4j構(gòu)建一個圖像識別的文物保護系統(tǒng),系統(tǒng)采用卷積神經(jīng)網(wǎng)絡(luò)(CNN),能夠識別文物的損壞情況,本文介紹Springboot 整合 Java DL4J 實現(xiàn)文物保護系統(tǒng),感興趣的朋友一起看看吧2024-10-10
關(guān)于Sentinel中冷啟動限流原理WarmUpController
這篇文章主要介紹了關(guān)于Sentinel中冷啟動限流原理WarmUpController,具有很好的參考價值,希望對大家有所幫助。2023-04-04
Springboot整合Swagger2后訪問swagger-ui.html 404報錯問題解決方案
這篇文章主要介紹了Springboot整合Swagger2后訪問swagger-ui.html 404報錯,本文給大家分享兩種解決方案,結(jié)合實例代碼給大家介紹的非常詳細,需要的朋友可以參考下2023-06-06

