ActiveMQ消息簽收機(jī)制代碼實(shí)例詳解
這篇文章主要介紹了ActiveMQ消息簽收機(jī)制代碼實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
消費(fèi)者客戶端成功接收一條消息的標(biāo)志是:這條消息被簽收。
消費(fèi)者客戶端成功接收一條消息一般包括三個(gè)階段:
1、消費(fèi)者接收消息,也即從MessageConsumer的receive方法返回
2、消費(fèi)者處理消息
3、消息被簽收
其中,第三階段的簽收可以有ActiveMQ發(fā)起,也可以由消費(fèi)者客戶端發(fā)起,取決于Session是否開啟事務(wù)以及簽收模式的設(shè)置。
在帶事務(wù)的Session中,消費(fèi)者客戶端事務(wù)提交之時(shí),消息自動(dòng)完成簽收。
在不帶事務(wù)的Session中,消息何時(shí)以及如何被簽收取決于Session的簽收模式設(shè)置
非事務(wù)Session可以設(shè)置如下幾種簽收模式:
1.Session.AUTO_ACKNOWLEDGE
當(dāng)消息從MessageConsumer的receive方法返回或者從MessageListener接口的onMessage方法返回時(shí),會(huì)話自動(dòng)確認(rèn)消息簽收
2.Session.CLIENT_ACKNOWLEDGE
需要消費(fèi)者客戶端主動(dòng)調(diào)用acknowledge方法簽收消息,這種模式實(shí)在Session層面進(jìn)行簽收的,簽收一個(gè)已經(jīng)消費(fèi)的消息會(huì)自動(dòng)的簽收這個(gè)Session已消費(fèi)的所有消息:
例如一個(gè)消費(fèi)者在一個(gè)Session中消費(fèi)了5條消息,然后確認(rèn)第3條消息,所有這5條消息都會(huì)被簽收
3.Session.DUPS_OK_ACKNOWLEDGE
這種方式允許JMS不必急于確認(rèn)收到的消息,允許在收到多個(gè)消息之后一次完成確認(rèn),與Auto_AcKnowledge相比,這種確認(rèn)方式在某些情況下可能更有效,因?yàn)闆]有確認(rèn),當(dāng)系統(tǒng)崩潰或者網(wǎng)絡(luò)出現(xiàn)故障的時(shí)候,消息可以被重新傳遞.
這種方式會(huì)引起消息的重復(fù),但是降低了Session的開銷,所以只有客戶端能容忍重復(fù)的消息才可使用。(如果ActiveMQ再次傳送同一消息,那么消息頭中的JMSRedelivered將被設(shè)置為true)
帶事務(wù)session的案例
生產(chǎn)者
必須在生產(chǎn)完數(shù)據(jù)之后手動(dòng)提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); //啟動(dòng)連接 connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線程 false:代表不帶事務(wù)的session AUTO_ACKNOWLEDGE:代表自動(dòng)簽收 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:創(chuàng)建消息生產(chǎn)者 MessageProducer producer = session.createProducer(queue); // 設(shè)置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 發(fā)送消息 for (int i = 1; i <= 5; i++) { sendMsg(session, producer, i); } System.out.println("發(fā)送成功!"); session.commit(); session.close(); connection.close(); } /** * 在指定的會(huì)話上,通過指定的消息生產(chǎn)者發(fā)出一條消息 * * @param session * 消息會(huì)話 * @param producer * 消息生產(chǎn)者 */ public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException { // 創(chuàng)建一條文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通過消息生產(chǎn)者發(fā)出消息 producer.send(message); } }
消費(fèi)者
消費(fèi)完數(shù)據(jù)之后必須手動(dòng)提交session
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線程 true:表單開啟事務(wù) AUTO_ACKNOWLEDGE:代表自動(dòng)簽收 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消費(fèi)者,消息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():獲取消息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); session.commit(); } else { break; } } //回收資源 session.close(); connection.close(); } }
不帶事務(wù)session的案例
1.自動(dòng)簽收
2.手動(dòng)簽收
生產(chǎn)者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Producter { public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); //啟動(dòng)連接 connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線程 false:代表不帶事務(wù)的session AUTO_ACKNOWLEDGE:代表自動(dòng)簽收 /* Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/ Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值my-queue是Query的名字 Queue queue = session.createQueue("my-queue"); // MessageProducer:創(chuàng)建消息生產(chǎn)者 MessageProducer producer = session.createProducer(queue); // 設(shè)置不持久化 PERSISTENT:代表持久化 NON_PERSISTENT:代表不持久化 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 發(fā)送消息 for (int i = 1; i <= 5; i++) { sendMsg(session, producer, i); } System.out.println("發(fā)送成功!"); session.close(); connection.close(); } /** * 在指定的會(huì)話上,通過指定的消息生產(chǎn)者發(fā)出一條消息 * * @param session * 消息會(huì)話 * @param producer * 消息生產(chǎn)者 */ public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException { // 創(chuàng)建一條文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i); // 通過消息生產(chǎn)者發(fā)出消息 producer.send(message); message.acknowledge(); //手動(dòng)提交 } }
消費(fèi)者
package com.wn.ddd; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import sun.plugin2.os.windows.SECURITY_ATTRIBUTES; import javax.jms.*; public class JmsReceiver { public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616"); // JMS 客戶端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線程 true:表單開啟事務(wù) AUTO_ACKNOWLEDGE:代表自動(dòng)簽收 /*Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);*/ Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰. // 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置 Queue queue = session.createQueue("my-queue"); // 消費(fèi)者,消息接收者 MessageConsumer consumer = session.createConsumer(queue); while (true) { //receive():獲取消息 TextMessage message = (TextMessage) consumer.receive(); if (null != message) { System.out.println("收到消息:" + message.getText()); message.acknowledge(); //手動(dòng)提交 } else { break; } } //回收資源 session.close(); connection.close(); } }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- ActiveMQ消息隊(duì)列技術(shù)融合Spring過程解析
- Java中間消息件ActiveMQ使用實(shí)例
- Spring Boot教程之利用ActiveMQ實(shí)現(xiàn)延遲消息
- PHP使用ActiveMQ實(shí)現(xiàn)消息隊(duì)列的方法詳解
- 淺談Java消息隊(duì)列總結(jié)篇(ActiveMQ、RabbitMQ、ZeroMQ、Kafka)
- python 發(fā)送和接收ActiveMQ消息的實(shí)例
- Docker學(xué)習(xí)之搭建ActiveMQ消息服務(wù)的方法步驟
- spring整合JMS實(shí)現(xiàn)同步收發(fā)消息(基于ActiveMQ的實(shí)現(xiàn))
- 詳解Java消息隊(duì)列-Spring整合ActiveMq
相關(guān)文章
scala+redis實(shí)現(xiàn)分布式鎖的示例代碼
這篇文章主要介紹了scala+redis實(shí)現(xiàn)分布式鎖的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06java 遍歷request中的所有表單數(shù)據(jù)的實(shí)例代碼
下面小編就為大家?guī)硪黄猨ava 遍歷request中的所有表單數(shù)據(jù)的實(shí)例代碼。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2016-09-09Java Thread中start()和run()的區(qū)別_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
start() : 它的作用是啟動(dòng)一個(gè)新線程,新線程會(huì)執(zhí)行相應(yīng)的run()方法。start()不能被重復(fù)調(diào)用。而run() : run()就和普通的成員方法一樣,可以被重復(fù)調(diào)用。下面通過示例代碼給大家介紹了Java Thread中start()和run()的區(qū)別,感興趣的朋友一起看看吧2017-05-05解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法
這篇文章主要介紹了解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11基于Java實(shí)現(xiàn)一個(gè)復(fù)雜關(guān)系表達(dá)式過濾器
這篇文章主要為大家詳細(xì)介紹了如何基于Java實(shí)現(xiàn)一個(gè)復(fù)雜關(guān)系表達(dá)式過濾器。文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2022-07-07