xxl-job如何濫用netty導(dǎo)致的問題及解決方案
netty作為一種高性能的網(wǎng)絡(luò)編程框架,在很多開源項(xiàng)目中大放異彩,十分亮眼,但是在有些項(xiàng)目中卻被濫用,導(dǎo)致使用者使用起來非常的難受。
筆者使用的是2.3.0版本的xxl-job,也是當(dāng)前的最新版本;下面所有的代碼修改全部基于2.3.0版本的xxl-job源代碼
https://github.com/xuxueli/xxl-job/tree/2.3.0
其中,xxl-job-admin對(duì)應(yīng)著項(xiàng)目:https://github.com/xuxueli/xxl-job/tree/2.3.0/xxl-job-admin
spring-boot項(xiàng)目對(duì)應(yīng)著示例項(xiàng)目:https://github.com/xuxueli/xxl-job/tree/master/xxl-job-executor-samples/xxl-job-executor-sample-springboot
一、xxl-job存在的多端口問題
關(guān)于xxl-job如何使用的問題,可以參考我的另外一篇文章:分布式任務(wù)調(diào)度系統(tǒng):xxl-job
現(xiàn)在java開發(fā)基本上已經(jīng)離不開spring boot了吧,我在spring boot中集成了xxl-job-core組件并且已經(jīng)能夠正常使用,但是一旦部署到測(cè)試環(huán)境就不行了,這是因?yàn)闇y(cè)試環(huán)境使用了docker,spring boot集成xxl-job-core組件之后會(huì)額外開啟9999端口號(hào)給xxl-job-admin調(diào)用使用,如果docker不開啟宿主機(jī)到docker的端口映射,xxl-job-admin自然就會(huì)調(diào)用失敗。這導(dǎo)致了以下問題:
- 每個(gè)spring boot程序都要開兩個(gè)端口號(hào),意味著同時(shí)運(yùn)行著兩個(gè)服務(wù)進(jìn)行端口監(jiān)聽,浪費(fèi)計(jì)算和內(nèi)存資源
- 如果使用docker部署,需要再額外做宿主機(jī)和容器的9999端口號(hào)的映射,否則外部的xxl-job-admin將無法訪問。
那如果兩個(gè)不同的服務(wù)都集成了xxl-job,但是部署在同一臺(tái)機(jī)器上,又會(huì)發(fā)生什么呢?答案是如果不指定特定端口號(hào),兩個(gè)服務(wù)肯定都要使用9999端口號(hào),勢(shì)必會(huì)端口沖突,但是xxl-job已經(jīng)想到了9999端口號(hào)被占用的情況,如果9999端口號(hào)被占用,則會(huì)端口號(hào)加一再重試。
xxl-job-core組件額外開啟9999端口號(hào)到底合不合理?
舉個(gè)例子:spring boot程序集成swagger-ui是很常見的操作吧,也沒見swagger-ui再額外開啟端口號(hào)啊,我認(rèn)為是不合理的。但是,我認(rèn)為作者這樣做也有他的考慮---并非所有程序都是spring-boot的程序,也有使用其它框架的程序,使用獨(dú)立的netty server作為客戶端能夠保證在使用java的任意xxl-job客戶端都能穩(wěn)定的向xxl-job-admin提供服務(wù)。然而java開發(fā)者們絕大多數(shù)情況下都是使用spirng-boot構(gòu)建程序,在這種情況下,作者偷懶沒有構(gòu)建專門在spirng boot框架下使用的xxl-job-core,而是想了個(gè)類似萬(wàn)金油的蠢招解決問題,讓所有在spring-boot框架下的開發(fā)者都一起難受,實(shí)在是令人費(fèi)解。
二、源碼追蹤
一切的起點(diǎn)要從spring-boot程序集成xxl-job-core說起,集成方式很簡(jiǎn)單,只需要成功創(chuàng)建一個(gè)XxlJobSpringExecutor Bean對(duì)象即可。
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
在XxlJobSpringExecutor對(duì)象創(chuàng)建完成之后會(huì)做一些xxl-job初始化的操作,包含連接xxl-job-admin以及啟動(dòng)netty server。
展開XxlJobSpringExecutor源碼,可以看到它實(shí)現(xiàn)了SmartInitializingSingleton接口,這就意味著Bean對(duì)象創(chuàng)建完成之后會(huì)回調(diào)afterSingletonsInstantiated接口
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
在super.start();這行代碼中,會(huì)調(diào)用父類XxlJobExecutor的start方法做初始化
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
在initEmbedServer(address, ip, port, appname, accessToken);這行代碼做開啟netty-server的操作
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
可以看到這里會(huì)創(chuàng)建EmbedServer對(duì)象,并且使用start方法開啟netty-server,在這里就能看到熟悉的一大坨了

除了開啟讀寫空閑檢測(cè)之外,就只做了一件事:開啟http服務(wù),也就是說,xxl-job-admin是通過http請(qǐng)求調(diào)用客戶端的接口觸發(fā)客戶端的任務(wù)調(diào)度的。最終處理方法在EmbedHttpServerHandler類中,順著EmbedHttpServerHandler類的方法找,可以最終找到處理的方法com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken!=null
&& accessToken.trim().length()>0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}
// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
從這段代碼的邏輯可以看到
- 只接受POST請(qǐng)求
- 如果有token,則會(huì)校驗(yàn)token
- 只提供/beat、/idelBeat、/run、/kill、/log 五個(gè)接口,所有請(qǐng)求的處理都會(huì)委托給executorBiz處理。
最后,netty將executorBiz處理結(jié)果寫回xxl-job-admin,然后請(qǐng)求就結(jié)束了。這里netty扮演的角色非常簡(jiǎn)單,我認(rèn)為可以使用spring-mvc非常容易的替換掉它的功能。
三、使用spring-mvc替換netty的功能
1.新增spring-mvc代碼
這里要修改xxl-job-core的源代碼,首先,加入spring-mvc的依賴
<!-- spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
然后新增Controller文件
package com.xxl.job.core.controller;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
/**
* @author kdyzm
* @date 2021/5/7
*/
@RestController
public class XxlJobController {
@PostMapping("/beat")
public ReturnT<String> beat() {
return new ExecutorBizImpl().beat();
}
@PostMapping("/idleBeat")
public ReturnT<String> idleBeat(@RequestBody IdleBeatParam param) {
return new ExecutorBizImpl().idleBeat(param);
}
@PostMapping("/run")
public ReturnT<String> run(@RequestBody TriggerParam param) {
return new ExecutorBizImpl().run(param);
}
@PostMapping("/kill")
public ReturnT<String> kill(@RequestBody KillParam param) {
return new ExecutorBizImpl().kill(param);
}
@PostMapping("/log")
public ReturnT<LogResult> log(@RequestBody LogParam param) {
return new ExecutorBizImpl().log(param);
}
}
2.刪除老代碼&移除netty依賴
之后,就要?jiǎng)h除老的代碼了,修改com.xxl.job.core.server.EmbedServer#start方法,清空所有代碼,新增
// start registry startRegistry(appname, address);
然后刪除EmbedServer類中的以下兩個(gè)變量及相關(guān)的引用
private ExecutorBiz executorBiz;
private Thread thread;
之后刪除netty的依賴
<!-- ********************** embed server: netty + gson ********************** -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty-all.version}</version>
</dependency>
將報(bào)錯(cuò)的代碼全部刪除,之后就可以編譯成功了,當(dāng)然這還不行。
3.修改注冊(cè)到xxl-job-admin的端口號(hào)
注冊(cè)的ip地址可以不用改,但是端口號(hào)要取spring-boot程序的端口號(hào)。
因?yàn)橐獜?fù)用springk-boot容器的端口號(hào),所以這里注冊(cè)的端口號(hào)要和它保持一致,修改com.xxl.job.core.executor.XxlJobExecutor#initEmbedServer方法,注釋掉
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
然后修改spring-boot的配置文件,xxl-job的端口號(hào)配置改成server.port
server.port=8081
xxl.job.executor.port=${server.port}
在創(chuàng)建XxlJobSpringExecutor Bean對(duì)象的時(shí)候?qū)⒏闹祩鬟f給它。
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
4.將xxl-job-core改造成spring-boot-starter
上面改造完了之后已經(jīng)將邏輯變更為使用spring-mvc,但是spring-boot程序還沒有辦法掃描到xxl-job-core中的controller,可以手動(dòng)掃描包,這里推薦使用spring-boot-starter,這樣只需要將xxl-job-core加入classpath,就可以自動(dòng)生效。
在 com.xxl.job.core.config包下新建Config類
package com.xxl.job.core.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author kdyzm
* @date 2021/5/7
*/
@Configuration
@ComponentScan(basePackages = {"com.xxl.job.core.controller"})
public class Config {
}
在src/main/resources/META-INF文件夾下新建spring.factories文件,文件內(nèi)容如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.xxl.job.core.config.Config
5.增加特殊前綴匹配
上面修改之后將使用spring mvc接口替代原netty功能提供的http接口,但是暴露出的接口是/run、/beat、/kill這種有可能和宿主服務(wù)路徑?jīng)_突的接口,為了防止出現(xiàn)路徑?jīng)_突,做出以下修改
修改com.xxl.job.core.controller.XxlJobController類,添加@RequestMapping("/xxl-job")
@RestController
@RequestMapping("/xxl-job")
public class XxlJobController {
...
}
修改com.xxl.job.core.biz.client.ExecutorBizClient類,為每個(gè)請(qǐng)求添加/xxl-job前綴
package com.xxl.job.core.biz.client;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.util.XxlJobRemotingUtil;
/**
* admin api test
*
* @author xuxueli 2017-07-28 22:14:52
*/
public class ExecutorBizClient implements ExecutorBiz {
public ExecutorBizClient() {
}
public ExecutorBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
private int timeout = 3;
@Override
public ReturnT<String> beat() {
return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/beat", accessToken, timeout, "", String.class);
}
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
return XxlJobRemotingUtil.postBody(addressUrl+"xxl-job/idleBeat", accessToken, timeout, idleBeatParam, String.class);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/run", accessToken, timeout, triggerParam, String.class);
}
@Override
public ReturnT<String> kill(KillParam killParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/kill", accessToken, timeout, killParam, String.class);
}
@Override
public ReturnT<LogResult> log(LogParam logParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "xxl-job/log", accessToken, timeout, logParam, LogResult.class);
}
}
這樣,就全部修改完了。
四、測(cè)試
重啟xxl-job-executor-sample-springboot項(xiàng)目,查看注冊(cè)到xxl-job-admin上的信息

可以看到端口號(hào)已經(jīng)不是默認(rèn)的9999,而是和spring-boot程序保持一致的端口號(hào),然后執(zhí)行默認(rèn)的job

可以看到已經(jīng)執(zhí)行成功,在查看日志詳情

日志也一切正常,表示一切都改造成功了。
完整的代碼修改:https://github.com/kdyzm/xxl-job/commit/449ee5c7bbb659356af25b164c251f960b9a6891
五、實(shí)際使用
由于原作者基本上不理睬人,我克隆了項(xiàng)目2.3.0版本并且新增了2.4.1版本:https://github.com/kdyzm/xxl-job/releases/tag/2.4.1
有需要的可以下載源代碼自己打包xxl-job-core項(xiàng)目上傳私服后就可以使用了
以上就是xxl-job如何濫用netty導(dǎo)致的問題及解決方案的詳細(xì)內(nèi)容,更多關(guān)于xxl-job濫用netty的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring如何更簡(jiǎn)單的讀取和存儲(chǔ)對(duì)象
這篇文章主要給大家介紹了關(guān)于Spring如何更簡(jiǎn)單的讀取和存儲(chǔ)對(duì)象的相關(guān)資料,在Spring 中想要更簡(jiǎn)單的存儲(chǔ)和讀取對(duì)象的核?是使?注解,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06
Hibernate迫切連接和普通連接的區(qū)別實(shí)例詳解
這篇文章主要介紹了Hibernate迫切連接和普通連接的區(qū)別實(shí)例詳解,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12
在IDEA中使用debug工具去運(yùn)行java程序的實(shí)現(xiàn)步驟
調(diào)試工具(debug工具)是一種用于幫助程序員識(shí)別和修復(fù)程序中的錯(cuò)誤的工具,它們提供了一系列的功能,幫助程序員在代碼執(zhí)行的過程中跟蹤和檢測(cè)問題,本文將給大家介紹使用debug工具去運(yùn)行java程序的實(shí)現(xiàn)步驟,需要的朋友可以參考下2024-04-04
Java HashSet集合存儲(chǔ)遍歷學(xué)生對(duì)象代碼實(shí)例
這篇文章主要介紹了Java HashSet集合存儲(chǔ)遍歷學(xué)生對(duì)象代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
idea設(shè)置在包里面在創(chuàng)建一個(gè)包方式
這篇文章主要介紹了idea設(shè)置在包里面在創(chuàng)建一個(gè)包方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
Mybatis分頁(yè)插件PageHelper配置及使用方法詳解
這篇文章主要介紹了Mybatis分頁(yè)插件PageHelper配置及使用方法詳解,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08
Java中的Semaphore信號(hào)量簡(jiǎn)單使用代碼實(shí)例
這篇文章主要介紹了Java中的Semaphore信號(hào)量簡(jiǎn)單使用代碼實(shí)例,Semaphore是用來保護(hù)一個(gè)或者多個(gè)共享資源的訪問,Semaphore內(nèi)部維護(hù)了一個(gè)計(jì)數(shù)器,其值為可以訪問的共享資源的個(gè)數(shù),一個(gè)線程要訪問共享資源,需要的朋友可以參考下2023-12-12
springboot整合log4j的踩坑實(shí)戰(zhàn)記錄
log日志的重要性不言而喻,所以我們需要在系統(tǒng)內(nèi)根據(jù)實(shí)際的業(yè)務(wù)進(jìn)行日志的整合,下面這篇文章主要給大家介紹了關(guān)于springboot整合log4j的踩坑實(shí)戰(zhàn)記錄,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-04-04

