Java按時(shí)間梯度實(shí)現(xiàn)異步回調(diào)接口的方法
1. 背景
在業(yè)務(wù)處理完之后,需要調(diào)用其他系統(tǒng)的接口,將相應(yīng)的處理結(jié)果通知給對(duì)方,若是同步請(qǐng)求,假如調(diào)用的系統(tǒng)出現(xiàn)異?;蚴清礄C(jī)等事件,會(huì)導(dǎo)致自身業(yè)務(wù)受到影響,事務(wù)會(huì)一直阻塞,數(shù)據(jù)庫連接不夠用等異常現(xiàn)象,可以通過異步回調(diào)來防止阻塞,但異步的情況還存在一個(gè)問題,若調(diào)用一次不成功的話接下來怎么處理?這個(gè)地方就需要按時(shí)間梯度回調(diào),比如前期按10s間隔回調(diào),回調(diào)3次,若不成功按30s回調(diào),回調(diào)2次,再不成功按分鐘回調(diào),依次類推……相當(dāng)于給了對(duì)方系統(tǒng)恢復(fù)的時(shí)間,不可能一直處于異?;蝈礄C(jī)等異常狀態(tài),若是再不成功可以再通過人工干預(yù)的手段去處理了,具體業(yè)務(wù)具體實(shí)現(xiàn)。
2. 技術(shù)實(shí)現(xiàn)
大體實(shí)現(xiàn)思路如下圖,此過程用到兩個(gè)隊(duì)列,當(dāng)前隊(duì)列和Next隊(duì)列,當(dāng)前隊(duì)列用來存放第一次需要回調(diào)的數(shù)據(jù)對(duì)象,如果調(diào)用不成功則放入Next隊(duì)列,按照制定的時(shí)間策略再繼續(xù)回調(diào),直到成功或最終持久化后人工接入處理。
用到的技術(shù)如下:
•http請(qǐng)求庫,retrofit2
•隊(duì)列,LinkedBlockingQueue
•調(diào)度線程池,ScheduledExecutorService
3. 主要代碼說明
3.1 回調(diào)時(shí)間梯度的策略設(shè)計(jì)
采用枚舉來對(duì)策略規(guī)則進(jìn)行處理,便于代碼上的維護(hù),該枚舉設(shè)計(jì)三個(gè)參數(shù),級(jí)別、回調(diào)間隔、回調(diào)次數(shù);
/** * 回調(diào)策略 */ public enum CallbackType { //等級(jí)1,10s執(zhí)行3次 SECONDS_10(1, 10, 3), //等級(jí)2,30s執(zhí)行2次 SECONDS_30(2, 30, 2), //等級(jí)3,60s執(zhí)行2次 MINUTE_1(3, 60, 2), //等級(jí)4,5min執(zhí)行1次 MINUTE_5(4, 300, 1), //等級(jí)5,30min執(zhí)行1次 MINUTE_30(5, 30*60, 1), //等級(jí)6,1h執(zhí)行2次 HOUR_1(6, 60*60, 1), //等級(jí)7,3h執(zhí)行2次 HOUR_3(7, 60*60*3, 1), //等級(jí)8,6h執(zhí)行2次 HOUR_6(8, 60*60*6, 1); //級(jí)別 private int level; //回調(diào)間隔時(shí)間 秒 private int intervalTime; //回調(diào)次數(shù) private int count; }
3.2 數(shù)據(jù)傳輸對(duì)象設(shè)計(jì)
聲明抽象父類,便于其他對(duì)象調(diào)用傳輸繼承。
/** * 消息對(duì)象父類 */ public abstract class MessageInfo { //開始時(shí)間 private long startTime; //更新時(shí)間 private long updateTime; //是否回調(diào)成功 private boolean isSuccess=false; //回調(diào)次數(shù) private int count=0; //回調(diào)策略 private CallbackType callbackType; }
要傳輸?shù)膶?duì)象,繼承消息父類;
/** * 工單回調(diào)信息 */ public class WorkOrderMessage extends MessageInfo { //車架號(hào) private String vin; //工單號(hào) private String workorderno; //工單狀態(tài) private Integer status; //工單原因 private String reason; //操作用戶 private Integer userid; }
3.3 調(diào)度線程池的使用
//聲明線程池,大小為16 private ScheduledExecutorService pool = Executors.newScheduledThreadPool(16); ...略 while (true){ //從隊(duì)列獲取數(shù)據(jù),交給定時(shí)器執(zhí)行 try { WorkOrderMessage message = MessageQueue.getMessageFromNext(); long excueTime = message.getUpdateTime()+message.getCallbackType().getIntervalTime()* 1000; long t = excueTime - System.currentTimeMillis(); if (t/1000 < 5) {//5s之內(nèi)將要執(zhí)行的數(shù)據(jù)提交給調(diào)度線程池 System.out.println("MessageHandleNext-滿足定時(shí)器執(zhí)行條件"+JSONObject.toJSONString(message)); pool.schedule(new Callable<Boolean>() { @Override public Boolean call() throws Exception { remoteCallback(message); return true; } }, t, TimeUnit.MILLISECONDS); }else { MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { System.out.println(e); } }
3.4 retrofit2的使用,方便好用。
具體可查看官網(wǎng)相關(guān)文檔進(jìn)行了解,用起來還是比較方便的。http://square.github.io/retrofit/
retrofit初始化:
import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
如果需要修改超時(shí)時(shí)間,連接時(shí)間等可以這樣初始話,Retrofit采用OkHttpClient
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; import java.util.concurrent.TimeUnit; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .client(new OkHttpClient.Builder() .connectTimeout(30, TimeUnit.SECONDS)//連接時(shí)間 .readTimeout(30, TimeUnit.SECONDS)//讀時(shí)間 .writeTimeout(30, TimeUnit.SECONDS)//寫時(shí)間 .build()) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
Retrofit使用通過接口調(diào)用,要先聲明一個(gè)接口;
import com.alibaba.fastjson.JSONObject; import com.woasis.callbackdemo.bean.WorkOrderMessage; import retrofit2.Call; import retrofit2.http.Body; import retrofit2.http.POST; public interface WorkOrderMessageInterface { @POST("/api") Call<JSONObject> updateBatteryInfo(@Body WorkOrderMessage message); }
接口和實(shí)例對(duì)象準(zhǔn)備好了,接下來就是調(diào)用;
private void remoteCallback(WorkOrderMessage message){ //實(shí)例接口對(duì)象 WorkOrderMessageInterface workOrderMessageInterface = RetrofitHelper.instance().create(WorkOrderMessageInterface.class); //調(diào)用接口方法 Call<JSONObject> objectCall = workOrderMessageInterface.updateBatteryInfo(message); System.out.println("遠(yuǎn)程調(diào)用執(zhí)行:"+new Date()); //異步調(diào)用執(zhí)行 objectCall.enqueue(new Callback<JSONObject>() { @Override public void onResponse(Call<JSONObject> call, Response<JSONObject> response) { System.out.println("MessageHandleNext****調(diào)用成功"+Thread.currentThread().getId()); message.setSuccess(true); System.out.println("MessageHandleNext-回調(diào)成功"+JSONObject.toJSONString(message)); } @Override public void onFailure(Call<JSONObject> call, Throwable throwable) { System.out.println("MessageHandleNext++++調(diào)用失敗"+Thread.currentThread().getId()); //失敗后再將數(shù)據(jù)放入隊(duì)列 try { //對(duì)回調(diào)策略初始化 long currentTime = System.currentTimeMillis(); message.setUpdateTime(currentTime); message.setSuccess(false); CallbackType callbackType = message.getCallbackType(); //獲取等級(jí) int level = CallbackType.getLevel(callbackType); //獲取次數(shù) int count = CallbackType.getCount(callbackType); //如果等級(jí)已經(jīng)最高,則不再回調(diào) if (CallbackType.HOUR_6.getLevel() == callbackType.getLevel() && count == message.getCount()){ System.out.println("MessageHandleNext-等級(jí)最高,不再回調(diào), 線下處理:"+JSONObject.toJSONString(message)); }else { //看count是否最大,count次數(shù)最大則增加level if (message.getCount()<callbackType.getCount()){ message.setCount(message.getCount()+1); }else {//如果不小,則增加level message.setCount(1); level += 1; message.setCallbackType(CallbackType.getTypeByLevel(level)); } MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("MessageHandleNext-放入隊(duì)列數(shù)據(jù)失敗"); } } }); }
3.5結(jié)果實(shí)現(xiàn)
4.總結(jié)
本次實(shí)現(xiàn)了按照時(shí)間梯度去相應(yīng)其他系統(tǒng)的接口,不再導(dǎo)致本身業(yè)務(wù)因其他系統(tǒng)的異常而阻塞。
源碼:https://github.com/liuzwei/callback-demo
以上所述是小編給大家介紹的Java按時(shí)間梯度實(shí)現(xiàn)異步回調(diào)接口,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
相關(guān)文章
SpringCloud?微服務(wù)數(shù)據(jù)權(quán)限控制的實(shí)現(xiàn)
這篇文章主要介紹的是權(quán)限控制的數(shù)據(jù)權(quán)限層面,意思是控制可訪問數(shù)據(jù)資源的數(shù)量,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-11-11說一說java關(guān)鍵字final和transient
這篇文章主要和大家說一說java關(guān)鍵字final和transient,感興趣的小伙伴們可以參考一下2016-06-06SpringBoot統(tǒng)一響應(yīng)格式及統(tǒng)一異常處理
在我們開發(fā)SpringBoot后端服務(wù)時(shí),一般需要給前端統(tǒng)一響應(yīng)格式,本文主要介紹了SpringBoot統(tǒng)一響應(yīng)格式及統(tǒng)一異常處理2023-05-05淺談在Spring中如何使用數(shù)據(jù)源(DBCP、C3P0、JNDI)
這篇文章主要介紹了淺談在Spring中如何使用數(shù)據(jù)源(DBCP、C3P0、JNDI),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10Java工廠模式之簡(jiǎn)單工廠,工廠方法,抽象工廠模式詳解
這篇文章主要為大家詳細(xì)介紹了Java工廠模式之簡(jiǎn)單工廠、工廠方法、抽象工廠模式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-02-02springboot+mybatis+redis 二級(jí)緩存問題實(shí)例詳解
Mybatis默認(rèn)沒有開啟二級(jí)緩存,需要在全局配置(mybatis-config.xml)中開啟二級(jí)緩存。本文講述的是使用Redis作為緩存,與springboot、mybatis進(jìn)行集成的方法。需要的朋友參考下吧2017-12-12Spring?Security過濾器鏈體系的實(shí)例詳解
這篇文章主要介紹了Spring?Security過濾器鏈體系,通過思維導(dǎo)圖可以很好的幫助大家理解配置類的相關(guān)知識(shí),結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-02-02Spring mvc服務(wù)端數(shù)據(jù)校驗(yàn)實(shí)現(xiàn)流程詳解
這篇文章主要介紹了Spring mvc服務(wù)端數(shù)據(jù)校驗(yàn)實(shí)現(xiàn)流程詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09