欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kubernetes?controller?manager運(yùn)行機(jī)制源碼解析

 更新時(shí)間:2022年11月25日 14:22:29   作者:LuckyLove  
這篇文章主要為大家介紹了Kubernetes?controller?manager運(yùn)行機(jī)制源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

Run

確立目標(biāo)

理解 kube-controller-manager 的運(yùn)行機(jī)制

從主函數(shù)找到run函數(shù),代碼較長(zhǎng),這里精簡(jiǎn)了一下

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// configz 模塊,在kube-scheduler分析中已經(jīng)了解
	if cfgz, err := configz.New(ConfigzName); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}
	// 健康監(jiān)測(cè)與http服務(wù),跳過(guò)
	var checks []healthz.HealthChecker
	var unsecuredMux *mux.PathRecorderMux
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
    // client認(rèn)證相關(guān)
		var clientBuilder controller.ControllerClientBuilder
    // 創(chuàng)建controller的上下文context
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}
    // 這里的 InformerFactory 和我們?cè)趉ube-scheduler中看的 SharedInformerFactory 基本一致
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)
		select {}
	}
  // 是否進(jìn)行選舉
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}
  // 拼接出一個(gè)全局唯一的id
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	id = id + "_" + string(uuid.NewUUID())
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}
  // 正常情況下都是阻塞在RunOrDie這個(gè)函數(shù)中,不停地進(jìn)行選舉相關(guān)的工作
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
      // 開(kāi)始成為L(zhǎng)eader的時(shí)候,調(diào)用run函數(shù)
			OnStartedLeading: run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}

StartControllers

func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// 關(guān)鍵性的循環(huán),啟動(dòng)每個(gè)controllers,key為控制器名字,value為初始化函數(shù)
	for controllerName, initFn := range controllers {
    // 是否允許啟動(dòng)
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}
		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
		klog.V(1).Infof("Starting %q", controllerName)
    // 調(diào)用init函數(shù)進(jìn)行啟動(dòng)
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			klog.Errorf("Error starting %q", controllerName)
			return err
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
    // 注冊(cè)對(duì)應(yīng)controller到debug的url中
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}
	return nil
}
// 我們?cè)偃魅隿ontroller的函數(shù)去看看,對(duì)應(yīng)的controller有哪些,這里有我們很多常見(jiàn)的概念,不一一細(xì)講
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["endpointslice"] = startEndpointSliceController
	controllers["endpointslicemirroring"] = startEndpointSliceMirroringController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefulset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["csrsigning"] = startCSRSigningController
	controllers["csrapproving"] = startCSRApprovingController
	controllers["csrcleaner"] = startCSRCleanerController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	controllers["nodeipam"] = startNodeIpamController
	controllers["nodelifecycle"] = startNodeLifecycleController
	if loopMode == IncludeCloudLoops {
		controllers["service"] = startServiceController
		controllers["route"] = startRouteController
		controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}
	controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
	controllers["attachdetach"] = startAttachDetachController
	controllers["persistentvolume-expander"] = startVolumeExpandController
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
	controllers["ttl-after-finished"] = startTTLAfterFinishedController
	controllers["root-ca-cert-publisher"] = startRootCACertPublisher
	controllers["ephemeral-volume"] = startEphemeralVolumeController
	return controllers
}

ReplicaSet

由于我們的示例是創(chuàng)建一個(gè)nginx的pod,涉及到kube-controller-manager的內(nèi)容很少。

但是,為了加深大家對(duì) kube-controller-manager 的認(rèn)識(shí),我們引入一個(gè)新的概念 - ReplicaSet,下面是官方說(shuō)明:

A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.

ReplicaSet 的目的是維護(hù)一組在任何時(shí)候都處于運(yùn)行狀態(tài)的 Pod 副本的穩(wěn)定集合。 因此,它通常用來(lái)保證給定數(shù)量的、完全相同的 Pod 的可用性。

簡(jiǎn)單來(lái)說(shuō),ReplicaSet 就是用來(lái)生成指定個(gè)數(shù)的Pod

代碼在pkg/controller/replica_set.go

ReplicaSetController

func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
		return nil, false, nil
	}
  // 用goroutine異步運(yùn)行,包含了 ReplicaSet和Pod 的兩個(gè)Informer
  // 這一點(diǎn)很好理解:我們是要控制ReplicaSet聲明的數(shù)量和運(yùn)行的Pod數(shù)量一致,需要同時(shí)觀察者兩種資源
	go replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
	return nil, true, nil
}
// 運(yùn)行函數(shù)
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()
	controllerName := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", controllerName)
	defer klog.Infof("Shutting down %v controller", controllerName)
	if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
		return
	}
	for i := 0; i < workers; i++ {
    // 工作的函數(shù)
		go wait.Until(rsc.worker, time.Second, stopCh)
	}
	<-stopCh
}
func (rsc *ReplicaSetController) worker() {
  // 繼續(xù)查找實(shí)現(xiàn)
	for rsc.processNextWorkItem() {
	}
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
  // 這里也有個(gè)queue的概念,可以類比kube-scheduler中的實(shí)現(xiàn)
  // 不同的是,這里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具體實(shí)現(xiàn)今天不細(xì)看
  // 獲取元素
	key, quit := rsc.queue.Get()
	if quit {
		return false
	}
	defer rsc.queue.Done(key)
  // 處理對(duì)應(yīng)的元素
	err := rsc.syncHandler(key.(string))
	if err == nil {
		rsc.queue.Forget(key)
		return true
	}
	utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))
	rsc.queue.AddRateLimited(key)
	return true
}
// 再回過(guò)頭,去查看syncHandler的具體實(shí)現(xiàn)
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}

syncReplicaSet

func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()
	// 從key中拆分出 namespace 和 name
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
  // 根據(jù)name,從 Lister 獲取對(duì)應(yīng)的 ReplicaSets 信息
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
  // 獲取 selector (k8s 是根據(jù)selector中的label來(lái)匹配 ReplicaSets 和 Pod 的)
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}
	// 根據(jù)namespace和labels獲取所有的pod
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
        // 過(guò)濾無(wú)效的pod
	filteredPods := controller.FilterActivePods(allPods)
	// 根據(jù)selector再過(guò)濾pod
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}
	var manageReplicasErr error
	if rsNeedsSync && rs.DeletionTimestamp == nil {
    // 管理 ReplicaSet,下面詳細(xì)分析
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	// 更新?tīng)顟B(tài)
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
// 我們?cè)僖黄鹂纯?,?dāng)Pod數(shù)量和ReplicaSet中聲明的不同時(shí),是怎么工作的
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// diff = 當(dāng)前pod數(shù) - 期望pod數(shù)
  diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
  // diff小于0,表示需要擴(kuò)容,即新增Pod
	if diff < 0 {
    // 具體的實(shí)現(xiàn)暫時(shí)不細(xì)看
  // diff 大于0,即需要縮容
	} else if diff > 0 {
	}
	return nil
}

站在前人的肩膀上,向前輩致敬,Respect!

Summary

  • kube-controller-manager 的核心思想是: 根據(jù)期望狀態(tài)和當(dāng)前狀態(tài),管理Kubernetes中的資源。 以ReplicaSet為例,它對(duì)比了定義聲明的Pod數(shù)和當(dāng)前集群中滿足條件的Pod數(shù),進(jìn)行相對(duì)應(yīng)的擴(kuò)縮容。

以上就是Kubernetes controller manager運(yùn)行機(jī)制源碼解析的詳細(xì)內(nèi)容,更多關(guān)于Kubernetes controller manager的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • KubeSphere接入外部Elasticsearch實(shí)戰(zhàn)示例

    KubeSphere接入外部Elasticsearch實(shí)戰(zhàn)示例

    這篇文章主要為大家介紹了KubeSphere接入外部Elasticsearch實(shí)戰(zhàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • CKAD認(rèn)證中部署k8s并配置Calico插件

    CKAD認(rèn)證中部署k8s并配置Calico插件

    這篇文章介紹了CKAD認(rèn)證中部署k8s并配置Calico插件的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-03-03
  • Dashboard管理Kubernetes集群與API訪問(wèn)配置

    Dashboard管理Kubernetes集群與API訪問(wèn)配置

    這篇文章介紹了Dashboard管理Kubernetes集群與API訪問(wèn)配置的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • kubernetes實(shí)現(xiàn)分布式限流

    kubernetes實(shí)現(xiàn)分布式限流

    這篇文章介紹了kubernetes實(shí)現(xiàn)分布式限流的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-04-04
  • K8S集群范圍使用imagePullSecret示例詳解

    K8S集群范圍使用imagePullSecret示例詳解

    在這篇文章中,我將向你展示如何在?Kubernetes?中使用?imagePullSecrets示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-11-11
  • 了解Kubernetes中的Service和Endpoint

    了解Kubernetes中的Service和Endpoint

    這篇文章介紹了Kubernetes中的Service和Endpoint,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • Kubernetes(K8S)容器集群管理環(huán)境完整部署詳細(xì)教程-上篇

    Kubernetes(K8S)容器集群管理環(huán)境完整部署詳細(xì)教程-上篇

    本系列文章主要介紹了Kubernetes(K8S)容器集群管理環(huán)境完整部署的詳細(xì)教程,分為上中下三篇文章,此為上篇,需要的朋友可以參考下
    2022-01-01
  • Kubernetes?controller?manager運(yùn)行機(jī)制源碼解析

    Kubernetes?controller?manager運(yùn)行機(jī)制源碼解析

    這篇文章主要為大家介紹了Kubernetes?controller?manager運(yùn)行機(jī)制源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-11-11
  • 使用kubeadm命令行工具創(chuàng)建kubernetes集群

    使用kubeadm命令行工具創(chuàng)建kubernetes集群

    這篇文章介紹了使用kubeadm命令行工具創(chuàng)建kubernetes集群的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-03-03
  • Minikube搭建Kubernetes集群

    Minikube搭建Kubernetes集群

    這篇文章介紹了Minikube搭建Kubernetes集群的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-03-03

最新評(píng)論