spring異步service中處理線程數(shù)限制詳解
情況簡介
spring項目,controller異步調(diào)用service的方法,產(chǎn)生大量并發(fā)。
具體業(yè)務(wù):
前臺同時傳入大量待翻譯的單詞,后臺業(yè)務(wù)接收單詞,并調(diào)用百度翻譯接口翻譯接收單詞并將翻譯結(jié)果保存到數(shù)據(jù)庫,前臺不需要實時返回翻譯結(jié)果。
處理方式:
controller接收文本調(diào)用service中的異步方法,將單詞先保存到隊列中,再啟動2個新線程,從緩存隊列中取單詞,并調(diào)用百度翻譯接口獲取翻譯結(jié)果并將翻譯結(jié)果保存到數(shù)據(jù)庫。
本文主要知識點:
多線程同時(異步)調(diào)用方法后,開啟新線程,并限制線程數(shù)量。
代碼如下:
@Service
public class LgtsAsyncServiceImpl {
/** logger日志. */
public static final Logger LOGGER = Logger.getLogger(LgtsAsyncServiceImpl2.class);
private final BlockingQueue<Lgts> que = new LinkedBlockingQueue<>();// 待翻譯的隊列
private final AtomicInteger threadCnt = new AtomicInteger(0);// 當(dāng)前翻譯中的線程數(shù)
private final Vector<String> existsKey = new Vector<>();// 保存已入隊列的數(shù)據(jù)
private final int maxThreadCnt = 2;// 允許同時執(zhí)行的翻譯線程數(shù)
private static final int NUM_OF_EVERY_TIME = 50;// 每次提交的翻譯條數(shù)
private static final String translationFrom = "zh";
@Async
public void saveAsync(Lgts t) {
if (Objects.isNull(t) || StringUtils.isAnyBlank(t.getGco(), t.getCode())) {
return;
}
offer(t);
save();
return;
}
private boolean offer(Lgts t) {
String key = t.getGco() + "-" + t.getCode();
if (!existsKey.contains(key)) {
existsKey.add(key);
boolean result = que.offer(t);
// LOGGER.trace("待翻譯文字[" + t.getGco() + ":" + t.getCode() + "]加入隊列結(jié)果[" + result
// + "],隊列中數(shù)據(jù)總個數(shù):" + que.size());
return result;
}
return false;
}
@Autowired
private LgtsService lgtsService;
private void save() {
int cnt = threadCnt.incrementAndGet();// 當(dāng)前線程數(shù)+1
if (cnt > maxThreadCnt) {
// 已啟動的線程大于設(shè)置的最大線程數(shù)直接丟棄
threadCnt.decrementAndGet();// +1的線程數(shù)再-回去
return;
}
GwallUser user = UserUtils.getUser();
Thread thr = new Thread() {
public void run() {
long sleepTime = 30000l;
UserUtils.setUser(user);
boolean continueFlag = true;
int maxContinueCnt = 5;// 最大連續(xù)休眠次數(shù),連續(xù)休眠次數(shù)超過最大休眠次數(shù)后,while循環(huán)退出,當(dāng)前線程銷毀
int continueCnt = 0;// 連續(xù)休眠次數(shù)
while (continueFlag) {// 隊列不為空時執(zhí)行
if (Objects.isNull(que.peek())) {
try {
if (continueCnt > maxContinueCnt) {
// 連續(xù)休眠次數(shù)達到最大連續(xù)休眠次數(shù),當(dāng)前線程將銷毀。
continueFlag = false;
continue;
}
// 隊列為空,準(zhǔn)備休眠
Thread.sleep(sleepTime);
continueCnt++;
continue;
} catch (InterruptedException e) {
// 休眠失敗,無需處理
e.printStackTrace();
}
}
continueCnt = 0;// 重置連續(xù)休眠次數(shù)為0
List<Lgts> params = new ArrayList<>();
int totalCnt = que.size();
que.drainTo(params, NUM_OF_EVERY_TIME);
StringBuilder utf8q = new StringBuilder();
String code = "";
List<Lgts> needRemove = new ArrayList<>();
for (Lgts lgts : params) {
if (StringUtils.isAnyBlank(code)) {
code = lgts.getCode();
}
// 移除existsKey中保存的key,以免下面翻譯失敗時再次加入隊列時,加入不進去
String key = lgts.getGco() + "-" + lgts.getCode();
existsKey.remove(key);
if (!code.equalsIgnoreCase(lgts.getCode())) {// 要翻譯的目標(biāo)語言與當(dāng)前列表中的第一個不一致
offer(lgts);// 重新將待翻譯的語言放回隊列
needRemove.add(lgts);
continue;
}
utf8q.append(lgts.getGco()).append("\n");
}
params.removeAll(needRemove);
LOGGER.debug("隊列中共" + totalCnt + " 個,獲取" + params.size() + " 個符合條件的待翻譯內(nèi)容,編碼:" + code);
String to = "en";
if (StringUtils.isAnyBlank(utf8q, to)) {
LOGGER.warn("調(diào)用翻譯出錯,未找到[" + code + "]對應(yīng)的百度編碼。");
continue;
}
Map<String, String> result = getBaiduTranslation(utf8q.toString(), translationFrom, to);
if (Objects.isNull(result) || result.isEmpty()) {// 把沒有獲取到翻譯結(jié)果的重新放回隊列
for (Lgts lgts : params) {
offer(lgts);
}
LOGGER.debug("本次翻譯結(jié)果為空。");
continue;
}
int sucessCnt = 0, ignoreCnt = 0;
for (Lgts lgts : params) {
lgts.setBdcode(to);
String gna = result.get(lgts.getGco());
if (StringUtils.isAnyBlank(gna)) {
offer(lgts);// 重新將待翻譯的語言放回隊列
continue;
}
lgts.setStat(1);
lgts.setGna(gna);
int saveResult = lgtsService.saveIgnore(lgts);
if (0 == saveResult) {
ignoreCnt++;
} else {
sucessCnt++;
}
}
LOGGER.debug("待翻譯個數(shù):" + params.size() + ",翻譯成功個數(shù):" + sucessCnt + ",已存在并忽略個數(shù):" + ignoreCnt);
}
threadCnt.decrementAndGet();// 運行中的線程數(shù)-1
distory();// 清理數(shù)據(jù),必須放在方法最后,否則distory中的判斷需要修改
}
/**
* 如果是最后一個線程,清空隊列和existsKey中的數(shù)據(jù)
*/
private void distory() {
if (0 == threadCnt.get()) {
// 最后一個線程退出時,執(zhí)行清理操作
existsKey.clear();
que.clear();
}
}
};
thr.setDaemon(true);// 守護線程,如果主線程執(zhí)行完畢,則此線程會自動銷毀
thr.setName("baidufanyi-" + RandomUtils.nextInt(1000, 9999));
thr.start();// 啟動插入線程
}
/**
* 百度翻譯
*
* @param utf8q
* 待翻譯的字符串,需要utf8格式的
* @param from
* 百度翻譯語言列表中的代碼
* 參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
* @param to
* 百度翻譯語言列表中的代碼
* 參見:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList
* @return 翻譯結(jié)果
*/
private Map<String, String> getBaiduTranslation(String utf8q, String from, String to) {
Map<String, String> result = new HashMap<>();
String baiduurlStr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
if (StringUtils.isAnyBlank(baiduurlStr)) {
LOGGER.warn("百度翻譯API接口URL相關(guān)參數(shù)為空!");
return result;
}
Map<String, String> params = buildParams(utf8q, from, to);
if (params.isEmpty()) {
return result;
}
String sendUrl = getUrlWithQueryString(baiduurlStr, params);
try {
HttpClient httpClient = new HttpClient();
httpClient.setMethod("GET");
String remoteResult = httpClient.pub(sendUrl, "");
result = convertRemote(remoteResult);
} catch (Exception e) {
LOGGER.info("百度翻譯API返回結(jié)果異常!", e);
}
return result;
}
private Map<String, String> convertRemote(String remoteResult) {
Map<String, String> result = new HashMap<>();
if (StringUtils.isBlank(remoteResult)) {
return result;
}
JSONObject jsonObject = JSONObject.parseObject(remoteResult);
JSONArray trans_result = jsonObject.getJSONArray("trans_result");
if (Objects.isNull(trans_result) || trans_result.isEmpty()) {
return result;
}
for (Object object : trans_result) {
JSONObject trans = (JSONObject) object;
result.put(trans.getString("src"), trans.getString("dst"));
}
return result;
}
private Map<String, String> buildParams(String utf8q, String from, String to) {
if (StringUtils.isBlank(from)) {
from = "auto";
}
Map<String, String> params = new HashMap<String, String>();
String skStr = "sk";
String appidStr = "appid";
if (StringUtils.isAnyBlank(skStr, appidStr)) {
LOGGER.warn("百度翻譯API接口相關(guān)參數(shù)為空!");
return params;
}
params.put("q", utf8q);
params.put("from", from);
params.put("to", to);
params.put("appid", appidStr);
// 隨機數(shù)
String salt = String.valueOf(System.currentTimeMillis());
params.put("salt", salt);
// 簽名
String src = appidStr + utf8q + salt + skStr; // 加密前的原文
params.put("sign", MD5Util.md5Encrypt(src).toLowerCase());
return params;
}
public static String getUrlWithQueryString(String url, Map<String, String> params) {
if (params == null) {
return url;
}
StringBuilder builder = new StringBuilder(url);
if (url.contains("?")) {
builder.append("&");
} else {
builder.append("?");
}
int i = 0;
for (String key : params.keySet()) {
String value = params.get(key);
if (value == null) { // 過濾空的key
continue;
}
if (i != 0) {
builder.append('&');
}
builder.append(key);
builder.append('=');
builder.append(encode(value));
i++;
}
return builder.toString();
}
/**
* 對輸入的字符串進行URL編碼, 即轉(zhuǎn)換為%20這種形式
*
* @param input
* 原文
* @return URL編碼. 如果編碼失敗, 則返回原文
*/
public static String encode(String input) {
if (input == null) {
return "";
}
try {
return URLEncoder.encode(input, "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return input;
}
}
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,謝謝大家對腳本之家的支持。
相關(guān)文章
springboot連接redis并動態(tài)切換database的實現(xiàn)方法
這篇文章主要介紹了springboot連接redis并動態(tài)切換database,本文主為通過修改ConnectionFactory從而達到動態(tài)切換database的效果,結(jié)合示例代碼給大家介紹的非常詳細,需要的朋友可以參考下2022-03-03
java拋出異常后,后續(xù)代碼是否繼續(xù)執(zhí)行詳解
這篇文章主要給大家介紹了關(guān)于java拋出異常后,后續(xù)代碼是否繼續(xù)執(zhí)行詳?shù)南嚓P(guān)資料,在Java編程中,異常是當(dāng)程序執(zhí)行時遇到問題時拋出的一種特殊情況,需要的朋友可以參考下2023-07-07
Spring+SpringMVC+MyBatis整合詳細教程(SSM)
Spring是一個開源框架,Spring是于2003 年興起的一個輕量級的Java 開發(fā)框架。這篇文章主要介紹了Spring+SpringMVC+MyBatis整合詳細教程(SSM),需要的朋友可以參考下2017-10-10
SpringBoot整合MybatisPlus的基本應(yīng)用詳解
MyBatis-Plus (簡稱 MP)是一個 MyBatis的增強工具,在 MyBatis 的基礎(chǔ)上只做增強不做改變,為 簡化開發(fā)、提高效率而生,本文將給大家介紹一下SpringBoot整合MybatisPlus的基本應(yīng)用,需要的朋友可以參考下2024-05-05
springboot使用DynamicDataSource動態(tài)切換數(shù)據(jù)源的實現(xiàn)過程
這篇文章主要給大家介紹了關(guān)于springboot使用DynamicDataSource動態(tài)切換數(shù)據(jù)源的實現(xiàn)過程,Spring Boot應(yīng)用中可以配置多個數(shù)據(jù)源,并根據(jù)注解靈活指定當(dāng)前使用的數(shù)據(jù)源,需要的朋友可以參考下2023-08-08

