欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

消息中間件ActiveMQ的簡單入門介紹與使用

 更新時間:2021年11月25日 11:05:42   作者:蘋果大大個  
消息隊列是指利用高效可靠的消息傳遞機(jī)制進(jìn)行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成,這篇文章主要給大家介紹了關(guān)于ActiveMQ的簡單入門介與使用的相關(guān)資料,需要的朋友可以參考下

一、什么是消息中間件

消息中間件顧名思義實現(xiàn)的就是在兩個系統(tǒng)或兩個客戶端之間進(jìn)行消息傳送

二、什么是ActiveMQ

ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實現(xiàn),ActiveMQ的設(shè)計目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語言和多系統(tǒng)的應(yīng)用集成消息通信中間件。

三、什么時候需要用ActiveMQ

ActiveMQ常被應(yīng)用與系統(tǒng)業(yè)務(wù)的解耦,異步消息的推送,增加系統(tǒng)并發(fā)量,提高用戶體驗。例如以我在工作中的使用,在比較耗時且異步的遠(yuǎn)程開鎖操作時

四、如何使用ActiveMQ

1.AcitveMQ的數(shù)據(jù)傳送流程

2.ActiveMQ的兩種消息傳遞類型

(1)點對點傳輸,即一個生產(chǎn)者對應(yīng)一個消費(fèi)者,生產(chǎn)者向broke推送數(shù)據(jù),數(shù)據(jù)存儲在broke的一個隊列中,當(dāng)消費(fèi)者接受該條隊列里的數(shù)據(jù)。

(2)基于發(fā)布/訂閱模式的傳輸,即根據(jù)訂閱話題來接收相應(yīng)數(shù)據(jù),一個生產(chǎn)者可向多個消費(fèi)者推送數(shù)據(jù),與MQTT協(xié)議的實現(xiàn)是類似的,對MQTT協(xié)議有興趣的可跳轉(zhuǎn)文末查看

兩種消息傳遞類型的不同,點對點傳輸消費(fèi)者可以接收到在連接之前生產(chǎn)者所推送的數(shù)據(jù),而基于發(fā)布/訂閱模式的傳輸方式消費(fèi)者只能接收到連接之后生產(chǎn)者推送的數(shù)據(jù)。

3.ActiveMQ的安裝與啟動

(1)官網(wǎng)下載對應(yīng)服務(wù)器版本

(2)解壓后進(jìn)入apache-activemq-5.15.9/bin目錄

(3)執(zhí)行./activemq start啟動ActiveMQ

(4)瀏覽器輸入ActiveMQ啟動的服務(wù)器ip:8161便可進(jìn)入web界面,點擊Manage ActiveMQ broker可以查看消息推送的狀態(tài),默認(rèn)賬號密碼為admin,admin

(5)啟動錯誤分析

進(jìn)入/root/apache-activemq-5.15.9/data目錄查看activemq.log文件,根據(jù)錯誤提示信息修改,例如端口號被占用等。

4.ActiveMQ的代碼測試

(1)構(gòu)建maven項目,引入依賴

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

(2)生產(chǎn)者類

/**
 * @Description 生產(chǎn)者
 * @Date 2019/7/20
 * @Created by yqh
 */
public class MyProducer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊列目標(biāo),并標(biāo)識隊列名稱,消費(fèi)者根據(jù)隊列名稱接收數(shù)據(jù)
        Destination destination = session.createQueue("myQueue");
        // 創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        // 向隊列推送10個文本消息數(shù)據(jù)
        for (int i = 1 ; i <= 10 ; i++){
            // 創(chuàng)建文本消息
            TextMessage message = session.createTextMessage("第" + i + "個文本消息");
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已發(fā)送的消息:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }

}

運(yùn)行結(jié)果:

已發(fā)送的消息:第1個文本消息

已發(fā)送的消息:第2個文本消息

已發(fā)送的消息:第3個文本消息

已發(fā)送的消息:第4個文本消息

已發(fā)送的消息:第5個文本消息

已發(fā)送的消息:第6個文本消息

已發(fā)送的消息:第7個文本消息

已發(fā)送的消息:第8個文本消息

已發(fā)送的消息:第9個文本消息

已發(fā)送的消息:第10個文本消息

測試查看web后臺顯示,有10條消息在隊列中等待消費(fèi)

(3)消費(fèi)者類

/**
 * @Description 消費(fèi)者類
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊列目標(biāo),并標(biāo)識隊列名稱,消費(fèi)者根據(jù)隊列名稱接收數(shù)據(jù)
        Destination destination = session.createQueue("myQueue");
        // 創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);
        // 創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費(fèi)的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

測試結(jié)果:

消費(fèi)的消息:第1個文本消息

消費(fèi)的消息:第2個文本消息

消費(fèi)的消息:第3個文本消息

消費(fèi)的消息:第4個文本消息

消費(fèi)的消息:第5個文本消息

消費(fèi)的消息:第6個文本消息

消費(fèi)的消息:第7個文本消息

消費(fèi)的消息:第8個文本消息

消費(fèi)的消息:第9個文本消息

消費(fèi)的消息:第10個文本消息

web后臺顯示有一個消費(fèi)者處于連接狀態(tài),且已消費(fèi)了10個message,而該條隊列已沒有message待消費(fèi)了

(4)當(dāng)我們運(yùn)行兩個消費(fèi)者類,消息又是怎么被消費(fèi)的呢?是兩個消費(fèi)者都能收到生產(chǎn)者生產(chǎn)的message,還是只有其中一個消費(fèi)者能消費(fèi)呢?

我們先運(yùn)行兩個消費(fèi)者,在運(yùn)行一個生產(chǎn)者對目標(biāo)隊列生產(chǎn)10個message,會發(fā)現(xiàn)有以下情況

// Consumer1控制臺

消費(fèi)的消息:第1個文本消息

消費(fèi)的消息:第3個文本消息

消費(fèi)的消息:第5個文本消息

消費(fèi)的消息:第7個文本消息

消費(fèi)的消息:第9個文本消息

// Consumer2控制臺

消費(fèi)的消息:第2個文本消息

消費(fèi)的消息:第4個文本消息

消費(fèi)的消息:第6個文本消息

消費(fèi)的消息:第8個文本消息

消費(fèi)的消息:第10個文本消息

即隊列中的數(shù)據(jù)會平均的分給每一個消費(fèi)者消費(fèi),且每一條數(shù)據(jù)只能被消費(fèi)一次

(5)以上是基于隊列點對點的傳輸類型,以下是基于發(fā)布/訂閱模式傳輸?shù)念愋蜏y試

/**
 * @Description 基于發(fā)布/訂閱模式傳輸類型的生產(chǎn)者測試
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyProducerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊列目標(biāo),并標(biāo)識隊列名稱,消費(fèi)者根據(jù)隊列名稱接收數(shù)據(jù)
        Destination destination = session.createTopic("topicTest");
        // 創(chuàng)建一個生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        // 向隊列推送10個文本消息數(shù)據(jù)
        for (int i = 1 ; i <= 10 ; i++){
            // 創(chuàng)建文本消息
            TextMessage message = session.createTextMessage("第" + i + "個文本消息");
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已發(fā)送的消息:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }

}
/**
 * @Description 基于發(fā)布/訂閱模式傳輸類型的消費(fèi)者測試
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊列目標(biāo),并標(biāo)識隊列名稱,消費(fèi)者根據(jù)隊列名稱接收數(shù)據(jù)
        Destination destination = session.createTopic("topicTest");
        // 創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);
        // 創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費(fèi)的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

現(xiàn)在如果我們先啟動生產(chǎn)者,再啟動消費(fèi)者,會發(fā)現(xiàn)消費(fèi)者是無法接收到之前生產(chǎn)者之前所生產(chǎn)的數(shù)據(jù),只有消費(fèi)者先啟動,再讓生產(chǎn)者消費(fèi)才可以正常接收數(shù)據(jù),這也是發(fā)布/訂閱的主題模式與點對點的隊列模式的一個明顯區(qū)別。

而如果啟動兩個消費(fèi)者,那么每一個消費(fèi)者都能完整的接收到生產(chǎn)者生產(chǎn)的數(shù)據(jù),即每一條數(shù)據(jù)都被消費(fèi)了兩次,這是發(fā)布/訂閱的主題模式與點對點的隊列模式的另一個明顯區(qū)別。

淺談MQTT

1、什么是MQTT

MQTT的全稱是“ Message Queuing Telemetry Transport”,即消息隊列遙測傳輸,是一種基于訂閱/發(fā)布模式的應(yīng)用層協(xié)議,而http是一種基于restful風(fēng)格的一種應(yīng)用層協(xié)議。

MQTT協(xié)議是一種輕量級協(xié)議,作為一種低開銷、低帶寬占用的即時通訊協(xié)議,常被應(yīng)用于物聯(lián)網(wǎng)項目。同樣基于訂閱/發(fā)布模式的中間件有ActiveMQ,Kafka等消息中間件,歸根結(jié)底實現(xiàn)的都是消息的傳輸。

2、如何理解MQTT

MQTT的是一種應(yīng)用層協(xié)議,每一種協(xié)議都有其適用場景,而MQTT常被應(yīng)用于消息推送,消息采集。例如溫度檢測儀器定時上傳溫度、檢測礦洞氧氣濃度等。

MQTT是基于TCP/IP的一種應(yīng)用層協(xié)議,TCP/IP本身已實現(xiàn)了在不可靠的網(wǎng)絡(luò)環(huán)境提供可靠的網(wǎng)絡(luò)傳輸?shù)墓δ?,而MQTT協(xié)議也有其保障消息可靠傳輸?shù)牟呗浴?/p>

MQTT推送的消息有三種消息質(zhì)量

1.至多一次,即消息只推送一次,至于消息有沒有推送成功

2.至少一次,需要確認(rèn)消息到達(dá),可能會導(dǎo)致收到重復(fù)數(shù)據(jù)(注:MQTT定義的重發(fā)機(jī)制與tcp的重復(fù)機(jī)制是不同的,tcp的重復(fù)機(jī)制是在限定時間內(nèi)如果沒有收到對應(yīng)序號的響應(yīng)報文,則會重新推送該序列號對應(yīng)的報文,而MQTT的重發(fā)機(jī)制是在客戶端重新建立連接時,

補(bǔ)發(fā)之前沒有對應(yīng)響應(yīng)報文的數(shù)據(jù)包,當(dāng)然客戶端可以選擇是否要接收這些之前沒有傳輸成功的數(shù)據(jù)包。最開始使用netty實現(xiàn)MQTT服務(wù)器的時候就理解錯了,以為MQTT的重復(fù)機(jī)制與tcp的重復(fù)機(jī)制一樣)

3.只有一次,確認(rèn)消息只到達(dá)一次,常用于對數(shù)據(jù)要求嚴(yán)格的場景,例如計費(fèi)場景,訂單場景

3、如何使用MQTT

MQTT的客戶端和服務(wù)端目前已有成熟的開源產(chǎn)品,例如服務(wù)端有emqx,客戶端有Eclipse Paho Mqtt(Java),都可以方面的引入相應(yīng)的庫快速的實現(xiàn)推送功能(具體可根據(jù)需求查看對應(yīng)的API)。

本質(zhì)上來將是客戶端與服務(wù)端建立一個Socket,然后根據(jù)MQTT協(xié)議規(guī)定發(fā)送響應(yīng)的報,例如建立socket后發(fā)送connet報文去建立連接,然后服務(wù)器會解析該連接報文,并保存該連接的相關(guān)信息。

我們可以把MQTT協(xié)議的規(guī)定當(dāng)成是我們實現(xiàn)web項目中所實現(xiàn)的業(yè)務(wù)邏輯。

4、MQTT協(xié)議的相關(guān)的名詞解析

1.訂閱(Subscription)

訂閱包含主題篩選器(Topic Filter)和最大服務(wù)質(zhì)量(QoS)。訂閱會與一個會話(Session)關(guān)聯(lián)。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。

2.、會話(Session)

每個客戶端與服務(wù)器建立連接后就是一個會話,客戶端和服務(wù)器之間有狀態(tài)交互。會話存在于一個網(wǎng)絡(luò)之間,也可能在客戶端和服務(wù)器之間跨越多個連續(xù)的網(wǎng)絡(luò)連接。

3.主題名(Topic Name)

連接到一個應(yīng)用程序消息的標(biāo)簽,該標(biāo)簽與服務(wù)器的訂閱相匹配。服務(wù)器會將消息發(fā)送給訂閱所匹配標(biāo)簽的每個客戶端。

4.主題篩選器(Topic Filter)

一個對主題名通配符篩選器,在訂閱表達(dá)式中使用,表示訂閱所匹配到的多個主題。

5.負(fù)載(Payload)

消息訂閱者所具體接收的內(nèi)容。

總結(jié)

到此這篇關(guān)于ActiveMQ的簡單入門介紹與使用的文章就介紹到這了,更多相關(guān)ActiveMQ介紹與使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java高級之HashMap中的entrySet()方法使用

    Java高級之HashMap中的entrySet()方法使用

    這篇文章主要介紹了Java高級之HashMap中的entrySet()方法使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-03-03
  • Spring類型轉(zhuǎn)換 ConversionSerivce Convertor解析

    Spring類型轉(zhuǎn)換 ConversionSerivce Convertor解析

    這篇文章主要介紹了Spring類型轉(zhuǎn)換 ConversionSerivce Convertor的相關(guān)資料,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2016-11-11
  • Java給實體每一個字段賦默認(rèn)值詳細(xì)代碼示例

    Java給實體每一個字段賦默認(rèn)值詳細(xì)代碼示例

    這篇文章主要給大家介紹了關(guān)于Java給實體每一個字段賦默認(rèn)值的相關(guān)資料,在編程過程中有時會出現(xiàn)這樣一種情況,在查詢無結(jié)果時我們需要給實體賦默認(rèn)值,需要的朋友可以參考下
    2023-09-09
  • Linux環(huán)境下的Java(JDBC)連接openGauss數(shù)據(jù)庫實踐記錄

    Linux環(huán)境下的Java(JDBC)連接openGauss數(shù)據(jù)庫實踐記錄

    這篇文章主要介紹了Linux環(huán)境下的Java(JDBC)連接openGauss數(shù)據(jù)庫實踐記錄,需要的朋友可以參考下
    2022-11-11
  • 解析SpringBoot整合SpringDataRedis的過程

    解析SpringBoot整合SpringDataRedis的過程

    這篇文章主要介紹了SpringBoot整合SpringDataRedis的過程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-06-06
  • 并發(fā)編程之Java內(nèi)存模型volatile的內(nèi)存語義

    并發(fā)編程之Java內(nèi)存模型volatile的內(nèi)存語義

    這篇文章主要介紹了并發(fā)編程之Java內(nèi)存模型volatile的內(nèi)存語義,理解volatile特性的一個好辦法是把對volatile變量的單個讀/寫,看成是使用同一個鎖對單個讀/寫操作做了同步。下面我們一起進(jìn)入文章看看具體例子吧,需要的小伙伴可以參考下
    2021-11-11
  • Java集合之LinkedHashSet集合詳解

    Java集合之LinkedHashSet集合詳解

    這篇文章主要介紹了Java集合之LinkedHashSet集合詳解,具有可預(yù)知迭代順序的Set接口的哈希表和鏈表列表實現(xiàn),此實現(xiàn)與HashSet不同的是,后者維護(hù)著一個運(yùn)行于所有條目的雙重鏈表列表,此鏈表定義了迭代順序,需要的朋友可以參考下
    2023-09-09
  • Idea創(chuàng)建多模塊maven聚合項目的實現(xiàn)

    Idea創(chuàng)建多模塊maven聚合項目的實現(xiàn)

    這篇文章主要介紹了Idea創(chuàng)建多模塊maven聚合項目的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • Java超詳細(xì)分析@Autowired原理

    Java超詳細(xì)分析@Autowired原理

    @Autowired注解可以用在類屬性,構(gòu)造函數(shù),setter方法和函數(shù)參數(shù)上,該注解可以準(zhǔn)確地控制bean在何處如何自動裝配的過程。在默認(rèn)情況下,該注解是類型驅(qū)動的注入
    2022-06-06
  • SpringBoot使用JavaMailSender實現(xiàn)發(fā)送郵件+Excel附件

    SpringBoot使用JavaMailSender實現(xiàn)發(fā)送郵件+Excel附件

    項目審批完畢后,需要發(fā)送郵件通知相關(guān)人員,并且要附帶數(shù)據(jù)庫表生成的Excel表格,這就要求不光是郵件發(fā)送功能,還要臨時生成Excel表格做為附件,本文詳細(xì)介紹了SpringBoot如何使用JavaMailSender實現(xiàn)發(fā)送郵件+Excel附件,需要的朋友可以參考下
    2023-10-10

最新評論