java開源調度如何給xxljob加k8s執(zhí)行器
前言
xxljob 是采用 java 開發(fā)的開源的任務調度系統(tǒng),架構上分為調度管理器、執(zhí)行器,目前除了官方提供的 java 執(zhí)行器外,也有 go 開發(fā)者提供了 go 語言的執(zhí)行器(看了 go 執(zhí)行器的代碼,除了任務日志沒有實現(xiàn),其他功能實現(xiàn)都比較完整)。 xxljob 在設計上,抽象出了執(zhí)行器的接口,所以實現(xiàn)一個語言的執(zhí)行器并不復雜,這里主要探索下,如何利用 k8s 的 pod 的能力,使用 xxljob 調度 pod 運行,實現(xiàn)一個通用的和語言無關的執(zhí)行器
- xxljob :https://github.com/xuxueli/xxl-job
- k8s-client-java: https://github.com/fabric8io/kubernetes-client
執(zhí)行器接口
實現(xiàn)一個 xxljob 的執(zhí)行器,如果不考慮執(zhí)行器節(jié)點自動注冊,只需要實現(xiàn)如下五個接口即可:
- /beat :執(zhí)行器心跳
- /idleBeat :執(zhí)行器的某個 job 是否空閑
- /run :觸發(fā) job 執(zhí)行
- /kill :終止正在執(zhí)行的 job
- /log :查看本節(jié)點執(zhí)行器的 job 執(zhí)行日志
不過一些調度策略則需要每個執(zhí)行器自行實現(xiàn)了,比如【阻塞處理策略】,當同一個job 的任務還在執(zhí)行,突然又收到了一個新的,是串行執(zhí)行,還是停止之前的任務,或者丟棄當前的任務,這些實現(xiàn)都需要執(zhí)行器考慮。
K8S 執(zhí)行器設計
上面已經(jīng)了解了實現(xiàn)一個執(zhí)行器的要素。但是讓 k8s 實現(xiàn)這些接口,難度有點高。然后又希望不破壞現(xiàn)有的 xxljob 的設計,怎么辦?代理解決??梢灾苯硬捎矛F(xiàn)有的 java 執(zhí)行器,創(chuàng)建一個 job 任務,這個 job 任務專門發(fā)起 k8s 的調度,具體的調度 pod 信息通過調度參數(shù)傳遞,下面來實現(xiàn)下,以及看下需要注意的問題。
1、在 XXL-JOB-ADMIN 模塊新增執(zhí)行器
為了盡量減少系統(tǒng)維護的復雜度,我們可以將代理調度 k8s 的執(zhí)行器,直接集成到 admin 模塊,啟動 admin 的時候,自動注冊 k8s 執(zhí)行器。

2、引入 K8S-CLIENT-JAVA ,使用 SERVICE ACCOUNT 機制與 K8S 交互
<dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> <version>5.4.0</version> </dependency>
這個客戶端提供了完整的和 k8s-api-server 交互能力,使用這個客戶端,基于 k8s 的 service account 認證,可以輕松在 xxljob 所在 namespace 內完成 pod 的生命周期管理。引入依賴后,首先創(chuàng)建 client 實例:
/**
* @author kl (http://kailing.pub)
* @since 2021/6/4
*/
@Configuration
public class KubernetesClientConfig {
@Bean
public KubernetesClient kubernetesClient(){
return new DefaultKubernetesClient(Config.autoConfigure(null));
}
}這里初始化客戶端時,采用了自動發(fā)現(xiàn)配置的模式,如果是本機開發(fā)時,就會自動尋找你本機的 kubectl 配置,當 xxljob 部署到 k8s 內時,如果找不到本地的就會嘗試尋找 service account 創(chuàng)建出來的配置,然后從環(huán)境變量中自發(fā)現(xiàn) k8s 集群的鏈接地址。所以無論是開發(fā)環(huán)境還是線上環(huán)境,都不用配置k8s 的鏈接認證信息。但是,部署到 k8s 時,因為需要借助 k8s 的 service account 機制與 k8s 交互,需要多定義一個 service account 的權限聲明,可參考如下:
# In GKE need to get RBAC permissions first with # kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin [--user=|--group=] --- apiVersion: v1 kind: ServiceAccount metadata: name: xxljob --- kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: xxljob rules: - apiGroups: [""] resources: ["pods"] verbs: ["create","delete","get","list","patch","update","watch"] - apiGroups: [""] resources: ["pods/exec"] verbs: ["create","delete","get","list","patch","update","watch"] - apiGroups: [""] resources: ["pods/log"] verbs: ["get","list","watch"] - apiGroups: [""] resources: ["events"] verbs: ["watch"] - apiGroups: [""] resources: ["secrets"] verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: xxljob roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: xxljob subjects: - kind: ServiceAccount name: xxljob
3、編寫代理執(zhí)行器調度代碼
/**
* @author kl (http://kailing.pub)
* @since 2021/5/28
*/
@Component
public class KubernetesExecutorHandler {
private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutorHandler.class);
private static final String NAMESPACE = "xxl-job";
private final KubernetesClient client;
public KubernetesExecutorHandler(KubernetesClient client) {
this.client = client;
}
@XxlJob(value = "callK8s")
public void callK8s() throws InterruptedException {
String podResource = XxlJobHelper.getJobParam();
Pod pod = Serialization.unmarshal(podResource, Pod.class);
pod.getSpec().setRestartPolicy("Never");//這里強制設置重啟策略為不重啟
pod = client.pods().inNamespace(NAMESPACE).create(pod);
client.resource(pod).waitUntilCondition(pod1 -> pod1.getStatus().getPhase().equals("Succeeded") || pod1.getStatus().getPhase().equals("Failed"), 2, TimeUnit.MINUTES);
String log = client.pods().inNamespace(NAMESPACE).withName(pod.getMetadata().getName()).getLog();
XxlJobHelper.log(log); //記錄 pod 日志到 xxl-job
logger.info(log);
client.resource(pod).delete();
}
}如上,一個簡版的 k8s 執(zhí)行器便完成了,使用時,通過定義bean模式的 job ,然后選擇 k8s 執(zhí)行器,jobHandler 名稱和填上 callk8s,通過job 參數(shù)傳遞 pod 調度信息,如:

這里定義了一個打印 當前時間和當前環(huán)境變量的 pod 任務,執(zhí)行完成后,就可以從 job 的日志里看到執(zhí)行結果了,如:

結語
目前的實現(xiàn)方式,單純從兼容 xxljob 現(xiàn)有的架構模式,以及現(xiàn)有的實現(xiàn)出發(fā)的,所以采用了 java 代理執(zhí)行器代為調度 pod 的方案,基本繼承了所有 java 執(zhí)行器的功能,比如 job 執(zhí)行日志記錄,并發(fā)執(zhí)行策略等。需要注意的是,因為是單 handler 實現(xiàn),每個job 都會用同一個 handler 去運行,所以創(chuàng)建任務的時候并發(fā)策略這塊只能選擇單機串行執(zhí)行,否則非常容易丟任務。另一個需要考慮的問題,如果代理執(zhí)行器非正常關閉,pod 沒來的及刪除就掛了,這個時候需要啟動一個巡檢的線程,檢測已經(jīng)完成或者已經(jīng)出錯的 pod ,然后清理掉。
以上就是java開源調度如何給xxljob加k8s執(zhí)行器的詳細內容,更多關于java開源xxljob加k8s執(zhí)行器的資料請關注腳本之家其它相關文章!
相關文章
mybatis實現(xiàn)遍歷Map的key和value
這篇文章主要介紹了mybatis實現(xiàn)遍歷Map的key和value方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
SpringBoot整合SpringSecurityOauth2實現(xiàn)鑒權動態(tài)權限問題
這篇文章主要介紹了SpringBoot整合SpringSecurityOauth2實現(xiàn)鑒權-動態(tài)權限,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-06-06
Mybatis分頁插件PageHelper配置及使用方法詳解
這篇文章主要介紹了Mybatis分頁插件PageHelper配置及使用方法詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-08-08

