springboot使用Redis隊(duì)列實(shí)戰(zhàn)
前言
MQ應(yīng)用有很多,比如ActiveMQ,RabbitMQ,Kafka等,但是也可以基于redis來(lái)實(shí)現(xiàn),可以降低系統(tǒng)的維護(hù)成本和實(shí)現(xiàn)復(fù)雜度,本篇介紹redis中實(shí)現(xiàn)消息隊(duì)列的幾種方案,并通過(guò)springboot實(shí)戰(zhàn)使其更易懂。
1. 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)
2. 基于Sorted-Set的實(shí)現(xiàn)
3. PUB/SUB,訂閱/發(fā)布模式
4. 基于Stream類(lèi)型的實(shí)現(xiàn)
1. 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)
描述
使用rpush和lpush操作入隊(duì)列,lpop和rpop操作出隊(duì)列。
List支持多個(gè)生產(chǎn)者和消費(fèi)者并發(fā)進(jìn)出消息,每個(gè)消費(fèi)者拿到都是不同的列表元素。
優(yōu)點(diǎn)
一旦數(shù)據(jù)到來(lái)則立刻醒過(guò)來(lái),消息延遲幾乎為零。
缺點(diǎn)
- 不能重復(fù)消費(fèi),一旦消費(fèi)就會(huì)被刪除
- 不能做廣播模式 , 不支持分組消費(fèi)
- lpop和rpop會(huì)一直空輪訓(xùn),消耗資源 ,但可以 引入阻塞讀blpop和brpop 同時(shí)也有新的問(wèn)題 如果線程一直阻塞在那里,Redis客戶(hù)端的連接就成了閑置連接,閑置過(guò)久,服務(wù)器一般會(huì)主動(dòng)斷開(kāi)連接,減少閑置資源占用,這個(gè)時(shí)候blpop和brpop或拋出異常
實(shí)戰(zhàn)
代碼
@Slf4j @Service public class ListRedisQueue { //隊(duì)列名 public static final String KEY = "listQueue"; @Resource private RedisTemplate redisTemplate; public void produce(String message) { redisTemplate.opsForList().rightPush(KEY, message); } public void consume() { while (true) { String msg = (String) redisTemplate.opsForList().leftPop(KEY); log.info("瘋狂獲取消息:" + msg); } } public void blockingConsume() { while (true) { List<Object> obj = redisTemplate.executePipelined(new RedisCallback<Object>() { @Override public Object doInRedis(RedisConnection connection) throws DataAccessException { //隊(duì)列沒(méi)有元素會(huì)阻塞操作,直到隊(duì)列獲取新的元素或超時(shí),5表示如果沒(méi)元素就每五秒去拿一次消息 return connection.bRPop(5, KEY.getBytes()); } }, new StringRedisSerializer()); for (Object str : obj) { log.info("blockingConsume獲取消息 : {}", str); } } } }
測(cè)試
lPop/rPop消費(fèi)數(shù)據(jù)
@Autowired private ListRedisQueue listRedisQueue; @Test public void produce() { for (int i = 0; i < 5; i++) { listRedisQueue.produce("第"+i + "個(gè)數(shù)據(jù)"); } } @Test public void consume() { produce(); logger.info("生產(chǎn)消息完畢"); listRedisQueue.consume(); }
輸出
blpop / brpop 消費(fèi)數(shù)據(jù)
@Test public void blockingConsume() { produce(); logger.info("生產(chǎn)消息完畢"); listRedisQueue.blockingConsume(); }
輸出
2. 基于Sorted-Set的實(shí)現(xiàn)延時(shí)隊(duì)列
描述
其實(shí)zset就是sorted set。為了避免sorted set簡(jiǎn)寫(xiě)sset導(dǎo)致命令沖突,所以改為zset。同理例如class-->clazz
sorted set從字面意思上,很容易就可以理解,是個(gè)有序且不可重復(fù)的數(shù)據(jù)集合。類(lèi)似set和hash的混合體,但是相比于set,zset內(nèi)部由score進(jìn)行排序.
優(yōu)點(diǎn)
可以自定義消息ID,在消息ID有意義時(shí),比較重要。
缺點(diǎn)
缺點(diǎn)也明顯,不允許重復(fù)消息(因?yàn)槭羌希瑫r(shí)消息ID確定有錯(cuò)誤會(huì)導(dǎo)致消息的順序出錯(cuò)。
實(shí)戰(zhàn)
代碼
@Slf4j @Service public class SortedSetRedisQueue { //隊(duì)列名 public static final String KEY = "sortedSet_queue"; @Autowired private RedisTemplate<String, Object> redisTemplate; public void produce(String msg, Double score) { // 創(chuàng)建Sorted Set實(shí)例 ZSetOperations zSetOperations = redisTemplate.opsForZSet(); // 添加數(shù)據(jù) zSetOperations.add(KEY, msg, score); } public void consumer() throws InterruptedException { // 創(chuàng)建SortedSet實(shí)例 ZSetOperations zSetOperations = redisTemplate.opsForZSet(); while (true) { // 拿取數(shù)據(jù) (rangeByScore返回有序集合中指定分?jǐn)?shù)區(qū)間的成員列表。有序集成員按分?jǐn)?shù)值遞增(從小到大)次序排列) Set<String> order = zSetOperations.rangeByScore(KEY, 0, System.currentTimeMillis(), 0, 1); if (ObjectUtils.isEmpty(order)) { log.info("當(dāng)前沒(méi)有數(shù)據(jù) 當(dāng)前線程睡眠3秒"); TimeUnit.SECONDS.sleep(3); // 跳過(guò)本次循環(huán) 重新循環(huán)拿取數(shù)據(jù) continue; } // 利用迭代器拿取Set中的數(shù)據(jù) String massage = order.iterator().next(); // 過(guò)河拆遷,拿到就刪除消息 if (zSetOperations.remove(KEY, massage) > 0) { //做些業(yè)務(wù)處理 log.info("我拿到的消息:" + massage); } } } }
測(cè)試
@Autowired private SortedSetRedisQueue sortedSetRedisQueue; @Test public void sortedSetProduce() throws InterruptedException { for (int i = 0; i < 5; i++) { TimeUnit.SECONDS.sleep(1); // 生成分?jǐn)?shù) double score = System.currentTimeMillis(); sortedSetRedisQueue.produce("第"+i + "個(gè)數(shù)據(jù)",score); } } @Test public void sortedSetConsumer() throws InterruptedException { sortedSetProduce(); logger.info("生產(chǎn)消息完畢"); sortedSetRedisQueue.consumer(); } }
輸出
3.PUB/SUB,訂閱/發(fā)布模式
描述
SUBSCRIBE,用于訂閱信道
PUBLISH,向信道發(fā)送消息
UNSUBSCRIBE,取消訂閱
此模式允許生產(chǎn)者只生產(chǎn)一次消息,由中間件負(fù)責(zé)將消息復(fù)制到多個(gè)消息隊(duì)列,每個(gè)消息隊(duì)列由對(duì)應(yīng)的消費(fèi)組消費(fèi)。
優(yōu)點(diǎn)
- 一個(gè)消息可以發(fā)布到多個(gè)消費(fèi)者
- 消費(fèi)者可以同時(shí)訂閱多個(gè)信道,因此可以接收多種消息(處理時(shí)先根據(jù)信道判斷)
- 消息即時(shí)發(fā)送,消費(fèi)者會(huì)自動(dòng)接收到信道發(fā)布的消息
缺點(diǎn)
- 消息發(fā)布時(shí),如果客戶(hù)端不在線,則消息丟失
- 消費(fèi)者處理消息時(shí)出現(xiàn)了大量消息積壓,則可能會(huì)斷開(kāi)通道,導(dǎo)致消息丟失
- 消費(fèi)者接收消息的時(shí)間不一定是一致的,可能會(huì)有差異(業(yè)務(wù)處理需要判重)
實(shí)戰(zhàn)
監(jiān)聽(tīng)器
@Slf4j @Component public class RedisMessageListenerListener implements MessageListener { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 消息處理 * * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { String channel = new String(pattern); log.info("onMessage --> 消息通道是:{}", channel); RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer(); Object deserialize = valueSerializer.deserialize(message.getBody()); log.info("反序列化的結(jié)果:{}", deserialize); if (deserialize == null) return; String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8)); log.info("計(jì)算得到的key: {}", md5DigestAsHex); Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS); if (Boolean.TRUE.equals(result)) { // redis消息進(jìn)行處理 log.info("接收的結(jié)果:{}", deserialize.toString()); } else { log.info("其他服務(wù)處理中"); } } }
實(shí)現(xiàn)MessageListener 接口,就可以通過(guò)onMessage()方法接收到消息了,該方法有兩個(gè)參數(shù):
- 參數(shù) message 的 getBody() 方法以二進(jìn)制形式獲取消息體, getChannel() 以二進(jìn)制形式獲取消息通道
- 參數(shù) pattern 二進(jìn)制形式的消息通道(實(shí)際和 message.getChannel() 返回值相同)
綁定監(jiān)聽(tīng)器
@Configuration public class RedisMessageListenerConfig { @Bean public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, new ChannelTopic(PubSubRedisQueue.KEY)); return redisMessageListenerContainer; } }
RedisMessageListenerContainer 是為Redis消息偵聽(tīng)器 MessageListener 提供異步行為的容器。處理偵聽(tīng)、轉(zhuǎn)換和消息分派的低級(jí)別詳細(xì)信息。
本文使用的是主題訂閱:ChannelTopic,你也可以使用模式匹配:PatternTopic,從而匹配多個(gè)信道。
生產(chǎn)者
@Service public class PubSubRedisQueue { //隊(duì)列名 public static final String KEY = "pub_sub_queue"; @Autowired private RedisTemplate<String, Object> redisTemplate; public void produce(String message) { redisTemplate.convertAndSend(KEY, message); } }
測(cè)試
@Slf4j @RestController @RequestMapping(value = "/queue") public class RedisMQController { @Autowired private PubSubRedisQueue pubSubRedisQueue; @RequestMapping(value = "/pubsub/produce", method = RequestMethod.GET) public void pubsubProduce(@RequestParam(name = "msg") String msg) { pubSubRedisQueue.produce(msg); }
隨便找個(gè)瀏覽器請(qǐng)求生產(chǎn)者接口:
所以每插入一條消息,監(jiān)聽(tīng)者則立即進(jìn)去消費(fèi)
4. 基于Stream類(lèi)型的實(shí)現(xiàn)(Redis Version5.0)
描述
Stream為redis 5.0后新增的數(shù)據(jù)結(jié)構(gòu)。支持多播的可持久化消息隊(duì)列,實(shí)現(xiàn)借鑒了Kafka設(shè)計(jì)。
Redis Stream的結(jié)構(gòu)如上圖所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的ID和對(duì)應(yīng)的內(nèi)容。消息是持久化的,Redis重啟后,內(nèi)容還在。
每個(gè)Stream都有唯一的名稱(chēng),它就是Redis的key,在我們首次使用xadd指令追加消息時(shí)自動(dòng)創(chuàng)建。
每個(gè)Stream都可以?huà)於鄠€(gè)消費(fèi)組,每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo)last_delivered_id在Stream數(shù)組之上往前移動(dòng),表示當(dāng)前消費(fèi)組已經(jīng)消費(fèi)到哪條消息了。每個(gè)消費(fèi)組都有一個(gè)Stream內(nèi)唯一的名稱(chēng),消費(fèi)組不會(huì)自動(dòng)創(chuàng)建,它需要單獨(dú)的指令xgroup create進(jìn)行創(chuàng)建,需要指定從Stream的某個(gè)消息ID開(kāi)始消費(fèi),這個(gè)ID用來(lái)初始化last_delivered_id變量。
每個(gè)消費(fèi)組(Consumer Group)的狀態(tài)都是獨(dú)立的,相互不受影響。也就是說(shuō)同一份Stream內(nèi)部的消息會(huì)被每個(gè)消費(fèi)組都消費(fèi)到。
同一個(gè)消費(fèi)組(Consumer Group)可以?huà)旖佣鄠€(gè)消費(fèi)者(Consumer),這些消費(fèi)者之間是競(jìng)爭(zhēng)關(guān)系,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo)last_delivered_id往前移動(dòng)。每個(gè)消費(fèi)者者有一個(gè)組內(nèi)唯一名稱(chēng)。
消費(fèi)者(Consumer)內(nèi)部會(huì)有個(gè)狀態(tài)變量pending_ids,它記錄了當(dāng)前已經(jīng)被客戶(hù)端讀取的消息,但是還沒(méi)有ack。如果客戶(hù)端沒(méi)有ack,這個(gè)變量里面的消息ID會(huì)越來(lái)越多,一旦某個(gè)消息被ack,它就開(kāi)始減少。這個(gè)pending_ids變量在Redis官方被稱(chēng)之為PEL,也就是Pending Entries List,這是一個(gè)很核心的數(shù)據(jù)結(jié)構(gòu),它用來(lái)確??蛻?hù)端至少消費(fèi)了消息一次,而不會(huì)在網(wǎng)絡(luò)傳輸?shù)闹型緛G失了沒(méi)處理。
優(yōu)點(diǎn)
- 高性能:可以在非常短的時(shí)間內(nèi)處理大量的消息。
- 持久化:支持?jǐn)?shù)據(jù)持久化,即使Redis服務(wù)器宕機(jī),也可以恢復(fù)之前的消息。
- 順序性:保證消息的順序性,即使是并發(fā)的消息也會(huì)按照發(fā)送順序排列。
- 靈活性:可以方便地?cái)U(kuò)展和分布式部署,可以滿(mǎn)足不同場(chǎng)景下的需求。
缺點(diǎn)
- 功能相對(duì)簡(jiǎn)單:Redis Stream相對(duì)于其他的消息隊(duì)列,功能相對(duì)簡(jiǎn)單,無(wú)法滿(mǎn)足一些復(fù)雜的需求。
- 不支持消息回溯:即消費(fèi)者無(wú)法獲取之前已經(jīng)消費(fèi)過(guò)的消息。
- 不支持多消費(fèi)者分組:無(wú)法實(shí)現(xiàn)多個(gè)消費(fèi)者并發(fā)消費(fèi)消息的功能。
實(shí)戰(zhàn)
自動(dòng)ack消費(fèi)者
@Slf4j @Component public class AutoAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> { //分組名 public static final String GROUP = "autoack_stream"; @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { String stream = message.getStream(); RecordId id = message.getId(); Map<String, String> map = message.getValue(); log.info("[自動(dòng)ACK]接收到一個(gè)消息 stream:[{}],id:[{}],value:[{}]", stream, id, map); redisTemplate.opsForStream().delete(GROUP, id.getValue()); } }
手動(dòng)ack消費(fèi)者
@Slf4j @Component public class BasicAckStreamConsumeListener implements StreamListener<String, MapRecord<String, String, String>> { //分組名 public static final String GROUP = "basicack_stream"; @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { String stream = message.getStream(); RecordId id = message.getId(); Map<String, String> map = message.getValue(); log.info("[手動(dòng)ACK]接收到一個(gè)消息 stream:[{}],id:[{}],value:[{}]", stream, id, map); redisTemplate.opsForStream().acknowledge(stream, GROUP, id.getValue()); //消費(fèi)完畢刪除該條消息 redisTemplate.opsForStream().delete(GROUP, id.getValue()); } }
綁定關(guān)系
@Slf4j @Configuration public class RedisStreamConfiguration { @Autowired private RedisConnectionFactory redisConnectionFactory; @Autowired private AutoAckStreamConsumeListener autoAckStreamConsumeListener; @Autowired private BasicAckStreamConsumeListener basicAckStreamConsumeListener; @Autowired private RedisTemplate<String, Object> redisTemplate; @Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer() { AtomicInteger index = new AtomicInteger(1); int processors = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), r -> { Thread thread = new Thread(r); thread.setName("async-stream-consumer-" + index.getAndIncrement()); thread.setDaemon(true); return thread; }); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() // 一次最多獲取多少條消息 .batchSize(3) // 運(yùn)行 Stream 的 poll task .executor(executor) // Stream 中沒(méi)有消息時(shí),阻塞多長(zhǎng)時(shí)間,需要比 `spring.redis.timeout` 的時(shí)間小 .pollTimeout(Duration.ofSeconds(3)) // 獲取消息的過(guò)程或獲取到消息給具體的消息者處理的過(guò)程中,發(fā)生了異常的處理 .errorHandler(new ErrorHandler() { @Override public void handleError(Throwable t) { log.info("出現(xiàn)異常就來(lái)這里了" + t); } }) .build(); StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options); // 獨(dú)立消費(fèi) // 消費(fèi)組A,自動(dòng)ack // 從消費(fèi)組中沒(méi)有分配給消費(fèi)者的消息開(kāi)始消費(fèi) if (!isStreamGroupExists(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP)){ redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,AutoAckStreamConsumeListener.GROUP); } streamMessageListenerContainer.receiveAutoAck(Consumer.from(AutoAckStreamConsumeListener.GROUP, "AutoAckConsumer"), StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), autoAckStreamConsumeListener); // 消費(fèi)組B,不自動(dòng)ack if (!isStreamGroupExists(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP)){ redisTemplate.opsForStream().createGroup(StreamRedisQueue.KEY,BasicAckStreamConsumeListener.GROUP); } streamMessageListenerContainer.receive(Consumer.from(BasicAckStreamConsumeListener.GROUP, "BasicAckConsumer"), StreamOffset.create(StreamRedisQueue.KEY, ReadOffset.lastConsumed()), basicAckStreamConsumeListener); return streamMessageListenerContainer; } /** * 判斷該消費(fèi)組是否存在 * @param streamKey * @param groupName * @return */ public boolean isStreamGroupExists(String streamKey, String groupName) { RedisStreamCommands commands = redisConnectionFactory.getConnection().streamCommands(); //首先檢查Stream Key是否存在,否則下面代碼可能會(huì)因?yàn)閲L試檢查不存在的Stream Key而導(dǎo)致異常 if (!redisTemplate.hasKey(streamKey)){ return false; } //獲取streamKey下的所有g(shù)roups StreamInfo.XInfoGroups xInfoGroups = commands.xInfoGroups(streamKey.getBytes()); AtomicBoolean exists= new AtomicBoolean(false); xInfoGroups.forEach(xInfoGroup -> { if (xInfoGroup.groupName().equals(groupName)){ exists.set(true); } }); return exists.get(); } }
生產(chǎn)工具
@Slf4j @Service public class StreamRedisQueue { //隊(duì)列名 public static final String KEY = "stream_queue"; @Autowired private RedisTemplate<String, Object> redisTemplate; public String produce(Map<String, String> value) { return redisTemplate.opsForStream().add(KEY, value).getValue(); } public void createGroup(String key, String group){ redisTemplate.opsForStream().createGroup(key, group); } }
測(cè)試
生產(chǎn)消息
@Slf4j @RestController @RequestMapping(value = "/queue") public class RedisMQController { @Autowired private StreamRedisQueue streamRedisQueue; @RequestMapping(value = "/stream/produce", method = RequestMethod.GET) public void streamProduce() { Map<String, String> map = new HashMap<>(); map.put("劉德華", "大家好我是劉德華"); map.put("周杰倫", "周杰倫"); map.put("time", DateUtil.now()); String result = streamRedisQueue.produce(map); log.info("返回結(jié)果:{}", result); } }
只要有消息,消費(fèi)者就會(huì)消費(fèi)
到此這篇關(guān)于springboot使用Redis隊(duì)列實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)springboot Redis隊(duì)列實(shí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot利用redis集成消息隊(duì)列的方法
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的場(chǎng)景分析
- springboot整合redis之消息隊(duì)列
- SpringBoot+Redis隊(duì)列實(shí)現(xiàn)Java版秒殺的示例代碼
- SpringBoot實(shí)現(xiàn)redis延遲隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- SpringBoot中Redisson延遲隊(duì)列的示例
相關(guān)文章
Java實(shí)現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改
這篇文章主要介紹了Java實(shí)現(xiàn)Dbhelper支持大數(shù)據(jù)增刪改功能的實(shí)現(xiàn)過(guò)程,感興趣的小伙伴們可以參考一下2016-01-01mybatis中使用oracle關(guān)鍵字出錯(cuò)的解決方法
這篇文章主要給大家介紹了關(guān)于mybatis中使用oracle關(guān)鍵字出錯(cuò)的解決方法,文中通過(guò)示例代碼將解決的方法介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。2017-08-08基于Java實(shí)現(xiàn)回調(diào)監(jiān)聽(tīng)工具類(lèi)
這篇文章主要為大家詳細(xì)介紹了如何基于Java實(shí)現(xiàn)一個(gè)回調(diào)監(jiān)聽(tīng)工具類(lèi),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-04-04解決java.util.NoSuchElementException異常正確方法
java.util.NoSuchElementException是Java中的一種異常,表示在迭代器或枚舉中找不到元素,這篇文章主要給大家介紹了關(guān)于解決java.util.NoSuchElementException異常的相關(guān)資料,需要的朋友可以參考下2023-11-11SpringBoot整合Mybatis?LocalDateTime?映射失效的解決
這篇文章主要介紹了SpringBoot整合Mybatis?LocalDateTime?映射失效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01為何HashSet中使用PRESENT而不是null作為value
這篇文章主要介紹了為何HashSet中使用PRESENT而不是null作為value,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10