java通過(guò)RESTful API實(shí)現(xiàn)兩個(gè)項(xiàng)目之間相互傳輸數(shù)據(jù)
項(xiàng)目場(chǎng)景
一些特殊場(chǎng)景中,兩個(gè)項(xiàng)目發(fā)布在不同的服務(wù)器,并且由于服務(wù)器限制特殊情況ip無(wú)法相通時(shí)進(jìn)行開(kāi)放接口方式進(jìn)行數(shù)據(jù)傳輸
問(wèn)題描述
兩個(gè)服務(wù)器之間ip無(wú)法相互訪問(wèn),數(shù)據(jù)沒(méi)法進(jìn)行數(shù)據(jù)傳輸
解決方案
通過(guò)RESTful API方式定義開(kāi)放接口實(shí)現(xiàn)數(shù)據(jù)傳輸
1.開(kāi)放接口定義
@RequestMapping(value = "/getss", produces = "application/json;charset=UTF-8")
public void getTestChunked(HttpServletResponse response) {
log.info("getTest分塊傳輸接口調(diào)用");
long startTime = System.currentTimeMillis();
OutputStream outputStream = null;
JsonGenerator generator = null;
// 調(diào)用 GetTime 類(lèi)的 getFSKTime 方法
GetTime getTime = new GetTime();
HashMap<Object, Object> timeData = getTime.getFSKTime();
try {
// 獲取查詢參數(shù)
String formattedDate = (String) timeData.get("formattedDate");
String formattedEnd = (String) timeData.get("formattedEnd");
log.info("查詢時(shí)間范圍:{} ~ {}", formattedDate, formattedEnd);
// 獲取所有數(shù)據(jù)
DataSourceUtil.setDB("db2");
List<HbCcsPolicyDataRc> dataList = synchronizationService.getSynchronization(formattedDate, formattedEnd);
if (dataList == null) {
dataList = Collections.emptyList();
}
log.info("查詢完成:共{}條數(shù)據(jù),準(zhǔn)備分塊傳輸", dataList.size());
// 設(shè)置響應(yīng)頭
response.setContentType("application/json;charset=UTF-8");
response.setHeader("Connection", "keep-alive");
response.setHeader("Transfer-Encoding", "chunked");
response.setHeader("X-Total-Count", String.valueOf(dataList.size()));
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Pragma", "no-cache");
// 使用Jackson流式API分塊寫(xiě)入響應(yīng)
ObjectMapper objectMapper = new ObjectMapper();
outputStream = response.getOutputStream();
generator = objectMapper.getFactory().createGenerator(outputStream);
generator.writeStartArray(); // 開(kāi)始數(shù)組
int chunkSize = 500; // 減小每塊的大小,避免緩沖區(qū)溢出
int totalSize = dataList.size();
for (int i = 0; i < totalSize; i++) {
// 檢查客戶端是否仍然連接
try {
response.getOutputStream(); // 這將拋出異常如果客戶端斷開(kāi)
} catch (IOException e) {
log.warn("客戶端已斷開(kāi)連接,終止傳輸");
break;
}
// 寫(xiě)入單個(gè)對(duì)象
objectMapper.writeValue(generator, dataList.get(i));
// 每chunkSize條數(shù)據(jù)刷新一次緩沖區(qū)
if ((i + 1) % chunkSize == 0) {
generator.flush();
log.info("已傳輸{}條數(shù)據(jù),進(jìn)度: {}%", i + 1, (i + 1) * 100 / totalSize);
// 添加小延遲,避免 overwhelming 客戶端
try {
Thread.sleep(10);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
generator.writeEndArray(); // 結(jié)束數(shù)組
generator.flush();
log.info("分塊傳輸完成:共{}條數(shù)據(jù),耗時(shí){}ms",
dataList.size(), System.currentTimeMillis() - startTime);
} catch (ClientAbortException e) {
log.warn("客戶端中止了連接: {}", e.getMessage());
} catch (Exception e) {
log.error("getTest分塊傳輸接口執(zhí)行失敗", e);
if (!response.isCommitted()) {
try {
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), "數(shù)據(jù)獲取失敗");
} catch (IOException ex) {
log.error("發(fā)送錯(cuò)誤響應(yīng)失敗", ex);
}
}
} finally {
// 確保資源被正確關(guān)閉
try {
if (generator != null) {
generator.close();
}
} catch (IOException e) {
log.warn("關(guān)閉JsonGenerator時(shí)發(fā)生錯(cuò)誤", e);
}
try {
if (outputStream != null) {
outputStream.close();
}
} catch (IOException e) {
log.warn("關(guān)閉OutputStream時(shí)發(fā)生錯(cuò)誤", e);
}
// 幫助GC回收大對(duì)象
data1 = null;
System.gc();
}
}2.開(kāi)放接口調(diào)用
public void fetchDataFromServer1() {
log.info("===== 開(kāi)始執(zhí)行 fetchDataFromServer1(分塊傳輸模式)=====");
RestTemplate chunkedRestTemplate = createChunkedRestTemplate();
String url = server1BaseUrl + "/getReceive/getsss";
log.info("調(diào)用 getTestChunked 分塊傳輸接口,URL: {}", url);
try {
// 使用自定義的ResponseExtractor來(lái)處理流式響應(yīng)
List<HbCcsPolicyDataRc> dataList = chunkedRestTemplate.execute(
url,
HttpMethod.GET,
null,
new ResponseExtractor<List<HbCcsPolicyDataRc>>() {
@Override
public List<HbCcsPolicyDataRc> extractData(ClientHttpResponse response) throws IOException {
return processStreamingResponse(response, HbCcsPolicyDataRc.class);
}
}
);
log.info("===== fetchDataFromServer1 完成,共處理{}條數(shù)據(jù) =====", dataList != null ? dataList.size() : 0);
getCCSCroData();
} catch (Exception e) {
log.error("fetchDataFromServer1 整體失敗", e);
throw new RuntimeException("fetchDataFromServer1 失?。? + e.getMessage(), e);
} finally {
// 幫助GC回收資源
System.gc();
}
}// 修改流式響應(yīng)處理方法
private <T> List<T> processStreamingResponse(ClientHttpResponse response, Class<T> valueType) throws IOException {
List<T> dataList = new ArrayList<>();
ObjectMapper objectMapper = getConfiguredObjectMapper();
InputStream inputStream = response.getBody();
try (JsonParser parser = objectMapper.getFactory().createParser(inputStream)) {
if (parser.nextToken() != JsonToken.START_ARRAY) {
throw new IOException("Expected data to start with an Array");
}
int count = 0;
long lastLogTime = System.currentTimeMillis();
while (parser.nextToken() != JsonToken.END_ARRAY) {
T record = objectMapper.readValue(parser, valueType);
dataList.add(record);
count++;
// 每1000條或每30秒日志輸出一次
long currentTime = System.currentTimeMillis();
if (count % 1000 == 0 || currentTime - lastLogTime > 30000) {
log.info("已解析{}條數(shù)據(jù)", count);
lastLogTime = currentTime;
}
// 定期批量處理,避免內(nèi)存占用過(guò)高
if (count % 5000 == 0) {
processBatchData(dataList, valueType);
dataList.clear(); // 清空列表,避免內(nèi)存占用過(guò)高
System.gc(); // 建議垃圾回收
}
}
// 處理最后一批數(shù)據(jù)
if (!dataList.isEmpty()) {
processBatchData(dataList, valueType);
}
}
log.info("共解析{}條數(shù)據(jù)", dataList.size());
return dataList;
}
// 批量處理數(shù)據(jù)的方法
private <T> void processBatchData(List<T> dataList, Class<T> valueType) {
try {
DataSourceUtil.setDB("db3");
if (valueType == HbCcsPolicyDataRc.class) {
dmService.setHbCcsPolicyDataRc((List<HbCcsPolicyDataRc>) dataList);
} else if (valueType == HbCcsCrosssellPEMIUM.class) {
dmService.setHbCcsCrosssellPEMIUM((List<HbCcsCrosssellPEMIUM>) dataList);
} else if (valueType == HbCcsLpDataRc.class) {
dmService.HbCcsLpDataRc((List<HbCcsLpDataRc>) dataList);
}
log.info("成功處理一批數(shù)據(jù),數(shù)量: {}", dataList.size());
} catch (Exception e) {
log.error("處理批量數(shù)據(jù)時(shí)發(fā)生錯(cuò)誤", e);
// 這里可以添加重試邏輯或錯(cuò)誤記錄
}
}
// 6. 工具方法:復(fù)用 JSON 解析器配置(避免重復(fù)設(shè)置容錯(cuò)屬性)
private ObjectMapper getConfiguredObjectMapper() {
ObjectMapper objectMapper = new ObjectMapper();
// 容錯(cuò)配置:忽略未知字段、允許單值數(shù)組
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
return objectMapper;
}
// 創(chuàng)建專用的分塊傳輸RestTemplate
private RestTemplate createChunkedRestTemplate() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
// 設(shè)置超時(shí)時(shí)間(單位:毫秒)
factory.setConnectTimeout(120000); // 2分鐘連接超時(shí)
factory.setReadTimeout(3600000); // 1小時(shí)讀取超時(shí)(大數(shù)據(jù)傳輸需要更長(zhǎng)時(shí)間)
// 禁用緩沖
factory.setBufferRequestBody(false);
RestTemplate restTemplate = new RestTemplate(factory);
// 配置消息轉(zhuǎn)換器
List<HttpMessageConverter<?>> converters = new ArrayList<>();
// 字符串轉(zhuǎn)換器
StringHttpMessageConverter stringConverter = new StringHttpMessageConverter(StandardCharsets.UTF_8);
stringConverter.setWriteAcceptCharset(false);
converters.add(stringConverter);
// JSON轉(zhuǎn)換器
MappingJackson2HttpMessageConverter jsonConverter = new MappingJackson2HttpMessageConverter();
jsonConverter.setSupportedMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON));
converters.add(jsonConverter);
restTemplate.setMessageConverters(converters);
return restTemplate;
}3.核心配置
這個(gè)配置兩個(gè)項(xiàng)目的yml都需要加
server1:
base-url: http://127.0.0.1:8060#根據(jù)情況而變
auth:
username: admin
password: admin
server:
max-http-header-size: 1000000
servlet:
multipart:
max-file-size: 1000MB
max-request-size: 1000MB
tomcat:
max-swallow-size: 1000MB
max-http-form-post-size: 1000MB
threads:
max: 200
min-spare: 20
keep-alive-timeout: 300000@Value("${server1.base-url}")
private String server1BaseUrl;這段代碼加到controller類(lèi)中這是將yml配置文件的路徑注冊(cè)到controller了
這種適合小數(shù)據(jù)傳輸,大數(shù)據(jù)傳輸需要配置服務(wù)器的Nginx配置需要將接口配置上盡量統(tǒng)一接口前綴不然可能會(huì)報(bào)錯(cuò)具體就不展示了
到此這篇關(guān)于java通過(guò)RESTful API實(shí)現(xiàn)兩個(gè)項(xiàng)目之間相互傳輸數(shù)據(jù)的文章就介紹到這了,更多相關(guān)java項(xiàng)目互傳輸數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java調(diào)用mysql存儲(chǔ)過(guò)程實(shí)例分析
這篇文章主要介紹了java調(diào)用mysql存儲(chǔ)過(guò)程的方法,以實(shí)例形式較為詳細(xì)的分析了mysql數(shù)據(jù)庫(kù)的建立和存儲(chǔ)過(guò)程的實(shí)現(xiàn)方法,需要的朋友可以參考下2015-06-06
Java基礎(chǔ)知識(shí)之ByteArrayOutputStream流的使用
這篇文章主要介紹了Java基礎(chǔ)知識(shí)之ByteArrayOutputStream流的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Java高級(jí)特性基礎(chǔ)之反射五連問(wèn)
反射賦予了我們?cè)谶\(yùn)行時(shí)分析類(lèi)以及執(zhí)行類(lèi)中方法的能力。通過(guò)反射你可以獲取任意一個(gè)類(lèi)的所有屬性和方法,你還可以調(diào)用這些方法和屬性。本文就來(lái)和大家詳細(xì)聊聊Java中的反射,感興趣的可以了解一下2023-01-01
Java全面細(xì)致講解Cookie與Session及kaptcha驗(yàn)證碼的使用
web開(kāi)發(fā)階段我們主要是瀏覽器和服務(wù)器之間來(lái)進(jìn)行交互。瀏覽器和服務(wù)器之間的交互就像人和人之間進(jìn)行交流一樣,但是對(duì)于機(jī)器來(lái)說(shuō),在一次請(qǐng)求之間只是會(huì)攜帶著本次請(qǐng)求的數(shù)據(jù)的,但是可能多次請(qǐng)求之間是會(huì)有聯(lián)系的,所以提供了會(huì)話機(jī)制2022-06-06
Mybatis如何解決sql中l(wèi)ike通配符模糊匹配問(wèn)題
這篇文章主要介紹了Mybatis如何解決sql中l(wèi)ike通配符模糊匹配問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
關(guān)于MyBatis plus條件構(gòu)造器的逐條詳解
什么是條件構(gòu)造器呢?簡(jiǎn)單來(lái)說(shuō),條件構(gòu)造器就是用來(lái)生成我們查數(shù)據(jù)庫(kù)的sql。它可以簡(jiǎn)化sql代碼的編寫(xiě),靈活、方便且易于維護(hù)2021-09-09
Java中的String.valueOf()和toString()方法區(qū)別小結(jié)
字符串操作是開(kāi)發(fā)者日常編程任務(wù)中不可或缺的一部分,轉(zhuǎn)換為字符串是一種常見(jiàn)需求,其中最常見(jiàn)的就是String.valueOf()和toString()方法,本文主要介紹了Java中的String.valueOf()和toString()方法區(qū)別小結(jié),感興趣的可以了解一下2025-04-04
SpringBoot中各種Controller的寫(xiě)法
這篇文章主要介紹了SpringBoot中各種Controller的寫(xiě)法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07

