java開源調(diào)度如何給xxljob加k8s執(zhí)行器
前言
xxljob 是采用 java 開發(fā)的開源的任務(wù)調(diào)度系統(tǒng),架構(gòu)上分為調(diào)度管理器、執(zhí)行器,目前除了官方提供的 java 執(zhí)行器外,也有 go 開發(fā)者提供了 go 語言的執(zhí)行器(看了 go 執(zhí)行器的代碼,除了任務(wù)日志沒有實(shí)現(xiàn),其他功能實(shí)現(xiàn)都比較完整)。 xxljob 在設(shè)計(jì)上,抽象出了執(zhí)行器的接口,所以實(shí)現(xiàn)一個(gè)語言的執(zhí)行器并不復(fù)雜,這里主要探索下,如何利用 k8s 的 pod 的能力,使用 xxljob 調(diào)度 pod 運(yùn)行,實(shí)現(xiàn)一個(gè)通用的和語言無關(guān)的執(zhí)行器
- xxljob :https://github.com/xuxueli/xxl-job
- k8s-client-java: https://github.com/fabric8io/kubernetes-client
執(zhí)行器接口
實(shí)現(xiàn)一個(gè) xxljob 的執(zhí)行器,如果不考慮執(zhí)行器節(jié)點(diǎn)自動注冊,只需要實(shí)現(xiàn)如下五個(gè)接口即可:
- /beat :執(zhí)行器心跳
- /idleBeat :執(zhí)行器的某個(gè) job 是否空閑
- /run :觸發(fā) job 執(zhí)行
- /kill :終止正在執(zhí)行的 job
- /log :查看本節(jié)點(diǎn)執(zhí)行器的 job 執(zhí)行日志
不過一些調(diào)度策略則需要每個(gè)執(zhí)行器自行實(shí)現(xiàn)了,比如【阻塞處理策略】,當(dāng)同一個(gè)job 的任務(wù)還在執(zhí)行,突然又收到了一個(gè)新的,是串行執(zhí)行,還是停止之前的任務(wù),或者丟棄當(dāng)前的任務(wù),這些實(shí)現(xiàn)都需要執(zhí)行器考慮。
K8S 執(zhí)行器設(shè)計(jì)
上面已經(jīng)了解了實(shí)現(xiàn)一個(gè)執(zhí)行器的要素。但是讓 k8s 實(shí)現(xiàn)這些接口,難度有點(diǎn)高。然后又希望不破壞現(xiàn)有的 xxljob 的設(shè)計(jì),怎么辦?代理解決。可以直接采用現(xiàn)有的 java 執(zhí)行器,創(chuàng)建一個(gè) job 任務(wù),這個(gè) job 任務(wù)專門發(fā)起 k8s 的調(diào)度,具體的調(diào)度 pod 信息通過調(diào)度參數(shù)傳遞,下面來實(shí)現(xiàn)下,以及看下需要注意的問題。
1、在 XXL-JOB-ADMIN 模塊新增執(zhí)行器
為了盡量減少系統(tǒng)維護(hù)的復(fù)雜度,我們可以將代理調(diào)度 k8s 的執(zhí)行器,直接集成到 admin 模塊,啟動 admin 的時(shí)候,自動注冊 k8s 執(zhí)行器。
2、引入 K8S-CLIENT-JAVA ,使用 SERVICE ACCOUNT 機(jī)制與 K8S 交互
<dependency> <groupId>io.fabric8</groupId> <artifactId>kubernetes-client</artifactId> <version>5.4.0</version> </dependency>
這個(gè)客戶端提供了完整的和 k8s-api-server 交互能力,使用這個(gè)客戶端,基于 k8s 的 service account 認(rèn)證,可以輕松在 xxljob 所在 namespace 內(nèi)完成 pod 的生命周期管理。引入依賴后,首先創(chuàng)建 client 實(shí)例:
/** * @author kl (http://kailing.pub) * @since 2021/6/4 */ @Configuration public class KubernetesClientConfig { @Bean public KubernetesClient kubernetesClient(){ return new DefaultKubernetesClient(Config.autoConfigure(null)); } }
這里初始化客戶端時(shí),采用了自動發(fā)現(xiàn)配置的模式,如果是本機(jī)開發(fā)時(shí),就會自動尋找你本機(jī)的 kubectl 配置,當(dāng) xxljob 部署到 k8s 內(nèi)時(shí),如果找不到本地的就會嘗試尋找 service account 創(chuàng)建出來的配置,然后從環(huán)境變量中自發(fā)現(xiàn) k8s 集群的鏈接地址。所以無論是開發(fā)環(huán)境還是線上環(huán)境,都不用配置k8s 的鏈接認(rèn)證信息。但是,部署到 k8s 時(shí),因?yàn)樾枰柚?k8s 的 service account 機(jī)制與 k8s 交互,需要多定義一個(gè) service account 的權(quán)限聲明,可參考如下:
# In GKE need to get RBAC permissions first with # kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin [--user=|--group=] --- apiVersion: v1 kind: ServiceAccount metadata: name: xxljob --- kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: xxljob rules: - apiGroups: [""] resources: ["pods"] verbs: ["create","delete","get","list","patch","update","watch"] - apiGroups: [""] resources: ["pods/exec"] verbs: ["create","delete","get","list","patch","update","watch"] - apiGroups: [""] resources: ["pods/log"] verbs: ["get","list","watch"] - apiGroups: [""] resources: ["events"] verbs: ["watch"] - apiGroups: [""] resources: ["secrets"] verbs: ["get"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: xxljob roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: xxljob subjects: - kind: ServiceAccount name: xxljob
3、編寫代理執(zhí)行器調(diào)度代碼
/** * @author kl (http://kailing.pub) * @since 2021/5/28 */ @Component public class KubernetesExecutorHandler { private static final Logger logger = LoggerFactory.getLogger(KubernetesExecutorHandler.class); private static final String NAMESPACE = "xxl-job"; private final KubernetesClient client; public KubernetesExecutorHandler(KubernetesClient client) { this.client = client; } @XxlJob(value = "callK8s") public void callK8s() throws InterruptedException { String podResource = XxlJobHelper.getJobParam(); Pod pod = Serialization.unmarshal(podResource, Pod.class); pod.getSpec().setRestartPolicy("Never");//這里強(qiáng)制設(shè)置重啟策略為不重啟 pod = client.pods().inNamespace(NAMESPACE).create(pod); client.resource(pod).waitUntilCondition(pod1 -> pod1.getStatus().getPhase().equals("Succeeded") || pod1.getStatus().getPhase().equals("Failed"), 2, TimeUnit.MINUTES); String log = client.pods().inNamespace(NAMESPACE).withName(pod.getMetadata().getName()).getLog(); XxlJobHelper.log(log); //記錄 pod 日志到 xxl-job logger.info(log); client.resource(pod).delete(); } }
如上,一個(gè)簡版的 k8s 執(zhí)行器便完成了,使用時(shí),通過定義bean模式的 job ,然后選擇 k8s 執(zhí)行器,jobHandler 名稱和填上 callk8s,通過job 參數(shù)傳遞 pod 調(diào)度信息,如:
這里定義了一個(gè)打印 當(dāng)前時(shí)間和當(dāng)前環(huán)境變量的 pod 任務(wù),執(zhí)行完成后,就可以從 job 的日志里看到執(zhí)行結(jié)果了,如:
結(jié)語
目前的實(shí)現(xiàn)方式,單純從兼容 xxljob 現(xiàn)有的架構(gòu)模式,以及現(xiàn)有的實(shí)現(xiàn)出發(fā)的,所以采用了 java 代理執(zhí)行器代為調(diào)度 pod 的方案,基本繼承了所有 java 執(zhí)行器的功能,比如 job 執(zhí)行日志記錄,并發(fā)執(zhí)行策略等。需要注意的是,因?yàn)槭菃?handler 實(shí)現(xiàn),每個(gè)job 都會用同一個(gè) handler 去運(yùn)行,所以創(chuàng)建任務(wù)的時(shí)候并發(fā)策略這塊只能選擇單機(jī)串行執(zhí)行,否則非常容易丟任務(wù)。另一個(gè)需要考慮的問題,如果代理執(zhí)行器非正常關(guān)閉,pod 沒來的及刪除就掛了,這個(gè)時(shí)候需要啟動一個(gè)巡檢的線程,檢測已經(jīng)完成或者已經(jīng)出錯(cuò)的 pod ,然后清理掉。
以上就是java開源調(diào)度如何給xxljob加k8s執(zhí)行器的詳細(xì)內(nèi)容,更多關(guān)于java開源xxljob加k8s執(zhí)行器的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
mybatis實(shí)現(xiàn)遍歷Map的key和value
這篇文章主要介紹了mybatis實(shí)現(xiàn)遍歷Map的key和value方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01Java工程mybatis實(shí)現(xiàn)多表查詢過程詳解
這篇文章主要介紹了Java工程mybatis實(shí)現(xiàn)多表查詢過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06SpringBoot整合SpringSecurityOauth2實(shí)現(xiàn)鑒權(quán)動態(tài)權(quán)限問題
這篇文章主要介紹了SpringBoot整合SpringSecurityOauth2實(shí)現(xiàn)鑒權(quán)-動態(tài)權(quán)限,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-06-06Mybatis分頁插件PageHelper配置及使用方法詳解
這篇文章主要介紹了Mybatis分頁插件PageHelper配置及使用方法詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08