java實現(xiàn)異步線程,回調接口方式
更新時間:2024年07月05日 09:20:05 作者:墨筆之風
這篇文章主要介紹了java實現(xiàn)異步線程,回調接口方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
最近在業(yè)余時間呢,無意間發(fā)現(xiàn)一個問題,使用異步線程推送回調數(shù)據(jù)
這里小編使用了兩個IDEA程序分別模擬接收方和發(fā)送方
發(fā)送方
package com.slg.util; import com.alibaba.fastjson.JSONObject; import com.google.gson.Gson; import com.slg.entity.dto.SettlementMergerResp; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.springframework.beans.factory.annotation.Autowired; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; /** * @author Administrator * 異步處理數(shù)據(jù)推送 */ @Slf4j public class CallbackUtil { // 推送地址(使用另一個IDEA模擬(http://localhost:8848/callback)這個客戶端) private static final String INTERFACE_CALLBACK = "http://localhost:8848/callback"; // private static final ExecutorService executorService = Executors.newFixedThreadPool(10); private static final AtomicInteger SUBMITTED_TASKS = new AtomicInteger(0); private static final ExecutorService executorService = CustomThreadPoolUtil.getExecutorService(); /** * @param object * @Description 構建異步線程,回調接口 * @Date 2024/5/29 * 等待線程池關閉完成后再提交任務 shutdown * 如果你想立即關閉線程池 hutdownNow() * 確保不會在關閉線程池后提交任務 isShutdown **/ public static void getCallBackThread(Object object) { log.info("回調接口=======================>正在進行數(shù)據(jù)推送:{}", object); if (!executorService.isShutdown()) { log.info("準備推送數(shù)據(jù): {}", object); SUBMITTED_TASKS.incrementAndGet(); executorService.submit(() -> { try { CallbackUtil.sendCallback(object); log.info("數(shù)據(jù)成功推送給用戶!"); } catch (Exception e) { log.error("推送數(shù)據(jù)時出現(xiàn)異常: {}", e.getMessage()); } finally { SUBMITTED_TASKS.decrementAndGet(); if (SUBMITTED_TASKS.get() == 0) { log.info("關閉線程池"); executorService.shutdown(); log.info("已關閉線程池"); } } }); SUBMITTED_TASKS.incrementAndGet(); } log.info("回調接口=======================>正在進行數(shù)據(jù)推送完畢:{}", object); } /** * @param object (測試對象=>可自行模擬) * @Description 數(shù)據(jù)推送地址 * @Date 2024/5/29 **/ public static void sendCallback(Object object) throws Exception { URL url = new URL(INTERFACE_CALLBACK); HttpURLConnection connection = (HttpURLConnection) url.openConnection(); connection.setRequestMethod("POST"); connection.setRequestProperty("Content-Type", "application/json"); connection.setDoOutput(true); String jsonPayload = convertToJson(object); try (OutputStream outputStream = connection.getOutputStream()) { outputStream.write(jsonPayload.getBytes()); outputStream.flush(); } int responseCode = connection.getResponseCode(); if (responseCode == HttpURLConnection.HTTP_OK) { // TODO 請求成功,可以根據(jù)需要進行進一步處理 } else { // TODO 請求失敗,可以根據(jù)需要進行錯誤處理 } connection.disconnect(); } private static String convertToJson(Object object) { return new Gson().toJson(object); } }
接收方
package com.example.demo.controller; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; import lombok.Data; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.util.concurrent.*; /** * @author Administrator * 手動創(chuàng)建線程池 */ @Data public class CustomThreadPoolUtil { public static void main(String[] args) throws IOException { int port = 8848; HttpServer server = HttpServer.create(new InetSocketAddress(port), 0); server.createContext("/callback", (HttpHandler) new CallbackHandler()); server.setExecutor(null); server.start(); System.out.println("服務器已啟動,監(jiān)聽端口:" + port); } static class CallbackHandler implements HttpHandler { @Override public void handle(HttpExchange exchange) throws IOException { String requestMethod = exchange.getRequestMethod(); if (requestMethod.equalsIgnoreCase("POST")) { InputStream requestBody = exchange.getRequestBody(); byte[] buffer = new byte[requestBody.available()]; requestBody.read(buffer); String requestData = new String(buffer); System.out.println("接收到的數(shù)據(jù):" + requestData); String response = "數(shù)據(jù)已接收"; exchange.sendResponseHeaders(200, response.getBytes().length); OutputStream outputStream = exchange.getResponseBody(); outputStream.write(response.getBytes()); outputStream.close(); } else { exchange.sendResponseHeaders(405, -1); } } } }
自定義線程池
package com.slg.util; import lombok.Data; import java.util.concurrent.*; /** * @author Administrator * 手動創(chuàng)建線程池 */ @Data public class CustomThreadPoolUtil { // 線程池大小 private static final int CORE_POOL_SIZE = 10; private static final int MAXIMUM_POOL_SIZE = 20; private static final long KEEP_ALIVE_TIME = 60L; private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingQueue<>(); // 自定義線程工廠 private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { private int count = 0; @Override public Thread newThread(Runnable r) { return new Thread(r, "CustomThreadPool-" + count++); } }; // 創(chuàng)建線程池 private static final ExecutorService executorService = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, WORK_QUEUE, THREAD_FACTORY ); public static ExecutorService getExecutorService() { return executorService; } }
測試效果:
總結
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
PowerJob的WorkerHealthReporter工作流程源碼解讀
這篇文章主要為大家介紹了PowerJob的WorkerHealthReporter工作流程源碼解讀,2023-12-12詳解Java中的do...while循環(huán)語句的使用方法
這篇文章主要介紹了Java中的do...while循環(huán)語句的使用方法,是Java入門學習中的基礎知識,需要的朋友可以參考下2015-10-10