JMS簡(jiǎn)介與ActiveMQ實(shí)戰(zhàn)代碼分享
一、異步通信
之前接觸到的RMI,Hessian等技術(shù)都是同步通信機(jī)制。當(dāng)客戶端調(diào)用遠(yuǎn)程方法時(shí),客戶端必須等到遠(yuǎn)程方法完成后,才能繼續(xù)執(zhí)行。這段時(shí)間客戶端一直會(huì)被阻塞(這樣造成的用戶體驗(yàn)很不好)。
(同步通信)
同步通信有并不是程序之間交互的唯一方式,異步通信機(jī)制中,客戶端不需要等待服務(wù)處理消息,可以繼續(xù)執(zhí)行,并且最終能夠收到并處理消息。
(異步通信)
異步通信的優(yōu)勢(shì)
無需等待。客戶端只需要將消息發(fā)送給消息代理,不需要等待就可以繼續(xù)執(zhí)行別的任務(wù),且確信消息會(huì)被投遞給相應(yīng)的目的地。
面向消息和解耦。 客戶端不需要擔(dān)心遠(yuǎn)程服務(wù)的接口規(guī)范,只需要把消息放入消息隊(duì)列然后獲取結(jié)果即可。
二、JMS
1. 簡(jiǎn)介
在JMS出現(xiàn)之前,每個(gè)消息代理都是有不同的實(shí)現(xiàn),這就使得不同代理之間的消息代碼很難通用。JMS(Java Message Service,Java消息服務(wù))是一個(gè)標(biāo)準(zhǔn),定義了使用消息代理的通用API。即所有遵從規(guī)范的實(shí)現(xiàn)都使用通用的接口,類似于JDBC為數(shù)據(jù)庫操作提供通用接口。
JMS幾個(gè)重要的要素:
Destination:消息從發(fā)送端發(fā)出后要走的通道。
ConnectionFactory:連接工廠,用于創(chuàng)建連接的對(duì)象。
Connection:連接接口,用于創(chuàng)建session。
Session:會(huì)話接口,用于創(chuàng)建消息的發(fā)送者,接受者以及消息對(duì)象本身。
MessageConsumer:消息的消費(fèi)者。
MessageProducer:消息的生產(chǎn)者。
XXXMessage:各種類型的消息對(duì)象,包括ByteMessage、MapMessage、ObjectMessage、StreamMessage和TextMessage 5種。
2. JMS消息模型
不同的消息系統(tǒng)有不同的消息模型。JMS提供了兩種模型:Queue(點(diǎn)對(duì)點(diǎn))和Topic(發(fā)布/訂閱)。
JMS Queue(點(diǎn)對(duì)點(diǎn))模型
在點(diǎn)對(duì)點(diǎn)模型中,消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息,但不可重復(fù)消費(fèi)。
如圖:
發(fā)送者1,發(fā)送者2,發(fā)送者3各發(fā)送一條消息到服務(wù)器;
消息1,2,3就會(huì)按照順序形成一個(gè)隊(duì)列,隊(duì)列中的消息不知道自己會(huì)被哪個(gè)接收者消費(fèi);
接收者1,2,3分別從隊(duì)列中取出一條消息進(jìn)行消費(fèi),每取出一條消息,隊(duì)列就會(huì)將該消息刪除,這樣即保證了消息不會(huì)被重復(fù)消費(fèi)。
JMS Queue模型也成為P2P(Point to Point)模型。
JMS Topic(發(fā)布/訂閱)模型
JMS Topic模型與JMS Queue模型的最大差別在于消息接收的部分。Topic模型類似于微信公眾號(hào),訂閱了該公眾號(hào)的接收者都可以接收到公眾號(hào)推送的消息。
如圖:
發(fā)布者1,2,3分別發(fā)布3個(gè)主題1,2,3;
這樣訂閱了主題1的用戶群:訂閱者1,2,3即能接收到主題1消息;同理訂閱者4,5,6即能接收到主題2消息,訂閱者7,8,9即能接收到主題3消息。
JMS Topic模型也成為Pus/Sub模型。
兩種模式下各要素的對(duì)比:
3. 傳統(tǒng)JMS編程模型
Producer:
(1)創(chuàng)建連接工廠ConnectionFactory;
(2) 使用連接工廠創(chuàng)建連接;
(3)啟動(dòng)連接;
(4)創(chuàng)建會(huì)話;
(5) 創(chuàng)建消息發(fā)送的目的地;
(6)創(chuàng)建生產(chǎn)者;
(7)創(chuàng)建消息類型和消息內(nèi)容;
(8)發(fā)送消息;
Consumer:
(1)創(chuàng)建連接工廠ConnectionFactory;
(2) 使用連接工廠創(chuàng)建連接;
(3)啟動(dòng)連接;
(4)創(chuàng)建會(huì)話;
(5) 創(chuàng)建消息發(fā)送的目的地;
(6)創(chuàng)建消費(fèi)者
(7)創(chuàng)建消息類型;
(8)接收消息;
三、 ActiveMQ簡(jiǎn)介
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
ActiveMQ 主要特性:
多種語言和協(xié)議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應(yīng)用協(xié)議:
OpenWire,Stomp REST,WS Notification,XMPP,AMQP
完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息,事務(wù))
對(duì)spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
通過了常見J2EE服務(wù)器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測(cè)試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動(dòng)的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上
支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通過JDBC和journal提供高速的消息持久化
從設(shè)計(jì)上保證了高性能的集群,客戶端-服務(wù)器,點(diǎn)對(duì)點(diǎn)
支持Ajax
支持與Axis的整合
可以很容易得調(diào)用內(nèi)嵌JMS provider,進(jìn)行測(cè)試
四、 ActiveMQ實(shí)戰(zhàn)
下面看看如何ActiveMQ實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列。
傳統(tǒng)的JMS編程模型
1. JMS Queue模型代碼實(shí)現(xiàn):
Producer:
package com.wgs.mq.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by GenshenWang.nomico on 2017/10/19. */ public class ActiveMQProducer { private static final String URL = "tcp://localhost:61616"; private static final String QUEUE_NAME = "queue-name"; public static void main(String[] args) throws JMSException { //1 創(chuàng)建連接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //2 使用連接工廠創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3 啟動(dòng)連接 connection.start(); //4 創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 創(chuàng)建消息發(fā)送的目的地 Destination destination = session.createQueue(QUEUE_NAME); //6 創(chuàng)建生產(chǎn)者 MessageProducer messageProducer = session.createProducer(destination); //7 創(chuàng)建消息 TextMessage textMessage = session.createTextMessage(); for (int i = 1; i <= 100; i++) { //8 創(chuàng)建消息內(nèi)容 textMessage.setText("發(fā)送者- 1 -發(fā)送消息:" + i); //9 發(fā)送消息 messageProducer.send(textMessage); } System.out.println("消息發(fā)送成功"); session.close(); connection.close(); } }
Conusmer:
package com.wgs.mq.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by GenshenWang.nomico on 2017/10/19. */ public class ActiveMQConsumer { private static final String URL = "tcp://localhost:61616"; private static final String QUEUE_NAME = "queue-name"; public static void main(String[] args) throws JMSException { //1 創(chuàng)建連接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //2 使用連接工廠創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3 啟動(dòng)連接 connection.start(); //4 創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 創(chuàng)建消息發(fā)送的目的地 Destination destination = session.createQueue(QUEUE_NAME); //6 創(chuàng)建消費(fèi)者 MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { //7 創(chuàng)建消息 TextMessage textMessage = (TextMessage)message; try { //7 接收消息 System.out.println("消費(fèi)者- 1 -接收消息:【" + textMessage.getText() + "】"); } catch (JMSException e) { e.printStackTrace(); } } } ); } }
2. JMS Topic模型代碼實(shí)現(xiàn):
Producer:
package com.wgs.mq.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 發(fā)布訂閱模式 * Created by GenshenWang.nomico on 2017/10/19. */ public class ActiveMQProducer { private static final String URL = "tcp://localhost:61616"; private static final String TOPIC_NAME = "topic-name"; public static void main(String[] args) throws JMSException { //1 創(chuàng)建連接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //2 使用連接工廠創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3 啟動(dòng)連接 connection.start(); //4 創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 創(chuàng)建帶有主題的消息發(fā)送的目的地 Destination destination = session.createTopic(TOPIC_NAME); //6 創(chuàng)建生產(chǎn)者 MessageProducer messageProducer = session.createProducer(destination); //7 創(chuàng)建消息 TextMessage textMessage = session.createTextMessage(); for (int i = 1; i <= 100; i++) { //8 創(chuàng)建消息內(nèi)容 textMessage.setText("發(fā)送者- 1 -發(fā)送消息:" + i); //9 發(fā)送消息 messageProducer.send(textMessage); } System.out.println("消息發(fā)送成功"); session.close(); connection.close(); } }
Consumer:
package com.wgs.mq.topic; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * 發(fā)布訂閱模式 * Created by GenshenWang.nomico on 2017/10/19. */ public class ActiveMQConsumer { private static final String URL = "tcp://localhost:61616"; private static final String TOPIC_NAME = "topic-name"; public static void main(String[] args) throws JMSException { //1 創(chuàng)建連接工廠ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //2 使用連接工廠創(chuàng)建連接 Connection connection = connectionFactory.createConnection(); //3 啟動(dòng)連接 connection.start(); //4 創(chuàng)建會(huì)話 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5 創(chuàng)建消息發(fā)送的目的地 Destination destination = session.createTopic(TOPIC_NAME); //6 創(chuàng)建消費(fèi)者 MessageConsumer messageConsumer = session.createConsumer(destination); messageConsumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { //7 創(chuàng)建消息 TextMessage textMessage = (TextMessage)message; try { //7 接收消息 System.out.println("消費(fèi)者- 1 -接收消息:【" + textMessage.getText() + "】"); } catch (JMSException e) { e.printStackTrace(); } } } ); } }
使用Spring的JMS模板
雖然JMS為所有的消息代理提供了統(tǒng)一的接口,但如同JDBC一樣,在處理連接,語句,結(jié)果集和異常時(shí)會(huì)顯得很繁雜。不過,Spring為我們提供了JmsTemplate來消除冗余和重復(fù)的JMS代碼。
下面看看如何使用JmsTemplate來實(shí)現(xiàn)消息隊(duì)列。
1. JMS Queue模型代碼實(shí)現(xiàn):
配置文件:
producer.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ActiveMQ提供的ConnectionFactory--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref = "targetConnectionFactory"/> </bean> <!-- 配置JmsTemplate,用于發(fā)送消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 配置隊(duì)列目的地的名稱--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue-spring-name"/> </bean> <!-- 配置隊(duì)列目的地的名稱--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic-spring-name"/> </bean> <bean id="producerServiceImpl" class="com.wgs.jms.producer.ActiveMQProducerServiceImpl"/> </beans>
consumer.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:annotation-config/> <!-- ActiveMQ提供的ConnectionFactory--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- 在Spring 中配置JMS連接工廠,連接到ActiveMQ提供的ConnectionFactory--> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref = "targetConnectionFactory"/> </bean> <!-- 配置隊(duì)列目的地的名稱--> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queue-spring-name"/> </bean> <!-- 配置消息監(jiān)聽器--> <bean id="consumerMessageListener" class="com.wgs.jms.consumer.ConsumerMessageListener"/> <!-- 配置隊(duì)列目的地的名稱--> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="destination" ref="queueDestination"/> <property name="connectionFactory" ref="connectionFactory"/> <property name="messageListener" ref="consumerMessageListener"/> </bean> <!-- 配置隊(duì)列目的地的名稱--> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic-spring-name"/> </bean> </beans>
生產(chǎn)者Producer:
(1)先寫一個(gè)接口:
package com.wgs.jms.producer; /** * Created by GenshenWang.nomico on 2017/10/20. */ public interface ActiveMQProducerService { void sendMessage(final String message); }
(2)接口的實(shí)現(xiàn):
package com.wgs.jms.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import javax.annotation.Resource; import javax.jms.*; /** * Created by GenshenWang.nomico on 2017/10/20. */ public class ActiveMQProducerServiceImpl implements ActiveMQProducerService { @Autowired JmsTemplate jmsTemplate; @Resource(name = "queueDestination") Destination destination; public void sendMessage(final String message) { jmsTemplate.send(destination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(message); return textMessage; } } ); System.out.println("生產(chǎn)者- 1 -發(fā)送消息成功:" + message); } }
(3)測(cè)試:
package com.wgs.jms.producer; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Created by GenshenWang.nomico on 2017/10/20. */ public class ActiveMQProducerMain { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); ActiveMQProducerService service = context.getBean(ActiveMQProducerService.class); for (int i = 0; i < 100; i++) { service.sendMessage("test" + i); } context.close(); } }
消費(fèi)者:
(1)創(chuàng)建消息監(jiān)聽器:
package com.wgs.jms.consumer; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * Created by GenshenWang.nomico on 2017/10/20. */ public class ConsumerMessageListener implements MessageListener { public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; System.out.println("消費(fèi)者- 1 -接收消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
(2)測(cè)試:
package com.wgs.jms.consumer; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Created by GenshenWang.nomico on 2017/10/20. */ public class ActiveMQConsumerMain { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); } }
2. JMS Topic模型代碼實(shí)現(xiàn):
將上述代碼中出現(xiàn)的queueDestination改為topicDestination即可。
總結(jié)
以上就是本文關(guān)于JMS簡(jiǎn)介與ActiveMQ實(shí)戰(zhàn)代碼分享的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對(duì)本站的支持!
相關(guān)文章
使用JAXBContext輕松實(shí)現(xiàn)Java和xml的互相轉(zhuǎn)換方式
這篇文章主要介紹了依靠JAXBContext輕松實(shí)現(xiàn)Java和xml的互相轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SpringSecurity中的Filter Chain(過濾器鏈)
Spring Security的Filter Chain是由一系列過濾器組成的管道,每個(gè)過濾器執(zhí)行特定的安全功能,Spring Security能夠提供強(qiáng)大而靈活的安全控制機(jī)制,從而保護(hù)你的應(yīng)用程序不受各種網(wǎng)絡(luò)安全威脅的侵害,本文介紹SpringSecurity中的Filter Chain,感興趣的朋友跟隨小編一起看看吧2024-06-06idea運(yùn)行vue項(xiàng)目設(shè)置自定義瀏覽器方式
這篇文章主要介紹了idea運(yùn)行vue項(xiàng)目設(shè)置自定義瀏覽器方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08java 實(shí)現(xiàn)文件復(fù)制和格式更改的實(shí)例
java 實(shí)現(xiàn)文件復(fù)制和格式更改的實(shí)例,需要的朋友可以參考一下2013-03-03SpringBoot中使用Servlet三大組件的方法(Servlet、Filter、Listener)
這篇文章主要介紹了SpringBoot中使用Servlet三大組件的方法(Servlet、Filter、Listener),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-01-01java根據(jù)List內(nèi)對(duì)象的屬性排序方法
下面小編就為大家分享一篇java根據(jù)List內(nèi)對(duì)象的屬性排序方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01springboot Actuator的指標(biāo)監(jiān)控可視化功能詳解
SpringBoot Actuator是springboot為簡(jiǎn)化我們對(duì)微服務(wù)項(xiàng)目的監(jiān)控功能抽取出來的模塊,使得我們每個(gè)微服務(wù)快速引用即可獲得生產(chǎn)界別的應(yīng)用監(jiān)控、審計(jì)等功能。這篇文章主要介紹了springboot Actuator的指標(biāo)監(jiān)控可視化,需要的朋友可以參考下2021-08-08