欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ順序消息的原理與特點(diǎn)

 更新時(shí)間:2023年02月13日 14:00:45   作者:每天都要進(jìn)步一點(diǎn)點(diǎn)  
RocketMQ作為一款純java、分布式、隊(duì)列模型的開(kāi)源消息中間件,支持事務(wù)消息、順序消息、批量消息、定時(shí)消息、消息回溯等,本篇我們了解如何實(shí)現(xiàn)順序消息的原理與特點(diǎn)

一、什么是順序消息

消息有序指的是,消費(fèi)者端消費(fèi)消息時(shí),需按照消息的發(fā)送順序來(lái)消費(fèi),即先發(fā)送的消息,需要先消費(fèi)(FIFO)。

舉個(gè)容易理解的例子:通常創(chuàng)建訂單后,會(huì)經(jīng)歷一系列的操作:【訂單創(chuàng)建 -> 訂單支付 -> 訂單發(fā)貨 -> 訂單配送 -> 訂單完成】。在創(chuàng)建完訂單后,會(huì)發(fā)送五條消息到MQ Broker中,消費(fèi)的時(shí)候要按照【訂單創(chuàng)建 -> 訂單支付 -> 訂單發(fā)貨 -> 訂單配送 -> 訂單完成】這個(gè)順序去消費(fèi),這樣的訂單才是有效的。

RocketMQ采用局部順序一致性的機(jī)制,實(shí)現(xiàn)了單個(gè)隊(duì)列中消息的有序性,使用FIFO順序提供有序消息。簡(jiǎn)而言之,我們的消息要保證有序,就必須把一組消息存放在同一個(gè)隊(duì)列,然后由Consumer進(jìn)行逐一消費(fèi)。

但是如果碰到高并發(fā)的情況,消息不就會(huì)阻塞了嗎?

RocketMQ給的解決方案是按照業(yè)務(wù)去劃分不同的隊(duì)列,然后并行消費(fèi),提高消息的處理速度的同時(shí)避免消息堆積。

RocketMQ可以嚴(yán)格的保證消息有序,可以分為分區(qū)有序或者全局有序。

  • 全局有序:全局順序時(shí)使用一個(gè)queue;
  • 分區(qū)有序:局部順序時(shí)多個(gè)queue并行消費(fèi);

二、順序消息的原理

在默認(rèn)的情況下,消息發(fā)送會(huì)采取Round Robin輪詢方式把消息發(fā)送到不同的queue;而消費(fèi)消息的時(shí)候從多個(gè)queue上拉取消息,這種情況發(fā)送和消費(fèi)是不能保證順序的。但是如果控制發(fā)送的順序消息只依次發(fā)送到同一個(gè)queue中,消費(fèi)的時(shí)候只從這個(gè)queue上依次拉取,則就保證了順序。當(dāng)發(fā)送和消費(fèi)參與的queue只有一個(gè),則是全局有序;如果多個(gè)queue參與,則為分區(qū)有序,即相對(duì)每個(gè)queue,消息都是有序的。

三、全局順序消息

前面介紹到,全局順序消息的話,我們需要將所有消息都發(fā)送到同一個(gè)隊(duì)列,然后消費(fèi)者端也訂閱同一個(gè)隊(duì)列,這樣就能實(shí)現(xiàn)順序消費(fèi)消息的功能。下面通過(guò)一個(gè)示例說(shuō)明如何實(shí)現(xiàn)全局順序消息。

(1)、生產(chǎn)者發(fā)送消息

public class OrderMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
        // 創(chuàng)建DefaultMQProducer類(lèi)并設(shè)定生產(chǎn)者名稱(chēng)
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開(kāi)
        mqProducer.setNamesrvAddr("10.0.90.86:9876");
        // 啟動(dòng)消息生產(chǎn)者
        mqProducer.start();
        for (int i = 0; i < 5; i++) {
            // 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容
            Message message = new Message("GLOBAL_ORDER_TOPIC", "", ("全局有序消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 實(shí)現(xiàn)MessageQueueSelector,重寫(xiě)select方法,保證消息都進(jìn)入同一個(gè)隊(duì)列
            // send方法的第一個(gè)參數(shù): 需要發(fā)送的消息Message
            // send方法的第二個(gè)參數(shù): 消息隊(duì)列選擇器MessageQueueSelector
            // send方法的第三個(gè)參數(shù): 消息將要進(jìn)入的隊(duì)列下標(biāo),這里我們指定消息都發(fā)送到下標(biāo)為1的隊(duì)列
            SendResult sendResult = mqProducer.send(message, new MessageQueueSelector() {
                @Override
                // select方法第一個(gè)參數(shù): 指該Topic下有的隊(duì)列集合
                // 第二個(gè)參數(shù): 發(fā)送的消息
                // 第三個(gè)參數(shù): 消息將要進(jìn)入的隊(duì)列下標(biāo),它與send方法的第三個(gè)參數(shù)相同
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    return mqs.get((Integer) arg);
                }
            }, 1);
            System.out.println("sendResult = " + sendResult);
        }
        // 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例
        mqProducer.shutdown();
    }
}

(2)、消費(fèi)者消費(fèi)消息

public class OrderMQConsumer {
    public static void main(String[] args) throws MQClientException {
        // 創(chuàng)建DefaultMQPushConsumer類(lèi)并設(shè)定消費(fèi)者名稱(chēng)
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開(kāi)
        mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
        // 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi)
        // 如果不是第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用*
        mqPushConsumer.subscribe("GLOBAL_ORDER_TOPIC", "*");
        /**
         * 與普通消費(fèi)一樣需要注冊(cè)消息監(jiān)聽(tīng)器,但是傳入的不再是MessageListenerConcurrently
         * 而是需要傳入MessageListenerOrderly的實(shí)現(xiàn)子類(lèi),并重寫(xiě)consumeMessage方法。
         */
        // 順序消費(fèi)同一個(gè)隊(duì)列的消息
        mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                for (MessageExt msg : msgs) {
                    System.out.println("消費(fèi)線程=" + Thread.currentThread().getName() +
                            ", queueId=" + msg.getQueueId() + ", 消息內(nèi)容:" + new String(msg.getBody()));
                }
                // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 啟動(dòng)消費(fèi)者實(shí)例
        mqPushConsumer.start();
    }
}

(3)、啟動(dòng)生產(chǎn)者

如下,可看到,消息成功發(fā)送到Broker中,并且可以看到,5條消息選擇的queueId都是1。

sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71A70000, offsetMsgId=0A005A5600002A9F00000000000076AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=0]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D20001, offsetMsgId=0A005A5600002A9F000000000000776B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=1]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71D80002, offsetMsgId=0A005A5600002A9F000000000000782B, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=2]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71DE0003, offsetMsgId=0A005A5600002A9F00000000000078EB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=3]
sendResult = SendResult [sendStatus=SEND_OK, msgId=AC6E009213C818B4AAC28EAB71E60004, offsetMsgId=0A005A5600002A9F00000000000079AB, messageQueue=MessageQueue [topic=GLOBAL_ORDER_TOPIC, brokerName=broker-a, queueId=1], queueOffset=4]

(4)、啟動(dòng)消費(fèi)者

如下,可看到,消費(fèi)者也是按照發(fā)送消息的順序消費(fèi)消息。

消費(fèi)線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息0
消費(fèi)線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息1
消費(fèi)線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息2
消費(fèi)線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息3
消費(fèi)線程=ConsumeMessageThread_1, queueId=1, 消息內(nèi)容:全局有序消息4

四、局部順序消息

下面用訂單進(jìn)行分區(qū)有序的示例。一個(gè)訂單創(chuàng)建完成后,訂單的狀態(tài)流轉(zhuǎn)大概是:【訂單創(chuàng)建 -> 訂單支付 -> 訂單完成】,我們?cè)趧?chuàng)建MessageQueueSelector消息隊(duì)列選擇器的時(shí)候,需要根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)自定義隊(duì)列選擇算法,如本例中則可以使用orderId訂單號(hào)去選擇隊(duì)列。這樣的話,訂單號(hào)相同的消息會(huì)被先后發(fā)送到同一個(gè)隊(duì)列中,消費(fèi)時(shí),同一個(gè)OrderId獲取到的肯定是同一個(gè)隊(duì)列。

大體過(guò)程如下圖:

(1)、生產(chǎn)者發(fā)送消息

public class OrderMQProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException, ExecutionException {
        // 創(chuàng)建DefaultMQProducer類(lèi)并設(shè)定生產(chǎn)者名稱(chēng)
        DefaultMQProducer mqProducer = new DefaultMQProducer("producer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開(kāi)
        mqProducer.setNamesrvAddr("10.0.90.86:9876");
        // 啟動(dòng)消息生產(chǎn)者
        mqProducer.start();
        List<Order> orderList = getOrderList();
        for (int i = 0; i < orderList.size(); i++) {
            String body = "【" + orderList.get(i) + "】訂單狀態(tài)變更消息";
            // 創(chuàng)建消息,并指定Topic(主題),Tag(標(biāo)簽)和消息內(nèi)容
            Message msg = new Message("ORDER_STATUS_CHANGE", "", body.getBytes(RemotingHelper.DEFAULT_CHARSET));
            // MessageQueueSelector: 消息隊(duì)列選擇器,根據(jù)業(yè)務(wù)唯一標(biāo)識(shí)自定義隊(duì)列選擇算法
            /**
             * msg:消息對(duì)象
             * selector:消息隊(duì)列的選擇器
             * arg:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(shí),如本例中的orderId
             */
            SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
                /**
                 * @param mqs 隊(duì)列集合
                 * @param msg 消息對(duì)象
                 * @param arg 業(yè)務(wù)標(biāo)識(shí)的參數(shù),對(duì)應(yīng)send()方法傳入的第三個(gè)參數(shù)arg
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //根據(jù)arg(實(shí)際上是訂單id)選擇消息發(fā)送的隊(duì)列
                    long index = (Long) arg % mqs.size();
                    return mqs.get((int) index);
                }
                //mqProducer.send()方法第三個(gè)參數(shù), 會(huì)傳遞到select()方法的arg參數(shù)
            }, orderList.get(i).getOrderId());
            System.out.println(String.format("消息發(fā)送狀態(tài):%s, orderId:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    orderList.get(i).getOrderId(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        }
        // 如果不再發(fā)送消息,關(guān)閉Producer實(shí)例
        mqProducer.shutdown();
    }
    /**
     * 訂單狀態(tài)變更流程: ORDER_CREATE(訂單創(chuàng)建) -> ORDER_PAYED(訂單已支付) -> ORDER_COMPLETE(訂單完成)
     */
    public static List<Order> getOrderList() {
        List<Order> orderList = new ArrayList<>();
        Order orderDemo = new Order();
        orderDemo.setOrderId(1L);
        orderDemo.setOrderStatus("ORDER_CREATE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(2L);
        orderDemo.setOrderStatus("ORDER_CREATE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(1L);
        orderDemo.setOrderStatus("ORDER_PAYED");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(2L);
        orderDemo.setOrderStatus("ORDER_PAYED");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(2L);
        orderDemo.setOrderStatus("ORDER_COMPLETE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(3L);
        orderDemo.setOrderStatus("ORDER_CREATE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(4L);
        orderDemo.setOrderStatus("ORDER_CREATE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(3L);
        orderDemo.setOrderStatus("ORDER_PAYED");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(1L);
        orderDemo.setOrderStatus("ORDER_COMPLETE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(3L);
        orderDemo.setOrderStatus("ORDER_COMPLETE");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(4L);
        orderDemo.setOrderStatus("ORDER_PAYED");
        orderList.add(orderDemo);
        orderDemo = new Order();
        orderDemo.setOrderId(4L);
        orderDemo.setOrderStatus("ORDER_COMPLETE");
        orderList.add(orderDemo);
        return orderList;
    }
}
public class Order implements Serializable {
    /**
     * 訂單ID
     */
    private Long orderId;
    /**
     * 訂單狀態(tài)
     */
    private String orderStatus;
    public Long getOrderId() {
        return orderId;
    }
    public void setOrderId(Long orderId) {
        this.orderId = orderId;
    }
    public String getOrderStatus() {
        return orderStatus;
    }
    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }
    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", orderStatus='" + orderStatus + '\'' +
                '}';
    }
}

(2)、消費(fèi)者消費(fèi)消息

public class OrderMQConsumer {
    public static void main(String[] args) throws MQClientException {
        // 創(chuàng)建DefaultMQPushConsumer類(lèi)并設(shè)定消費(fèi)者名稱(chēng)
        DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("consumer-group-test");
        // 設(shè)置NameServer地址,如果是集群的話,使用分號(hào);分隔開(kāi)
        mqPushConsumer.setNamesrvAddr("10.0.90.86:9876");
        // 設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開(kāi)始消費(fèi)還是隊(duì)列尾部開(kāi)始消費(fèi)
        // 如果不是第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)
        mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 訂閱一個(gè)或者多個(gè)Topic,以及Tag來(lái)過(guò)濾需要消費(fèi)的消息,如果訂閱該主題下的所有tag,則使用*
        mqPushConsumer.subscribe("ORDER_STATUS_CHANGE", "*");
        // 注冊(cè)回調(diào)實(shí)現(xiàn)類(lèi)來(lái)處理從broker拉取回來(lái)的消息
        // 注意:順序消息注冊(cè)的是MessageListenerOrderly監(jiān)聽(tīng)器
        mqPushConsumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext consumeOrderlyContext) {
                consumeOrderlyContext.setAutoCommit(true);
                for (MessageExt msg : msgList) {
                    // 每個(gè)queue有唯一的consume線程來(lái)消費(fèi), 訂單對(duì)每個(gè)queue都是分區(qū)有序
                    System.out.println("消費(fèi)線程=" + Thread.currentThread().getName() +
                            ", queueId=" + msg.getQueueId() + ", 消息內(nèi)容:" + new String(msg.getBody()));
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 啟動(dòng)消費(fèi)者實(shí)例
        mqPushConsumer.start();
    }
}

(3)、啟動(dòng)生產(chǎn)者

消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:2, queueId:2, body:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:1, queueId:1, body:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:3, queueId:3, body:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消息發(fā)送狀態(tài):SEND_OK, orderId:4, queueId:0, body:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息

(4)、啟動(dòng)消費(fèi)者

消費(fèi)線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_CREATE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_PAYED'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_1, queueId=2, 消息內(nèi)容:【Order{orderId=2, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_3, queueId=3, 消息內(nèi)容:【Order{orderId=3, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_2, queueId=1, 消息內(nèi)容:【Order{orderId=1, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息
消費(fèi)線程=ConsumeMessageThread_4, queueId=0, 消息內(nèi)容:【Order{orderId=4, orderStatus='ORDER_COMPLETE'}】訂單狀態(tài)變更消息

從上面的結(jié)果,我們可以看出來(lái)實(shí)現(xiàn)了分區(qū)有序,即一個(gè)線程只完成唯一標(biāo)識(shí)的訂單消息。

五、順序消息缺陷

  • 消費(fèi)順序消息的并行度依賴于隊(duì)列的數(shù)量 ;
  • 隊(duì)列熱點(diǎn)問(wèn)題,個(gè)別隊(duì)列由于哈希不均導(dǎo)致消息過(guò)多,消費(fèi)速度跟不上,產(chǎn)生消息堆積問(wèn)題;
  • 遇到消息失敗的消息,無(wú)法跳過(guò),當(dāng)前隊(duì)列消費(fèi)暫停;

到此這篇關(guān)于RocketMQ順序消息的原理與特點(diǎn)的文章就介紹到這了,更多相關(guān)RocketMQ順序消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 解決外部jar包@Service無(wú)法注解無(wú)法掃描的問(wèn)題

    解決外部jar包@Service無(wú)法注解無(wú)法掃描的問(wèn)題

    這篇文章主要介紹了解決外部jar包@Service無(wú)法注解無(wú)法掃描的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java的CopyOnWriteArrayList操作詳解

    Java的CopyOnWriteArrayList操作詳解

    這篇文章主要介紹了Java的CopyOnWriteArrayList操作詳解,  CopyOnWriteArrayList是ArrayList 的一個(gè)線程安全的變體,其中所有可變操作(add、set等等)都是通過(guò)對(duì)底層數(shù)組進(jìn)行一次新的復(fù)制來(lái)實(shí)現(xiàn)的,需要的朋友可以參考下
    2023-12-12
  • Spring的事件和監(jiān)聽(tīng)器-同步與異步詳解

    Spring的事件和監(jiān)聽(tīng)器-同步與異步詳解

    這篇文章主要介紹了Spring的事件和監(jiān)聽(tīng)器-同步與異步詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • 一篇文章帶你了解JavaSE的數(shù)據(jù)類(lèi)型

    一篇文章帶你了解JavaSE的數(shù)據(jù)類(lèi)型

    這篇文章主要給大家介紹了關(guān)于JavaSE的數(shù)據(jù)類(lèi)型,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-09-09
  • Java中基于Nacos實(shí)現(xiàn)Sentinel規(guī)則持久化詳解

    Java中基于Nacos實(shí)現(xiàn)Sentinel規(guī)則持久化詳解

    這篇文章主要介紹了Java中基于Nacos實(shí)現(xiàn)Sentinel規(guī)則持久化詳解,Sentinel Dashboard中添加的規(guī)則數(shù)據(jù)存儲(chǔ)在內(nèi)存,微服務(wù)停掉規(guī)則數(shù)據(jù)就消失,在?產(chǎn)環(huán)境下不合適,我們可以將Sentinel規(guī)則數(shù)據(jù)持久化到Nacos配置中?,讓微服務(wù)從Nacos獲取規(guī)則數(shù)據(jù),需要的朋友可以參考下
    2023-09-09
  • Java中獲取文件大小的詳解及實(shí)例代碼

    Java中獲取文件大小的詳解及實(shí)例代碼

    這篇文章主要介紹了Java中獲取文件大小的詳解及實(shí)例代碼的相關(guān)資料,一種是使用File的length()方法,另外一種是使用FileInputStream的available()方法,這里就說(shuō)下如何使用需要的朋友可以參考下
    2016-12-12
  • 關(guān)于Java中HashCode方法的深入理解

    關(guān)于Java中HashCode方法的深入理解

    這篇文章主要給大家介紹了關(guān)于Java中HashCode方法的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-08-08
  • java數(shù)組算法例題代碼詳解(冒泡排序,選擇排序,找最大值、最小值,添加、刪除元素等)

    java數(shù)組算法例題代碼詳解(冒泡排序,選擇排序,找最大值、最小值,添加、刪除元素等)

    這篇文章主要介紹了java數(shù)組算法例題代碼詳解(冒泡排序,選擇排序,找最大值、最小值,添加、刪除元素等),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-05-05
  • java稀疏數(shù)組的示例代碼

    java稀疏數(shù)組的示例代碼

    這篇文章主要介紹了java稀疏數(shù)組,稀疏數(shù)組,記錄一共有幾行幾列,有多少個(gè)不同值,把具有不同值的元素和行里了及值記錄在一個(gè)小規(guī)模的數(shù)組中,從而縮小程序的規(guī)模,對(duì)java稀疏數(shù)組相關(guān)知識(shí)感興趣的朋友一起看看吧
    2022-07-07
  • 基于Java語(yǔ)言在窗體上實(shí)現(xiàn)飛機(jī)大戰(zhàn)小游戲的完整步驟

    基于Java語(yǔ)言在窗體上實(shí)現(xiàn)飛機(jī)大戰(zhàn)小游戲的完整步驟

    這篇文章主要給大家介紹了基于Java語(yǔ)言在窗體上實(shí)現(xiàn)飛機(jī)大戰(zhàn)小游戲的完整步驟,文中通過(guò)圖文以及實(shí)例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2022-02-02

最新評(píng)論