ActiveMQ中consumer的消息確認(rèn)機(jī)制詳解
1. Consumer消息確認(rèn)機(jī)制
簡(jiǎn)單講就是消息被Consumer接收后,Consumer將在何時(shí)確認(rèn)消息。
對(duì)于broker而言,只有接收到確認(rèn)指令,才會(huì)認(rèn)為消息被正確的接收或者處理成功了。InforSuiteMQ提供以下幾種Consumer與Broker之間的消息確認(rèn)方式。
(1)AUTO_ACKNOWLEDGE = 1 自動(dòng)確認(rèn)
(2)CLIENT_ACKNOWLEDGE = 2 客戶端手動(dòng)確認(rèn)
(3)DUPS_OK_ACKNOWLEDGE = 3 自動(dòng)批量確認(rèn)
(4)SESSION_TRANSACTED = 0 事務(wù)提交并確認(rèn)
(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認(rèn)
前四種是JMS API中提供的客戶端ACK_MODE。第五種是InforSuiteMQ自定義補(bǔ)充的一種ACK_MODE。
Consumer有兩種消息消費(fèi)方式:同步消費(fèi)consumer.receive()和異步消費(fèi)MessageListener,這兩種方式下,消息確認(rèn)機(jī)制也是不同的。同一Consumer中,不可同時(shí)使用這兩種消費(fèi)方式。
同步調(diào)用時(shí),在消息從receive方法返回之前,就已經(jīng)調(diào)用了ACK;因此如果Client端沒(méi)有處理成功,此消息將丟失(可能重發(fā),與ACK_MODE有關(guān))。
異步調(diào)用時(shí),消息的確認(rèn)是在onMessage方法返回之后,如果onMessage方法異常,會(huì)導(dǎo)致消息重發(fā)。
2.消息確認(rèn)方式詳解
2.1自動(dòng)確認(rèn)
AUTO_ACKNOWLEDGE : 自動(dòng)確認(rèn),這就意味著消息的確認(rèn)時(shí)機(jī)將有consumer擇機(jī)確認(rèn)。
使用開(kāi)發(fā)者必須明確知道"擇機(jī)確認(rèn)"的具體時(shí)機(jī),否則將有可能導(dǎo)致消息的丟失,或者消息的重復(fù)接收。
2.2客戶端確認(rèn)
CLIENT_ACKNOWLEDGE : 客戶端手動(dòng)確認(rèn),開(kāi)發(fā)者需要自己擇機(jī)確認(rèn)??蛻舳耸謩?dòng)確認(rèn)時(shí)機(jī)有以下三種:
(1) message.acknowledge():確認(rèn)當(dāng)前session中所有consumer中尚未ACK的消息;
(2) InforSuiteMQSession.acknowledge():確認(rèn)當(dāng)前session中所有consumer中尚未ACK的消息;
(3) InforSuiteMQMessageConsumer.acknowledege():確認(rèn)當(dāng)前consumer中那些尚未確認(rèn)的消息。
2.3自動(dòng)批量確認(rèn)
DUPS_OK_ACKNOWLEDGE : 自動(dòng)批量確認(rèn),也是一種自動(dòng)確實(shí)方式,使用方法與AUTO_ACKNOWLEDGE相同,具有“批量”和“延遲”的確認(rèn)特點(diǎn)。
該模式下,當(dāng)Consumer故障重啟后,那些尚未被ACK確認(rèn)的消息會(huì)重新發(fā)送過(guò)來(lái),這就意味著消息可能重復(fù)。
2.4事務(wù)確認(rèn)
- SESSION_TRANSACTED:事務(wù)提交并確認(rèn)。當(dāng)session使用事務(wù)時(shí),調(diào)用此確認(rèn)方式。在事務(wù)開(kāi)啟之后和session.commit()之前,所有消費(fèi)的消息,要么全部正常確認(rèn),要么全部redelivery。
- 當(dāng)session.commit方法異常時(shí),開(kāi)發(fā)者通常是調(diào)用session.rollback()回滾事務(wù)(事實(shí)上開(kāi)發(fā)者不調(diào)用也沒(méi)有問(wèn)題),開(kāi)發(fā)這個(gè)可以在事務(wù)開(kāi)始之后的任何時(shí)機(jī)調(diào)用rollback(),rollback意味著當(dāng)前事務(wù)的結(jié)束,事務(wù)中所有的消息都將被重發(fā)。調(diào)用session.rollback()而導(dǎo)致消息重發(fā),都會(huì)導(dǎo)致message.redeliveryCounter計(jì)數(shù)器增加,最終都會(huì)受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次數(shù)過(guò)多,而達(dá)到重發(fā)次數(shù)的上限時(shí),消息將會(huì)被DLQ(dead letter)。
2.5單條消息確認(rèn)
INDIVIDUAL_ACKNOWLEDGE : 單條消息確認(rèn)。此確認(rèn)方式與客戶端確認(rèn)方式使用CLIENT_ACKNOWLEDGE幾乎一樣,當(dāng)消息消費(fèi)成功之后,調(diào)用message.acknowledege來(lái)確認(rèn)此消息(單條),而CLIENT_ACKNOWLEDGE模式,調(diào)用message.acknowledge()方法將導(dǎo)致整個(gè)session中所有消息被確認(rèn)(批量確認(rèn))。
3.客戶端確認(rèn)使用場(chǎng)景解析
Consumer使用MessageListener異步監(jiān)聽(tīng)隊(duì)列消息,并將消息插入到數(shù)據(jù)中。
消息確認(rèn)方式為客戶端單條消息確認(rèn),消息插入數(shù)據(jù)庫(kù)成功,調(diào)用message.acknowledege()來(lái)確認(rèn)此消息(單條),消息插入數(shù)據(jù)庫(kù)失敗,調(diào)用session.recover()將消息返回的隊(duì)列中重新發(fā)送。
以下為部分代碼示例。
final InforBrokerQueueSession session = (InforBrokerQueueSession) connection.createQueueSession(Boolean.FALSE, InforBrokerSession.INDIVIDUAL_ACKNOWLEDGE); Destination destination = session.createQueue("myqueue_rz"); InforBrokerMessageConsumer consumer = (InforBrokerMessageConsumer) session.createConsumer(destination); public void onMessage(Message m) { TextMessage message = (TextMessage)m; long begin=System.currentTimeMillis();//用于性能測(cè)試,跟蹤程序運(yùn)行時(shí)間 try { System.out.println("message::"+message.getText()); state=insertDB(message.getText()); } catch (JMSException e) { e.printStackTrace(); } long end=System.currentTimeMillis(); //用于性能測(cè)試,跟蹤程序運(yùn)行時(shí)間 long cost=end-begin; System.out.println("數(shù)據(jù)接收處理總共耗時(shí):"+cost); //用于性能測(cè)試,跟蹤程序運(yùn)行時(shí)間 if(state.equals("1")){ try { // System.out.println("stat=1"+message.getText()); message.acknowledge(); // session.commit(); } catch (JMSException e) { e.printStackTrace(); } }else{ try { //session.rollback(); session.recover(); System.out.println("getJMSRedelivered():"+m.getJMSRedelivered()); m.setJMSRedelivered(true); System.out.println("m.getJMSDeliveryMode():"+m.getJMSDeliveryMode()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
到此這篇關(guān)于ActiveMQ中consumer的消息確認(rèn)機(jī)制詳解的文章就介紹到這了,更多相關(guān)consumer的消息確認(rèn)機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot基于docsify?實(shí)現(xiàn)隨身文檔
這篇文章主要介紹了springboot基于docsify實(shí)現(xiàn)隨身文檔的相關(guān)資料,需要的朋友可以參考下2022-09-09JVM的類(lèi)加載過(guò)程詳細(xì)說(shuō)明
近來(lái)讀了《深入理解JVM虛擬機(jī)》的部分內(nèi)容,對(duì)JVM也慢慢有個(gè)整體的認(rèn)識(shí),今天就來(lái)分享一下我對(duì)JVM類(lèi)加載過(guò)程的學(xué)習(xí)和理解,需要的朋友可以參考下2021-06-06java面向?qū)ο缶幊讨匾拍罾^承和多態(tài)示例解析
這篇文章主要為大家介紹了java面向?qū)ο缶幊痰膬蓚€(gè)重要概念繼承和多態(tài)示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05java Class.getSimpleName() 詳解及用法
這篇文章主要介紹了java Class.getSimpleName() 詳解及用法的相關(guān)資料,需要的朋友可以參考下2017-02-02Java類(lèi)初始化時(shí)機(jī)測(cè)試方法解析
這篇文章主要介紹了Java類(lèi)初始化時(shí)機(jī)測(cè)試過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08解讀Spring定義Bean的兩種方式:<bean>和@Bean
這篇文章主要介紹了Spring定義Bean的兩種方式:<bean>和@Bean,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04