ActiveMQ中consumer的消息確認機制詳解
1. Consumer消息確認機制
簡單講就是消息被Consumer接收后,Consumer將在何時確認消息。
對于broker而言,只有接收到確認指令,才會認為消息被正確的接收或者處理成功了。InforSuiteMQ提供以下幾種Consumer與Broker之間的消息確認方式。
(1)AUTO_ACKNOWLEDGE = 1 自動確認
(2)CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
(3)DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
(4)SESSION_TRANSACTED = 0 事務提交并確認
(5)INDIVIDUAL_ACKNOWLEDGE = 4 單條消息確認
前四種是JMS API中提供的客戶端ACK_MODE。第五種是InforSuiteMQ自定義補充的一種ACK_MODE。
Consumer有兩種消息消費方式:同步消費consumer.receive()和異步消費MessageListener,這兩種方式下,消息確認機制也是不同的。同一Consumer中,不可同時使用這兩種消費方式。
同步調用時,在消息從receive方法返回之前,就已經調用了ACK;因此如果Client端沒有處理成功,此消息將丟失(可能重發(fā),與ACK_MODE有關)。
異步調用時,消息的確認是在onMessage方法返回之后,如果onMessage方法異常,會導致消息重發(fā)。
2.消息確認方式詳解
2.1自動確認
AUTO_ACKNOWLEDGE : 自動確認,這就意味著消息的確認時機將有consumer擇機確認。
使用開發(fā)者必須明確知道"擇機確認"的具體時機,否則將有可能導致消息的丟失,或者消息的重復接收。
2.2客戶端確認
CLIENT_ACKNOWLEDGE : 客戶端手動確認,開發(fā)者需要自己擇機確認??蛻舳耸謩哟_認時機有以下三種:
(1) message.acknowledge():確認當前session中所有consumer中尚未ACK的消息;
(2) InforSuiteMQSession.acknowledge():確認當前session中所有consumer中尚未ACK的消息;
(3) InforSuiteMQMessageConsumer.acknowledege():確認當前consumer中那些尚未確認的消息。
2.3自動批量確認
DUPS_OK_ACKNOWLEDGE : 自動批量確認,也是一種自動確實方式,使用方法與AUTO_ACKNOWLEDGE相同,具有“批量”和“延遲”的確認特點。
該模式下,當Consumer故障重啟后,那些尚未被ACK確認的消息會重新發(fā)送過來,這就意味著消息可能重復。
2.4事務確認
- SESSION_TRANSACTED:事務提交并確認。當session使用事務時,調用此確認方式。在事務開啟之后和session.commit()之前,所有消費的消息,要么全部正常確認,要么全部redelivery。
- 當session.commit方法異常時,開發(fā)者通常是調用session.rollback()回滾事務(事實上開發(fā)者不調用也沒有問題),開發(fā)這個可以在事務開始之后的任何時機調用rollback(),rollback意味著當前事務的結束,事務中所有的消息都將被重發(fā)。調用session.rollback()而導致消息重發(fā),都會導致message.redeliveryCounter計數器增加,最終都會受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次數過多,而達到重發(fā)次數的上限時,消息將會被DLQ(dead letter)。
2.5單條消息確認
INDIVIDUAL_ACKNOWLEDGE : 單條消息確認。此確認方式與客戶端確認方式使用CLIENT_ACKNOWLEDGE幾乎一樣,當消息消費成功之后,調用message.acknowledege來確認此消息(單條),而CLIENT_ACKNOWLEDGE模式,調用message.acknowledge()方法將導致整個session中所有消息被確認(批量確認)。
3.客戶端確認使用場景解析
Consumer使用MessageListener異步監(jiān)聽隊列消息,并將消息插入到數據中。
消息確認方式為客戶端單條消息確認,消息插入數據庫成功,調用message.acknowledege()來確認此消息(單條),消息插入數據庫失敗,調用session.recover()將消息返回的隊列中重新發(fā)送。
以下為部分代碼示例。
final InforBrokerQueueSession session = (InforBrokerQueueSession) connection.createQueueSession(Boolean.FALSE, InforBrokerSession.INDIVIDUAL_ACKNOWLEDGE); Destination destination = session.createQueue("myqueue_rz"); InforBrokerMessageConsumer consumer = (InforBrokerMessageConsumer) session.createConsumer(destination); public void onMessage(Message m) { TextMessage message = (TextMessage)m; long begin=System.currentTimeMillis();//用于性能測試,跟蹤程序運行時間 try { System.out.println("message::"+message.getText()); state=insertDB(message.getText()); } catch (JMSException e) { e.printStackTrace(); } long end=System.currentTimeMillis(); //用于性能測試,跟蹤程序運行時間 long cost=end-begin; System.out.println("數據接收處理總共耗時:"+cost); //用于性能測試,跟蹤程序運行時間 if(state.equals("1")){ try { // System.out.println("stat=1"+message.getText()); message.acknowledge(); // session.commit(); } catch (JMSException e) { e.printStackTrace(); } }else{ try { //session.rollback(); session.recover(); System.out.println("getJMSRedelivered():"+m.getJMSRedelivered()); m.setJMSRedelivered(true); System.out.println("m.getJMSDeliveryMode():"+m.getJMSDeliveryMode()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } }
到此這篇關于ActiveMQ中consumer的消息確認機制詳解的文章就介紹到這了,更多相關consumer的消息確認機制內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java Class.getSimpleName() 詳解及用法
這篇文章主要介紹了java Class.getSimpleName() 詳解及用法的相關資料,需要的朋友可以參考下2017-02-02解讀Spring定義Bean的兩種方式:<bean>和@Bean
這篇文章主要介紹了Spring定義Bean的兩種方式:<bean>和@Bean,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04