Java中對于并發(fā)問題的處理思路分享
首先我們一起回顧一些并發(fā)的場景
首先最基本的,我們要弄清楚什么的并發(fā)嘞?我簡單粗暴的理解就是:一段代碼,在同一時間段內(nèi),被多個線程同時處理的情況就是并發(fā)現(xiàn)象。下面簡單畫了個圖:
那么只要是并發(fā)現(xiàn)象就需要我們進行并發(fā)處理嗎?那肯定不是滴。我們就拿大家都能理解的訂單業(yè)務來舉例,比如說下面兩種簡單的場景:
- 對于C端業(yè)務來講,基本上是由一串隨機的序列號組成,可以為UUID、數(shù)字串、年月日商戶(加密)+隨機唯一序列號等等方式。這樣的目的也是為了保障商戶訂單量的安全,防止他人去進行惡意分析。
- 對于B端業(yè)務來講,基本上都是由商戶+年月日+順序遞增序列號的方式組成。這樣方便客戶方進行訂單的匯總以及后期的追溯業(yè)務。
以上兩種場景的區(qū)別基本上就是隨機唯一序列號和順序遞增序列號的區(qū)別。偽代碼如下:
public void addOrder() { // 1.獲取當前年月日以及商戶標識 String currentDate = "yyyyMMddHHmmss"; String businessman = "商戶標識"; // 2.獲取獲取序列號 long index = getIndex(); // 3.拼接訂單號 String orderNum = businessman + currentDate + index; // 4.生成訂單 save(訂單對象); }
那么對于C端的隨機唯一序列號來講,我認為肯定是沒必要進行并發(fā)控制的,只要寫一個生成隨機唯一序列號的算法就好了,這樣生成出來的訂單號必然是唯一的。
public String getIndex() { // 根據(jù)算法生成唯一序列號 return buildIndexUtils.build(); }
但對于B端的順序遞增序列號來講,就需要進行并發(fā)控制了。因為既然要保證順序遞增,我在生成當前序列號的同時就必然需要之前上一個單子的序列號是什么,因此我就必然需要一個地方去存儲這個序列號。偽代碼如下:
public String getIndex() { // 1.獲取當前商戶、當前單據(jù)已生成的最大序列號 Integer index = dao.getIndex(商戶, 單據(jù)) + 1; // 2.序列號 + 1 index = index++; // 3.修改當前商戶、當前單據(jù)已生成的最大序列號 dao.update(商戶, 單據(jù), index); // 4.返回序列號 return index + ""; }
此時如果事務為可重復讀,Thread1開啟事務并獲取并修改序列號,此時在Thread1未提交事務之前Thread2開啟事務并獲取序列號。此時兩個線程獲取到的序列號必然是一致的,這樣就會出現(xiàn)訂單號重復的問題。
如果更換隔離級別呢?是否能夠解決這個問題?
- 讀已提交?同樣如果在Thread1提交事務之前Thread2就執(zhí)行完第一步獲取最大序列號呢?一樣有問題。
- 讀未提交?一樣的呀,在兩個Thread都執(zhí)行完第一步,但沒有執(zhí)行update的情況。
- 串行化?那就和加同步鎖沒啥區(qū)別的,而且是阻塞式的。一堆請求占用數(shù)據(jù)庫連接阻塞在這里,如果出現(xiàn)資源耗盡的情況就比較嚴重了。
- 不用事務?這個如果遇到2中的場景也一樣的。
那么加鎖呢?
- 單機環(huán)境下我們可以選擇Synchronized或Lock來進行處理。眾所周知,JDK1.6之后就對Synchronized進行了改進,不再是單純的阻塞,而是先進行自旋處理,在一定程度上也達到了自旋節(jié)省資源的效果。但是Synchronized或Lock還是要根據(jù)實際情況來進行處理的。如果我們?yōu)榱耸∈露褂肧ynchronized對事務代碼進行加鎖的話,首先我們要保證避免長事務的出現(xiàn),否則響應超時了,而事務還沒有釋放,那就比較嚴重了,異常情況堪比鎖表。
- 分布式環(huán)境下我們可以依賴Redis或Zookeeper來實現(xiàn)分布式鎖。這里需要注意的是,如果要依賴Redis實現(xiàn)的話,盡可能保證Redis采用單實例或分片集群的方式進行部署。主從的部署方式在某種極端情況下出現(xiàn)節(jié)點宕機時會導致誤判的情況。畢竟Redis是AP性質(zhì)的。
- 還可以通過數(shù)據(jù)庫來實現(xiàn),比如通過select for update來實現(xiàn)行鎖、通過version字段實現(xiàn)樂觀鎖、添加唯一約束的方式。首先select for update實現(xiàn)行鎖和上面的串行化事務差別不大,都是數(shù)據(jù)庫連接的阻塞,不建議使用。而樂觀鎖和唯一約束的方案更適用于作為一個保底方案,否則人家并發(fā)請求的時候只有一個請求能成功,其他的都失敗。這樣的用戶體驗也不好。
最后我們能得出一個結論。是否進行并發(fā)控制要依據(jù)該并發(fā)操作是否會造成數(shù)據(jù)安全問題來決定的。好了,下面向大家分享一些在學習工作中對于并發(fā)問題的處理思路
由于請求重試導致的并發(fā)安全問題
在與第三方系統(tǒng)交互或者微服務內(nèi)部跨模塊交互時,我們通常會采用HTTP或RPC等方式,并設置最大請求時間以及重試次數(shù)。因為我們絕對不允許因為下游服務的異常問題而拖累當前服務的正常運行。而通常情況下,最大請求時間也是根據(jù)兩個服務之間的實際業(yè)務以及下游接口進行多次測試而設定的,一般來說不會隨便的出現(xiàn)請求超時的情況。但是一旦下游業(yè)務的接口因為某種原因(比如網(wǎng)絡卡頓或者出現(xiàn)效率問題)導致請求超時的情況,就很有可能因為上游服務的重試而導致下游服務數(shù)據(jù)重復的問題。
這種情況從本質(zhì)上來說也就是個重復消費的問題。我們只需要雙方配合做好冪等就好了。
1.首先,如果涉及到前端,比如說點擊前端的按鈕觸發(fā)業(yè)務并且調(diào)用下游服務的業(yè)務。這個時候既要考慮前端重復提交也要考慮后端的重復發(fā)送以及重復消費問題。前端最常用的方式就是做一個進度條或進行防抖處理,避免一個用戶頻繁點擊按鈕。
那么如果是多個用戶同時提交同一條數(shù)據(jù)呢?這個情況主要是在B端業(yè)務中出現(xiàn),比如說多個用戶均具有這條數(shù)據(jù)的修改權限,此時也并發(fā)點擊按鈕提交了這條數(shù)據(jù)。一般來說,這種情況出現(xiàn)的概率還是極少數(shù)的,也不會有多少并發(fā)量。因此我們直接采用數(shù)據(jù)庫的樂觀鎖進行保底控制就好了,只允許一個人操作成功,其他人操作失敗并提示該數(shù)據(jù)已被修改。
/** * @param id 數(shù)據(jù)ID * @param status 數(shù)據(jù)的狀態(tài) */ public void update(Long id, Integer status) { // 1.根據(jù)ID查詢數(shù)據(jù) PO po = dao.select(id); // 2.判斷數(shù)據(jù)的狀態(tài)是否符合修改要求(這一步主要是應對兩個線程都進入Controller層,其中線程1剛好提交事務后,線程2開始事務的情況) if(!status.equals(po.getStatus())) { throw new TJCException("數(shù)據(jù)已被修改,請刷新后重試"); } // 3.修改數(shù)據(jù)(啟用樂觀鎖機制,主要應對線程1提交事務之前線程2開啟事務的情況) int i = dao.update("update table set xxx = ?, version = version + 1 where id = ? and version > ?"); if(i == 0) { throw new TJCException("數(shù)據(jù)已被修改,請刷新后重試"); } // 繼續(xù)執(zhí)行下面業(yè)務 }
2.上游服務請求下游服務時,在請求頭或消息中添加消息唯一ID。下游服務第一次接收到這個消息后首先將消息保存在緩存中并根據(jù)測試結果設置合理的有效期(有效期盡可能比正常請求時間長個一兩分鐘就好)。這樣就可以攔截上述所說的重試導致的重復消費問題。
// 上游服務發(fā)送消息 public void request() { String messageId = "xxxx"; rpc.request(messageId, message); } // 下游服務消費消息 public void consume(String messageId, String message) { // 將messageId存儲在redis中, 單機環(huán)境也可以直接找個map去存或者存在Guava中 Boolean flag = stringRedisTemplate.opsForValue() .setIfAbsent(messageId, "1", 60, TimeUnit.SECONDS); if(!flag) { log.error("重復消息攔截"); return; } // 繼續(xù)執(zhí)行下面業(yè)務 ..... // 事務完成后(提交/回滾),刪除標識 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCompletion(int status) { stringRedisTemplate.delete(messageId); } }); }
在這里是否有小伙伴會有這樣的一個疑問,如果重復發(fā)送的消息中messageId不一致或者上游服務接口本身就被調(diào)用了多次怎么辦?
(1)首先,我覺得在上游服務接口本身就被調(diào)用了多次的情況下,第一點中的第2步驟(判斷數(shù)據(jù)狀態(tài))這種方式就可以把它攔截掉。
(2)其次,如果出現(xiàn)重復發(fā)送的消息中messageId不一致的情況,我認為這就屬于程序員問題了,可以不放在這里進行考慮。如果硬要考慮的話,貌似也沒什么更好的辦法,那就加鎖吧。
順序遞增訂單號問題
在開頭我們通過引用這個生成訂單號的例子分析了一些什么情況下需要進行并發(fā)處理問題,并且上面是采用加鎖方式處理的。那么是否還有其他的方式比加鎖更好一些呢?比較加鎖影響吞吐量呀,哈哈。非必要情況下,我是不會進行加鎖處理的,除非在定制開發(fā)的過程中,用戶的要求是能用就行,那就可以偷懶了哈哈,節(jié)省時間去摸魚?。。。?/p>
下面給大家分享一些我常用的一種方式:Redis+Lua。我們都知道操作內(nèi)存肯定是比操作數(shù)據(jù)庫要更快一些的,那么我們可以干脆將各個單據(jù)的序列號添加到Redis中。并且訂單號是根據(jù)年月日來進行重置的,所以我們可以將序列號的過期時間設置為24小時。
偽代碼如下:
// 序列號的key可以設置為(模塊名:orderIndex:訂單類型:yyyyMMdd) String dateFormat = getCurrentDateFormat("yyyyMMdd"); // key String key = 模塊名 + ":" + orderIndex + ":" + 訂單類型 + ":" + dateFormat; String script = "if (redis.call('exists', KEYS[1]) == 0) then redis.call('setex', KEYS[1], ARGV[1], ARGV[2]) return 1 else return redis.call('incr', KEYS[1]) end"; DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(Long.class); defaultRedisScript.setScriptText(script); long count = stringRedisTemplate.execute(defaultRedisScript, Arrays.asList(key), (3600 * 24) + "", "1");
我們都清楚,Redis多指令執(zhí)行是沒辦法保證原子性的。所以我們要借助Lua腳本將多個Redis執(zhí)行以腳本的方式執(zhí)行來保證多指令執(zhí)行的原子性,再配合Redis基于內(nèi)存以及單線程執(zhí)行指令的優(yōu)勢,可以代替鎖來賦予功能更大的吞吐量。
計數(shù)統(tǒng)計問題
在工作中我還做過這樣一個需求。首先通過消息隊列接收、主動拉取數(shù)據(jù)源的方式獲取用戶在實際業(yè)務中產(chǎn)生的源數(shù)據(jù)并根據(jù)設置的規(guī)則比對校驗生成符合條件的數(shù)據(jù)保存在數(shù)據(jù)庫中。并且對通過各個維度對生成的數(shù)據(jù)進行計數(shù)統(tǒng)計并推送下游單據(jù)。
比如說其中有一個統(tǒng)計維度為“在各個班的工作時間內(nèi),根據(jù)次數(shù)統(tǒng)計符合條件的數(shù)據(jù)并匯總推送下游單據(jù)”。那么要做這項業(yè)務,首先我們要對各個班的數(shù)據(jù)進行分別計數(shù),當前班開始工作時同步開啟計數(shù),結束工作時停止計數(shù),當計數(shù)器達到設置的標準后,將這些數(shù)據(jù)進行統(tǒng)計處理后推送下游單據(jù)。
根據(jù)上面的業(yè)務,通常來說有兩種方式解決:
- 將班、計數(shù)量、數(shù)據(jù)ID等數(shù)據(jù)存儲在數(shù)據(jù)庫中,并對獲取數(shù)據(jù)、處理數(shù)據(jù)、計數(shù)、推送下游單據(jù)等操作統(tǒng)一加鎖進行處理,保證數(shù)據(jù)計數(shù)的準確性。
- 依然是通過Redis+Lua的方式進行處理。
最后通過實際的業(yè)務分析決定采用Redis+Lua的方式進行處理。只不過這次的Lua要寫相對復雜的業(yè)務了。
偽代碼如下:
/** * @param indexStdId 標準ID * @param currentTeamClassId 班ID * @param dataId 數(shù)據(jù)ID * @param count 計數(shù)要求 */ public List<Long> countMonitor(Long indexStdId, Long currentTeamClassId, Long dataId, Integer count) { StringBuilder countMonitorLua = new StringBuilder(); countMonitorLua.append("if (redis.call('hget', KEYS[1], KEYS[2]) == ARGV[2]) "); countMonitorLua.append("then "); countMonitorLua.append(" if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[3]) "); countMonitorLua.append(" then "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 0) "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" local list = redis.call('lrange', KEYS[4], 0, -1) "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" return list "); countMonitorLua.append(" else "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" redis.call('hincrby', KEYS[1], KEYS[3], 1) "); countMonitorLua.append(" return {} "); countMonitorLua.append(" end "); countMonitorLua.append("else "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" redis.call('lpush', KEYS[4], ARGV[1]) "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 1) "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[2], ARGV[2]) "); countMonitorLua.append(" if (redis.call('hget', KEYS[1], KEYS[3]) == ARGV[4]) "); countMonitorLua.append(" then "); countMonitorLua.append(" redis.call('hset', KEYS[1], KEYS[3], 0) "); countMonitorLua.append(" local list2 = redis.call('lrange', KEYS[4], 0, -1) "); countMonitorLua.append(" redis.call('del', KEYS[4]) "); countMonitorLua.append(" return list2 "); countMonitorLua.append(" else "); countMonitorLua.append(" return {} "); countMonitorLua.append(" end "); countMonitorLua.append("end "); DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>(); defaultRedisScript.setResultType(List.class); defaultRedisScript.setScriptText(countMonitorLua.toString()); List<String> keys = new ArrayList<>(); keys.add(COUNTMONITOR_HASH.replace("${indexStd}", indexStdId.toString())); keys.add(COUNTMONITOR_HASH_CURRENTTEAMCLASSID); keys.add(COUNTMONITOR_HASH_COUNT); keys.add(COUNTMONITOR_LIST.replace("${indexStd}", indexStdId.toString())); List dataIdList = stringRedisTemplate.execute(defaultRedisScript, keys, gapDataId.toString(), currentTeamClassId.toString(), (count - 1) + "", count + ""); List<Long> collect = null; if(!gapDataIdList.isEmpty()) { collect = (List<Long>) gapDataIdList.stream().map(o -> Long.valueOf(o.toString())).collect(Collectors.toList()); } return collect; }
以上代碼是根據(jù)我實際的業(yè)務代碼改編成的偽代碼,這個段代碼沒必要看懂哈,首先是偽代碼,其實這個業(yè)務比較復雜,我也沒寫注釋。更多的還是分享一下優(yōu)化的處理思路:
首先計數(shù)量是由客戶定的,可以設置的很小也可以設置的很大。由于這一點考慮,我將計數(shù)分成的兩部分,一個是String類型的key做計數(shù)器,一個是List類型的key用來記錄正在被計數(shù)的數(shù)據(jù)ID。這個List有可能是一個大key。所以我們不會去頻繁的讀取它的數(shù)量進行判斷,而是通過讀取這個String類型的計數(shù)器來校驗計數(shù)。當計數(shù)符合條件后就將List取出來。這樣做的好處是節(jié)省了頻繁讀取大key的耗時(實際上Redis讀取大Key是非常耗時的,我們在實際開發(fā)中要時刻注意這一點)。
總結
總體來說,優(yōu)化并發(fā)問題本質(zhì)上就是通過優(yōu)化各種請求的耗時(例如事務的耗時、數(shù)據(jù)庫連接的耗時、http/rpc的耗時)來提升功能的吞吐量,達到用最少的資源浪費處理更多的事情。
我處理并發(fā)問題的思路總體上也就是通過同步鎖、數(shù)據(jù)庫鎖以及唯一約束、Redis單線程的天然優(yōu)勢這三點上進行綜合考慮,選擇中更適合業(yè)務場景的一種處理方式。實際上退一萬步說,對于一些B端的業(yè)務,用戶的需求只是能用就行,那我們做定制開發(fā)的小伙伴們就直接一個鎖就解決問題了,這樣何樂而不為呢?還能節(jié)省出更多的摸魚時間!哈哈?。?!
但對于做通用產(chǎn)品來說,還是要盡可能的考慮更大的吞吐量。有的小伙伴可能有有疑問,Redis通常的使用規(guī)范不是只允許存放那些查詢頻率非常高的熱點數(shù)據(jù)嗎?嗯,那是對于大多數(shù)C端互聯(lián)網(wǎng)項目而言的。而B端項目普遍業(yè)務要更加的復雜,而在這個基礎上我們要想追求更大的吞吐量,其實用一用Redis也未嘗不可哈。畢竟B端的QPS相比于C端來說要根本不在一個數(shù)量級。就算是偶然出現(xiàn)幾個大Key,能有什么關系呢,只要我們設計的嚴謹一點,能夠把控整體的資源就好啦。
以上就是Java中對于并發(fā)問題的處理思路分享的詳細內(nèi)容,更多關于Java處理并發(fā)問題的資料請關注腳本之家其它相關文章!
相關文章
Java8 自定義CompletableFuture的原理解析
這篇文章主要介紹了Java8 自定義CompletableFuture的原理解析,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11解析springBoot-actuator項目構造中health端點工作原理
這篇文章主要介紹了springBoot-actuator中health端點工作原理,對spring-boot-actuator的項目構造,工作原理進行了全面的梳理,側重health健康檢查部分2022-02-02解決spring cloud gateway 獲取body內(nèi)容并修改的問題
這篇文章主要介紹了解決spring cloud gateway 獲取body內(nèi)容并修改的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12