Deploying a Session-Mode Flink Cluster on Kubernetes
In the world of distributed computing, Apache Flink is a fast, reliable, and easy-to-use compute engine. A Flink cluster is a distributed system consisting of a Flink JobManager and multiple Flink TaskManagers, and when deploying one, high availability is a key consideration. This article walks through deploying a highly available Session-mode Flink cluster on Kubernetes (k8s), using minio as the filesystem.
What is Session mode
Flink can be deployed in several ways. In a classic standalone deployment, the cluster is a fixed set of processes that share a single configuration and communicate over Akka-based RPC. In Session mode, by contrast, the cluster is long-lived and shared: it is started once, accepts any number of job submissions, and can be scaled up or down as needed. When Session mode runs on Kubernetes, the JobManager and TaskManager processes live in containers and are managed dynamically by k8s.
Advantages of Session mode:
- The cluster can be started and stopped on demand
- TaskManagers can be added or removed dynamically
- Kubernetes scaling features can resize the cluster automatically
- The cluster integrates with other Kubernetes resources, such as storage volumes and network policies
This makes Session mode the preferred way to deploy a Flink cluster on Kubernetes; the scaling point in particular comes down to a single command, as sketched below.
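A minimal sketch, assuming the TaskManager Deployment defined later in this article (name flink-taskmanager, namespace szyx-flink):

```bash
# Scale from 2 to 4 TaskManagers; new pods register with the JobManager
# automatically and contribute additional task slots.
kubectl -n szyx-flink scale deployment/flink-taskmanager --replicas=4

# Scale back down once the extra capacity is no longer needed.
kubectl -n szyx-flink scale deployment/flink-taskmanager --replicas=2
```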
Flink's filesystem
During processing, Flink may read and write data on different filesystems, such as the local filesystem, HDFS, or S3. To handle these uniformly, Flink introduces the FileSystem abstraction: an interface that provides a single way of accessing different filesystems.
Which FileSystem implementation handles a path is determined by the path's scheme together with Flink's configuration. Flink supports many filesystems out of the box, including the local filesystem, HDFS, S3, and Google Cloud Storage; since minio implements the S3 API, minio can serve as the filesystem as well.
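The official image ships the S3 implementations under /opt/flink/opt; enabling one is just a matter of copying it into its own subdirectory under /opt/flink/plugins, which the Dockerfile below does. A quick way to see what is bundled (output abbreviated, assuming the stock 1.15.3 image):

```bash
# Filesystem implementations are packaged as plugins; each one must sit in
# its own subdirectory under /opt/flink/plugins to be loaded at startup.
docker run --rm apache/flink:1.15.3-scala_2.12 ls /opt/flink/opt
# ...
# flink-s3-fs-hadoop-1.15.3.jar
# flink-s3-fs-presto-1.15.3.jar
# ...
```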
Deploying a highly available Session-mode Flink cluster on k8s
Component versions
| Component | Version |
|---|---|
| kubernetes | 1.15.12 |
| flink | 1.15.3 |
Building the image
Using minio as the filesystem requires additional S3-related jars, so we build a custom image on top of the official one.
Dockerfile:
```dockerfile
FROM apache/flink:1.15.3-scala_2.12

# Additional jars needed by our jobs
# flink-cdc
ADD lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/
# JDBC connector
ADD lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/
# MySQL driver
ADD lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/
# Oracle driver
ADD lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/

# Filesystem plugins must live in their own subdirectory under the plugins directory
RUN mkdir /opt/flink/plugins/s3-fs-presto && \
    cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/
```
Build the image:
```bash
docker build -t sivdead/flink:1.15.3_scala_2.12 -f Dockerfile .
```
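Before deploying, it is worth confirming that the jars landed where Flink expects them, then pushing the image to a registry the cluster can pull from (a sketch; the registry and tag are whatever you use):

```bash
# Check that the connector jars and the S3 plugin are in place
docker run --rm sivdead/flink:1.15.3_scala_2.12 \
    ls /opt/flink/lib /opt/flink/plugins/s3-fs-presto

# Push the image so the cluster nodes can pull it
docker push sivdead/flink:1.15.3_scala_2.12
```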
Configuration files (ConfigMap)
The configuration consists of two files, flink-conf.yaml and log4j-console.properties:
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: szyx-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: szyx-flink
    # Namespace the cluster runs in
    kubernetes.namespace: szyx-flink
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 2867m
    parallelism.default: 2
    # Filesystem
    fs.default-scheme: s3
    # minio endpoint
    s3.endpoint: https://minio.k8s.io:9000
    # minio bucket
    s3.flink.bucket: szyxflink
    s3.access-key: <minio access key>
    s3.secret-key: <minio secret key>
    # State backend
    state.backend: rocksdb
    s3.path.style.access: true
    blob.storage.directory: /opt/flink/tmp/blob
    web.upload.dir: /opt/flink/tmp/upload
    io.tmp.dirs: /opt/flink/tmp
    # State management
    # checkpoint storage location
    state.checkpoints.dir: s3://szyxflink/state/checkpoint
    # savepoint storage location
    state.savepoints.dir: s3://szyxflink/state/savepoint
    # checkpoint interval (ms)
    execution.checkpointing.interval: 5000
    execution.checkpointing.mode: EXACTLY_ONCE
    # number of retained checkpoints
    state.checkpoints.num-retained: 3
    # history server: monitor the following directory for completed jobs
    jobmanager.archive.fs.dir: s3://szyxflink/completed-jobs
    # refresh every 10 seconds
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs
    # High availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://szyxflink/ha
    # The kubernetes.operator.* options below only take effect when the
    # cluster is managed by the Flink Kubernetes Operator
    # Trigger a savepoint every 6 hours
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: 5
    # Restart of unhealthy job deployments
    kubernetes.operator.cluster.health-check.enabled: true
    # Restart failed job deployments
    kubernetes.operator.job.restart.failed: true
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
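Assuming the manifest above is saved as flink-config.yaml (the filename is arbitrary), create the namespace and apply it:

```bash
kubectl create namespace szyx-flink
kubectl apply -f flink-config.yaml

# Verify that both files ended up in the ConfigMap
kubectl -n szyx-flink describe configmap flink-config
```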
Adding a ServiceAccount and granting permissions
When deploying a Flink cluster on Kubernetes, a ServiceAccount is needed to authorize Flink to act inside the cluster. A ServiceAccount is the Kubernetes resource that grants pods access to the Kubernetes API. When the Flink JobManager or TaskManager starts, it uses this ServiceAccount to talk to the Kubernetes API, obtain cluster resources, and schedule and execute work.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: szyx-flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: szyx-flink
  name: flink
rules:
# update/patch are needed by the Kubernetes HA services to maintain leader ConfigMaps
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["create", "get", "list", "watch", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: ["extensions"]
  resources: ["ingresses"]
  verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: szyx-flink
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: szyx-flink
```
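After applying the manifest (hypothetical filename below), `kubectl auth can-i` confirms the ServiceAccount holds the permissions the Kubernetes HA services rely on:

```bash
kubectl apply -f flink-rbac.yaml

# Both checks should print "yes"
kubectl -n szyx-flink auth can-i create configmaps \
    --as=system:serviceaccount:szyx-flink:flink-service-account
kubectl -n szyx-flink auth can-i watch pods \
    --as=system:serviceaccount:szyx-flink:flink-service-account
```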
Deploying the JobManager
PVC mounted by the JobManager (backs the blob, upload, and tmp directories configured above):
```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-tmp
  namespace: szyx-flink
spec:
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi
```
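Apply the claim and check that it binds (a default StorageClass, or one named in the manifest, must exist; the filename is an assumption):

```bash
kubectl apply -f flink-jobmanager-pvc.yaml

# STATUS should reach Bound before the JobManager is deployed
kubectl -n szyx-flink get pvc flink-tmp
```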
Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        env:
        # Inject the pod IP into the container
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # Time zone
        - name: TZ
          value: Asia/Shanghai
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: tmp-dir
          mountPath: /opt/flink/tmp
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
      - key: zone
        value: mainland
        effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tmp-dir
        persistentVolumeClaim:
          claimName: flink-tmp
```
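Apply and watch the rollout; the startup logs should show the JobManager acquiring HA leadership and connecting to minio (the filename is an assumption):

```bash
kubectl apply -f flink-jobmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-jobmanager

# Scan the startup logs for errors (S3 credentials, HA ConfigMaps, etc.)
kubectl -n szyx-flink logs -l app=flink,component=jobmanager --tail=100
```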
Service:
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
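Once applied, the Service should list the JobManager pod among its endpoints; before the Ingress exists, the web UI can be reached through a port-forward (the filename is an assumption):

```bash
kubectl apply -f flink-jobmanager-service.yaml
kubectl -n szyx-flink get endpoints flink-jobmanager

# Temporary access to the web UI at http://localhost:8081
kubectl -n szyx-flink port-forward svc/flink-jobmanager 8081:8081
```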
Ingress:
```yaml
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    # Job jars may be uploaded through the web UI, so allow a large request body
    nginx.ingress.kubernetes.io/proxy-body-size: 300m
    nginx.ingress.kubernetes.io/rewrite-target: /$1
  name: job-manager
  namespace: szyx-flink
spec:
  rules:
  - host: flink.k8s.io
    http:
      paths:
      - backend:
          serviceName: flink-jobmanager
          servicePort: 8081
        path: /flink/(.*)
```
If opening http://flink.k8s.io/flink/ brings up the Flink web UI, the deployment succeeded.
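The same check can be scripted against Flink's REST API; the rewrite rule maps /flink/&lt;path&gt; to /&lt;path&gt; on the JobManager:

```bash
# Returns a JSON document with the Flink version, slot counts, and job stats
curl http://flink.k8s.io/flink/overview
```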

Deploying the TaskManager
Deployment:
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: szyx-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
      # Needed in HA mode so TaskManagers can look up the JobManager leader via ConfigMaps
      serviceAccountName: flink-service-account
      # Node selector
      nodeSelector:
        zone: mainland
      # Tolerations
      tolerations:
      - key: zone
        value: mainland
        effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
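Apply and confirm that the TaskManagers register with the JobManager (the filename is an assumption; with 2 replicas and 2 slots each, the cluster should report 4 task slots):

```bash
kubectl apply -f flink-taskmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-taskmanager

# Lists the registered TaskManagers via the REST API
curl http://flink.k8s.io/flink/taskmanagers
```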
Once the deployment is complete, open the Flink web UI and check the TaskManagers:

Submitting a test job
- Submit the WordCount.jar example that ships with Flink through the web UI

- Restart the JobManager and check that the uploaded job jar is still there (it is kept under web.upload.dir on the PVC)
Run the job

Check the result


That concludes this walkthrough of deploying a highly available Session-mode Flink cluster on Kubernetes.