
A Detailed Guide to Deploying a Session-Mode Flink Cluster on k8s

 Updated: 2023-03-15 14:40:46   Author: sivdead
This article walks through deploying a Session-mode Flink cluster on Kubernetes (k8s); readers who need such a setup can use it as a reference.

Deploying a Session-mode Flink cluster on k8s

In distributed computing, Apache Flink is a fast, reliable, and easy-to-use compute engine. A Flink cluster is a distributed system made up of a Flink JobManager and multiple Flink TaskManagers, and high availability is an important consideration when deploying one. In this article, we describe how to deploy a highly available Session-mode Flink cluster on Kubernetes (k8s), using MinIO as the filesystem.

What is Session mode

Two deployment styles matter here: Standalone and Session mode. In Standalone mode, the Flink cluster is a fixed set of independent processes that share one configuration file and communicate over Flink's Akka-based RPC. In Session mode, the cluster is dynamic and scalable: it can be started or stopped on demand, and the JobManager and TaskManager processes run in containers that k8s manages dynamically.

The advantages of Session mode are:

  • The Flink cluster can be started and stopped on demand
  • TaskManagers can be added or removed dynamically
  • The cluster can be resized automatically using k8s scaling features
  • It integrates with other k8s resources, such as storage volumes and network policies

This makes Session mode the preferred way to deploy a Flink cluster on Kubernetes.
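
A defining trait of Session mode is that jobs are submitted to an already-running cluster rather than packaged into it. As a rough sketch (the namespace and Service name assume the manifests defined later in this article; paths assume a local Flink 1.15.3 distribution):

# Reach the JobManager REST endpoint, e.g. via a port-forward:
kubectl -n szyx-flink port-forward svc/flink-jobmanager 8081:8081 &
# Submit one of the example jobs shipped with the Flink distribution:
./bin/flink run -m localhost:8081 examples/streaming/WordCount.jar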

Flink's FileSystem

During processing, Flink may read and write data on different file systems, such as the local filesystem, HDFS, or S3. To handle these uniformly, Flink introduces the FileSystem abstraction: an interface that provides a unified way of accessing different file systems.

The FileSystem implementation is selected through Flink's configuration. Flink supports many file systems, including the local filesystem, HDFS, S3, and Google Cloud Storage. Because MinIO implements the S3 protocol, MinIO can also be used as the filesystem.
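
Before pointing Flink at MinIO, the target bucket has to exist. A minimal sketch using the MinIO client mc (the alias name is our own; endpoint, credentials, and bucket match the configuration used later in this article):

# Register the MinIO endpoint under a local alias:
mc alias set flink-minio https://minio.k8s.io:9000 <access-key> <secret-key>
# Create the bucket Flink will use for checkpoints, savepoints, and HA metadata:
mc mb flink-minio/szyxflink
mc ls flink-minio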

Deploying a highly available Session-mode Flink cluster on k8s

Component versions

Component     Version
kubernetes    1.15.12
flink         1.15.3

Building the image

Using MinIO as the filesystem requires additional S3-related jars, so we build a custom image.

Dockerfile:

FROM apache/flink:1.15.3-scala_2.12
# Extra jars the jobs need
# flink-cdc
ADD lib/flink-sql-connector-mysql-cdc-2.3.0.jar /opt/flink/lib/
# JDBC connector
ADD lib/flink-connector-jdbc-1.15.3.jar /opt/flink/lib/
# MySQL driver
ADD lib/mysql-connector-j-8.0.32.jar /opt/flink/lib/
# Oracle driver
ADD lib/ojdbc8-21.9.0.0.jar /opt/flink/lib/
# Filesystem plugins must live in their own folder under the plugins directory
RUN mkdir /opt/flink/plugins/s3-fs-presto && cp -f /opt/flink/opt/flink-s3-fs-presto-1.15.3.jar /opt/flink/plugins/s3-fs-presto/

Build the image:

docker build -t sivdead/flink:1.15.3_scala_2.12 -f Dockerfile .
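
The image then has to be pullable by every k8s node, so the usual next step is pushing it to a registry the cluster can reach (the registry choice is up to you):

# Push to a registry the cluster nodes can pull from:
docker push sivdead/flink:1.15.3_scala_2.12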

Configuration (ConfigMap)

The configuration has two parts: flink-conf.yaml and log4j-console.properties.

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: szyx-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    kubernetes.cluster-id: szyx-flink
    # namespace the cluster runs in
    kubernetes.namespace: szyx-flink
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 2867m
    parallelism.default: 2
    # filesystem
    fs.default-scheme: s3
    # MinIO endpoint
    s3.endpoint: https://minio.k8s.io:9000
    # MinIO bucket
    s3.flink.bucket: szyxflink
    s3.access-key: <MinIO access key>
    s3.secret-key: <MinIO secret key>
    # state backend
    state.backend: rocksdb
    s3.path.style.access: true
    blob.storage.directory: /opt/flink/tmp/blob
    web.upload.dir: /opt/flink/tmp/upload
    io.tmp.dirs: /opt/flink/tmp
    # state management
    # checkpoint storage location
    state.checkpoints.dir: s3://szyxflink/state/checkpoint
    # savepoint storage location
    state.savepoints.dir: s3://szyxflink/state/savepoint
    # checkpoint interval
    execution.checkpointing.interval: 5000
    execution.checkpointing.mode: EXACTLY_ONCE
    # number of retained checkpoints
    state.checkpoints.num-retained: 3
    # history server
    # monitor the following directory for completed jobs
    jobmanager.archive.fs.dir: s3://szyxflink/completed-jobs
    # refresh every 10 seconds
    historyserver.archive.fs.refresh-interval: 10000
    historyserver.archive.fs.dir: s3://szyxflink/completed-jobs
    # high availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://szyxflink/ha
    # NOTE: the kubernetes.operator.* keys below are Flink Kubernetes Operator
    # settings and only take effect when the cluster is managed by that operator
    # trigger a savepoint every 6 hours
    kubernetes.operator.periodic.savepoint.interval: 6h
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: 5
    # restart unhealthy job deployments
    kubernetes.operator.cluster.health-check.enabled: true
    # restart failed job deployments
    kubernetes.operator.job.restart.failed: true
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
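
Apply the ConfigMap before the other manifests (the manifest file name here is our own choice):

# Create the namespace used throughout this article, then the ConfigMap:
kubectl create namespace szyx-flink
kubectl apply -f flink-configmap.yaml
kubectl -n szyx-flink get configmap flink-config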

Creating a ServiceAccount and granting permissions

When deploying a Flink cluster on Kubernetes, a serviceAccount is needed to authorize Flink workloads to act within the cluster. A ServiceAccount is a Kubernetes resource that grants Pods access to the Kubernetes API. When the Flink JobManager or TaskManager starts, it uses this serviceAccount to talk to the Kubernetes API, acquire cluster resources, and schedule and execute work.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: szyx-flink
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: szyx-flink
  name: flink
rules:
- apiGroups: [""]
  resources: ["pods", "services","configmaps"]
  verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: [""]
  resources: ["pods/log"]
  verbs: ["get"]
- apiGroups: ["batch"]
  resources: ["jobs"]
  verbs: ["create", "get", "list", "watch", "delete"]
- apiGroups: ["extensions"]
  resources: ["ingresses"]
  verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  namespace: szyx-flink
  name: flink-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: flink
subjects:
- kind: ServiceAccount
  name: flink-service-account
  namespace: szyx-flink
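
Apply the RBAC objects and verify the binding took effect (the file name is ours; kubectl auth can-i needs impersonation rights in your kubeconfig):

kubectl apply -f flink-rbac.yaml
# The ServiceAccount must be able to manage ConfigMaps, which Kubernetes HA relies on:
kubectl auth can-i create configmaps \
  --as=system:serviceaccount:szyx-flink:flink-service-account -n szyx-flink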

Deploying the JobManager

PVC mounted by the JobManager:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-tmp
  namespace: szyx-flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 40Gi
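
Apply it and confirm the claim binds (with some storage classes it only binds once the first Pod mounts it; the file name is ours):

kubectl apply -f flink-jobmanager-pvc.yaml
kubectl -n szyx-flink get pvc flink-tmp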

Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        env:
        # inject the Pod IP into the container
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # timezone
        - name: TZ
          value: Asia/Shanghai
        # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
        args: ["jobmanager", "$(POD_IP)"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: tmp-dir
          mountPath: /opt/flink/tmp
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account # Service account which has the permissions to create, edit, delete ConfigMaps
      # node selector
      nodeSelector:
        zone: mainland
      # tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tmp-dir
        persistentVolumeClaim:
          claimName: flink-tmp
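
Apply the Deployment and wait for the JobManager to come up (the manifest file name is ours):

kubectl apply -f flink-jobmanager-deployment.yaml
kubectl -n szyx-flink rollout status deployment/flink-jobmanager
# Tail the logs to confirm the JobManager started cleanly:
kubectl -n szyx-flink logs deploy/flink-jobmanager --tail=20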

Service:

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: szyx-flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
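
Before wiring up the ingress, you can sanity-check the Service with a port-forward against Flink's REST API (the /overview endpoint returns a cluster summary as JSON; the file name is ours):

kubectl apply -f flink-jobmanager-service.yaml
kubectl -n szyx-flink port-forward svc/flink-jobmanager 8081:8081 &
curl -s http://localhost:8081/overview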

Ingress:

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    # 因?yàn)橛锌赡苄枰蟼鱦ar包,所以需要設(shè)置大一些
    nginx.ingress.kubernetes.io/proxy-body-size: 300m
    nginx.ingress.kubernetes.io/rewrite-target: /$1
  name: job-manager
  namespace: szyx-flink
spec:
  rules:
  - host: flink.k8s.io
    http:
      paths:
      - backend:
          serviceName: flink-jobmanager
          servicePort: 8081
        path: /flink/(.*)
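
Apply it and check that the UI is reachable through the ingress (this assumes flink.k8s.io resolves to your ingress controller; with the rewrite rule above, /flink/overview is forwarded to the REST endpoint /overview):

kubectl apply -f flink-jobmanager-ingress.yaml
curl -s http://flink.k8s.io/flink/overview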

If the Flink web UI opens at http://flink.k8s.io/flink/, the deployment is complete.

Deploying the TaskManagers

Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: szyx-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        imagePullPolicy: Always
        image: sivdead/flink:1.15.3_scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
        resources:
          requests:
            memory: "8192Mi"
            cpu: "4"
          limits:
            memory: "8192Mi"
            cpu: "4"
      # node selector
      nodeSelector:
        zone: mainland
      # tolerations
      tolerations:
        - key: zone
          value: mainland
          effect: NoSchedule
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
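
Apply the manifest and confirm the TaskManagers register with the JobManager (the file name is ours; /taskmanagers is the REST endpoint behind the web UI's TaskManagers view):

kubectl apply -f flink-taskmanager-deployment.yaml
kubectl -n szyx-flink get pods -l component=taskmanager
curl -s http://flink.k8s.io/flink/taskmanagers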

Once the deployment finishes, open the Flink web UI and check the TaskManagers page:

Testing job submission

  • Submit Flink's bundled example, WordCount.jar, through the web UI (the sketch below shows the equivalent REST and kubectl steps)

  • Restart the JobManager and check whether the uploaded job jar still exists
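
A sketch of the same test from the command line (the /jars endpoints are Flink's standard job-submission REST API; the jar survives a restart because web.upload.dir points at the PVC-backed /opt/flink/tmp):

# Upload the example jar through the REST API (the web UI does the same):
curl -X POST -F "jarfile=@WordCount.jar" http://flink.k8s.io/flink/jars/upload
# Restart the JobManager, then confirm the uploaded jar is still listed:
kubectl -n szyx-flink rollout restart deployment/flink-jobmanager
kubectl -n szyx-flink rollout status deployment/flink-jobmanager
curl -s http://flink.k8s.io/flink/jars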

Run the job

Check the run results
