Kubernetes controller manager 運行機制源碼解析
Run
確立目標
理解 kube-controller-manager 的運行機制
從主函數找到 Run 函數,代碼較長,這里精簡了一下。
// Run starts kube-controller-manager and normally never returns.
//
// This is an abridged excerpt of the upstream function (health checks, the
// HTTP server and client-credential selection are elided). As shown:
//   - the component config is registered with configz (a failure is only logged);
//   - run builds the ControllerContext, starts the controllers via
//     StartControllers, starts the informer factories, and then blocks forever;
//   - without leader election, run is called directly;
//   - with leader election, Run blocks in leaderelection.RunOrDie, calling run
//     when this instance becomes leader and exiting via klog.Fatalf when
//     leadership is lost.
//
// The only error returned is a failure from os.Hostname.
// NOTE(review): stopCh is not used in this excerpt.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// Register the component config with the configz module (also seen in
	// the kube-scheduler walkthrough); registration failure is only logged.
	if cfgz, err := configz.New(ConfigzName); err == nil {
		cfgz.Set(c.ComponentConfig)
	} else {
		klog.Errorf("unable to register configz: %v", err)
	}
	// Health checks and the HTTP server are skipped in this excerpt.
	// NOTE(review): checks is never used below (Go rejects this as
	// "declared and not used"), and unsecuredMux stays nil, so
	// StartControllers registers no debug handlers. Both are artifacts of
	// the abridgement.
	var checks []healthz.HealthChecker
	var unsecuredMux *mux.PathRecorderMux
	// run builds the controller context, starts every controller, starts the
	// informer factories, and then blocks forever.
	run := func(ctx context.Context) {
		rootClientBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}
		// Client authentication setup (elided).
		// NOTE(review): clientBuilder is never assigned in this excerpt, so a
		// nil ControllerClientBuilder is passed to CreateControllerContext.
		var clientBuilder controller.ControllerClientBuilder
		// Build the context shared by every controller's InitFunc.
		controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}
		// The service-account token controller gets the root client builder
		// and is handed to StartControllers separately from the others.
		saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
		if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}
		// Start the informer factories only after StartControllers returns,
		// then close InformersStarted to signal that they are running.
		// InformerFactory is essentially the same SharedInformerFactory seen
		// in kube-scheduler.
		controllerContext.InformerFactory.Start(controllerContext.Stop)
		controllerContext.ObjectOrMetadataInformerFactory.Start(controllerContext.Stop)
		close(controllerContext.InformersStarted)
		// Block forever; the controllers run in their own goroutines.
		select {}
	}
	// Without leader election, run the controllers directly. run never
	// returns, so reaching the panic would indicate a bug.
	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
		run(context.TODO())
		panic("unreachable")
	}
	// Build a globally unique candidate identity: hostname + "_" + UUID.
	id, err := os.Hostname()
	if err != nil {
		return err
	}
	id = id + "_" + string(uuid.NewUUID())
	// Create the resource lock that candidates compete for; its type,
	// namespace and name come from the leader-election config.
	rl, err := resourcelock.New(c.ComponentConfig.Generic.LeaderElection.ResourceLock,
		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
		c.ComponentConfig.Generic.LeaderElection.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      id,
			EventRecorder: c.EventRecorder,
		})
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}
	// Normally the process stays blocked inside RunOrDie, continuously
	// taking part in leader election.
	// NOTE(review): electionChecker is defined elsewhere (not shown).
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:          rl,
		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			// Start the controllers once this instance becomes the leader.
			OnStartedLeading: run,
			// Losing leadership terminates the process.
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	})
	panic("unreachable")
}
StartControllers
// StartControllers starts the service-account token controller and then every
// enabled controller in controllers.
//
// startSATokenController, when non-nil, is started before anything else. Run
// passes it separately from the other controllers, presumably so that
// service-account credentials exist before the other controllers need them.
// If it fails, nothing else is started and its error is returned.
//
// controllers maps a controller name to its InitFunc:
//   - controllers rejected by ctx.IsControllerEnabled are skipped;
//   - a jittered ControllerStartInterval sleep precedes each enabled start;
//   - an InitFunc may report started == false to decline starting (for
//     example when a required API resource is unavailable); this is logged
//     and skipped;
//   - when an InitFunc returns a debug handler and unsecuredMux is non-nil,
//     the handler is mounted under /debug/controllers/<name>.
//
// The first InitFunc error aborts startup and is returned wrapped with the
// failing controller's name. Because controllers is a map, the start order
// is unspecified.
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error {
	// The SA token controller must come first; the caller hands it over
	// separately for exactly this purpose, so it must not be ignored.
	if startSATokenController != nil {
		if _, _, err := startSATokenController(ctx); err != nil {
			return fmt.Errorf("error starting service account token controller: %w", err)
		}
	}

	// Start each controller; the key is the controller name and the value
	// is its init function.
	for controllerName, initFn := range controllers {
		if !ctx.IsControllerEnabled(controllerName) {
			klog.Warningf("%q is disabled", controllerName)
			continue
		}
		// Sleep for a jittered interval between controller starts.
		time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))

		klog.V(1).Infof("Starting %q", controllerName)
		debugHandler, started, err := initFn(ctx)
		if err != nil {
			// Wrap instead of log-and-return: the caller reports the error,
			// and the message now says which controller failed.
			return fmt.Errorf("error starting %q: %w", controllerName, err)
		}
		if !started {
			klog.Warningf("Skipping %q", controllerName)
			continue
		}
		// Expose the controller's debug handler, if any, under its own path.
		if debugHandler != nil && unsecuredMux != nil {
			basePath := "/debug/controllers/" + controllerName
			unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
			unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
		}
		klog.Infof("Started %q", controllerName)
	}
	return nil
}
// NewControllerInitializers returns the start function of every controller
// kube-controller-manager can run, keyed by controller name. These are the
// names StartControllers checks with IsControllerEnabled and uses in its log
// messages. Many familiar Kubernetes concepts appear here; they are not covered
// one by one.
//
// The "service", "route" and "cloud-node-lifecycle" entries are included only
// when loopMode is IncludeCloudLoops.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	inits := make(map[string]InitFunc)

	inits["endpoint"] = startEndpointController
	inits["endpointslice"] = startEndpointSliceController
	inits["endpointslicemirroring"] = startEndpointSliceMirroringController
	inits["replicationcontroller"] = startReplicationController
	inits["podgc"] = startPodGCController
	inits["resourcequota"] = startResourceQuotaController
	inits["namespace"] = startNamespaceController
	inits["serviceaccount"] = startServiceAccountController
	inits["garbagecollector"] = startGarbageCollectorController
	inits["daemonset"] = startDaemonSetController
	inits["job"] = startJobController
	inits["deployment"] = startDeploymentController
	inits["replicaset"] = startReplicaSetController
	inits["horizontalpodautoscaling"] = startHPAController
	inits["disruption"] = startDisruptionController
	inits["statefulset"] = startStatefulSetController
	inits["cronjob"] = startCronJobController
	inits["csrsigning"] = startCSRSigningController
	inits["csrapproving"] = startCSRApprovingController
	inits["csrcleaner"] = startCSRCleanerController
	inits["ttl"] = startTTLController
	inits["bootstrapsigner"] = startBootstrapSignerController
	inits["tokencleaner"] = startTokenCleanerController
	inits["nodeipam"] = startNodeIpamController
	inits["nodelifecycle"] = startNodeLifecycleController
	inits["persistentvolume-binder"] = startPersistentVolumeBinderController
	inits["attachdetach"] = startAttachDetachController
	inits["persistentvolume-expander"] = startVolumeExpandController
	inits["clusterrole-aggregation"] = startClusterRoleAggregrationController
	inits["pvc-protection"] = startPVCProtectionController
	inits["pv-protection"] = startPVProtectionController
	inits["ttl-after-finished"] = startTTLAfterFinishedController
	inits["root-ca-cert-publisher"] = startRootCACertPublisher
	inits["ephemeral-volume"] = startEphemeralVolumeController

	// Controllers that are only registered in IncludeCloudLoops mode.
	if loopMode == IncludeCloudLoops {
		inits["service"] = startServiceController
		inits["route"] = startRouteController
		inits["cloud-node-lifecycle"] = startCloudNodeLifecycleController
	}

	return inits
}
ReplicaSet
由于我們的示例是創建一個 nginx 的 Pod,涉及到 kube-controller-manager 的內容很少。
但是,為了加深大家對 kube-controller-manager 的認識,我們引入一個新的概念 - ReplicaSet,下面是官方說明:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
ReplicaSet 的目的是維護一組在任何時候都處于運行狀態的 Pod 副本的穩定集合。因此,它通常用來保證給定數量的、完全相同的 Pod 的可用性。
簡單來說,ReplicaSet 就是用來生成並維持指定個數的 Pod。
代碼在 pkg/controller/replicaset/replica_set.go
ReplicaSetController
// startReplicaSetController starts the ReplicaSet controller. It reports
// started == false, and starts nothing, when the apps/v1 "replicasets" resource
// is not in ctx.AvailableResources. It never returns a debug handler or an
// error.
//
// The controller is given both the ReplicaSet and the Pod informers. It must
// keep the replica count declared by each ReplicaSet in line with the Pods
// that actually exist, so it has to watch both resources. Its Run method
// executes in a separate goroutine with ConcurrentRSSyncs workers until
// ctx.Stop is closed.
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
	rsResource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}
	if available := ctx.AvailableResources[rsResource]; !available {
		return nil, false, nil
	}

	// Construction happens synchronously; only Run goes to the background.
	rsController := replicaset.NewReplicaSetController(
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
		replicaset.BurstReplicas,
	)
	workers := int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs)
	go rsController.Run(workers, ctx.Stop)

	return nil, true, nil
}
// Run is the ReplicaSet controller's main loop.
//
// It first waits for the Pod and ReplicaSet informer caches to sync and
// returns early if that wait fails. It then starts `workers` goroutines, each
// running rsc.worker via wait.Until with a one-second period, and blocks until
// stopCh is closed. On return, it logs shutdown, shuts down the work queue,
// and finally runs utilruntime.HandleCrash.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer rsc.queue.ShutDown()

	name := strings.ToLower(rsc.Kind)
	klog.Infof("Starting %v controller", name)
	defer klog.Infof("Shutting down %v controller", name)

	synced := cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced)
	if !synced {
		return
	}

	// Launch the worker goroutines; each processes queue items.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(rsc.worker, time.Second, stopCh)
	}

	<-stopCh
}
// worker keeps processing work-queue items until processNextWorkItem reports
// that the queue has been shut down.
func (rsc *ReplicaSetController) worker() {
	for {
		if !rsc.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem takes one key from the controller's work queue and
// passes it to rsc.syncHandler. Like the kube-scheduler queue, the work queue
// here is a workqueue.RateLimitingInterface.
//
// On success it calls queue.Forget for the key. On failure it reports the
// error via utilruntime.HandleError and re-adds the key with
// queue.AddRateLimited. In both cases queue.Done is called for the key. It
// returns false only when queue.Get signals shutdown, which tells worker to
// stop.
func (rsc *ReplicaSetController) processNextWorkItem() bool {
	item, shutdown := rsc.queue.Get()
	if shutdown {
		return false
	}
	// Mark the key as processed when this call ends, whatever the outcome.
	defer rsc.queue.Done(item)

	if err := rsc.syncHandler(item.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", item, err))
		rsc.queue.AddRateLimited(item)
		return true
	}
	rsc.queue.Forget(item)
	return true
}
// NewBaseController constructs a ReplicaSetController. It is shown here to
// reveal what syncHandler (called by processNextWorkItem) actually is.
//
// The excerpt elides almost the whole body, including the declaration of
// rsc. It keeps only the relevant line: syncHandler is wired to
// rsc.syncReplicaSet, so every key taken from the work queue is reconciled
// by syncReplicaSet.
// NOTE(review): as shown, rsc is undefined and this snippet does not compile.
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
	gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
	rsc.syncHandler = rsc.syncReplicaSet
	return rsc
}
syncReplicaSet
// syncReplicaSet reconciles the ReplicaSet identified by key
// (a "namespace/name" key split with cache.SplitMetaNamespaceKey).
//
// Steps:
//   - If the ReplicaSet is no longer in the lister, its expectations are
//     deleted and nil is returned.
//   - Otherwise, the active Pods in its namespace are passed through
//     claimPods using the ReplicaSet's selector.
//   - manageReplicas is called when expectations are satisfied and the
//     ReplicaSet is not being deleted.
//   - The recomputed status is written back.
//   - If MinReadySeconds > 0 and the ready replicas equal the desired count
//     but the available replicas do not, the key is re-queued after
//     MinReadySeconds so availability is re-evaluated.
//
// Errors from key splitting, lookup, listing, claiming and the status update
// are returned directly. Otherwise the result is manageReplicas' error (nil if
// it was not called). A selector conversion error is reported via
// utilruntime.HandleError, and nil is returned.
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
	startTime := time.Now()
	// Log how long this sync took, whatever the outcome.
	defer func() {
		klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
	}()
	// Split the queue key into namespace and name.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Look up the ReplicaSet in the lister by namespace and name.
	rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)
	if errors.IsNotFound(err) {
		// The ReplicaSet is gone: drop its expectations; nothing to retry.
		klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)
		rsc.expectations.DeleteExpectations(key)
		return nil
	}
	if err != nil {
		return err
	}
	// Replicas are only managed when the expectations for this key are
	// satisfied. NOTE(review): presumably this keeps the controller from
	// acting again before its earlier creates/deletes are observed — confirm.
	rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)
	// Kubernetes matches ReplicaSets and Pods through the selector's labels.
	selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector)
	if err != nil {
		// An invalid selector is reported but nil is returned, so the key is
		// not re-queued with rate limiting.
		utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err))
		return nil
	}
	// List every Pod in the ReplicaSet's namespace (labels.Everything());
	// selector matching happens in claimPods below.
	allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	// Keep only the Pods controller.FilterActivePods considers active.
	filteredPods := controller.FilterActivePods(allPods)
	// Narrow to the Pods this ReplicaSet claims via its selector.
	// NOTE(review): presumably claimPods also adopts/releases Pods based on
	// owner references — confirm against its implementation.
	filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
	if err != nil {
		return err
	}
	var manageReplicasErr error
	// Scale only when expectations are satisfied and the ReplicaSet is not
	// being deleted (manageReplicas is examined below).
	if rsNeedsSync && rs.DeletionTimestamp == nil {
		manageReplicasErr = rsc.manageReplicas(filteredPods, rs)
	}
	// Work on a copy rather than on the object returned by the lister.
	rs = rs.DeepCopy()
	newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
	// Write the new status; this happens even if manageReplicas failed.
	updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus)
	if err != nil {
		return err
	}
	// All desired replicas are ready but not all are available yet: re-queue
	// after MinReadySeconds so availability is checked again then.
	// NOTE(review): Spec.Replicas is dereferenced; assumes it is non-nil
	// (presumably defaulted by the API server) — confirm.
	if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&
		updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&
		updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {
		rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)
	}
	return manageReplicasErr
}
// manageReplicas handles the case where the number of Pods differs from the
// number the ReplicaSet declares. It compares the filtered, claimed Pods with
// rs.Spec.Replicas: fewer Pods than desired means scaling up (creating Pods),
// and more means scaling down.
//
// This is an abridged excerpt. The scale-up and scale-down bodies are omitted,
// so, as shown, the function only computes diff and rsKey and returns nil.
// NOTE(review): rsKey is unused in this excerpt, which Go rejects as
// "declared and not used". Upstream presumably uses it for expectations
// bookkeeping — confirm against the full source.
// NOTE(review): rs.Spec.Replicas is dereferenced; assumes it is non-nil.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
	// diff = current Pod count - desired Pod count.
	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
	rsKey, err := controller.KeyFunc(rs)
	if err != nil {
		// A key error is reported via HandleError, and nil is returned.
		utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err))
		return nil
	}
	// diff < 0: fewer Pods than desired, so scale up by creating Pods.
	if diff < 0 {
		// Scale-up implementation elided in this excerpt.
	} else if diff > 0 {
		// diff > 0: more Pods than desired, so scale down.
		// Scale-down implementation elided in this excerpt.
	}
	return nil
}
站在前人的肩膀上,向前輩致敬,Respect!
Summary
- kube-controller-manager 的核心思想是:根據期望狀態和當前狀態,管理 Kubernetes 中的資源。以 ReplicaSet 為例,它對比了定義中聲明的 Pod 數和當前集群中滿足條件的 Pod 數,進行相對應的擴縮容。
以上就是 Kubernetes controller manager 運行機制源碼解析的詳細內容,更多關於 Kubernetes controller manager 的資料請關注腳本之家其它相關文章!
相關(guān)文章
KubeSphere 接入外部 Elasticsearch 實戰示例
這篇文章主要為大家介紹了 KubeSphere 接入外部 Elasticsearch 實戰示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪 2022-12-12
Dashboard管理Kubernetes集群與API訪問配置
這篇文章介紹了 Dashboard 管理 Kubernetes 集群與 API 訪問配置的方法,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2022-04-04
了解Kubernetes中的Service和Endpoint
這篇文章介紹了 Kubernetes 中的 Service 和 Endpoint,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2022-04-04
Kubernetes(K8S)容器集群管理環境完整部署詳細教程-上篇
本系列文章主要介紹了 Kubernetes(K8S)容器集群管理環境完整部署的詳細教程,分為上中下三篇文章,此為上篇,需要的朋友可以參考下 2022-01-01
Kubernetes controller manager 運行機制源碼解析
這篇文章主要為大家介紹了 Kubernetes controller manager 運行機制源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪 2022-11-11
使用 kubeadm 命令行工具創建 Kubernetes 集群
這篇文章介紹了使用 kubeadm 命令行工具創建 Kubernetes 集群的方法,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧 2022-03-03

