Kubernetes scheduler啟動(dòng)監(jiān)控資源變化解析
確立目標(biāo)
- 理解kube-scheduler啟動(dòng)的流程
- 了解
Informer
是如何從kube-apiserver監(jiān)聽資源變化的情況
理解kube-scheduler啟動(dòng)的流程 代碼在cmd/kube-scheduler
run
// kube-scheduler 類似于kube-apiserver,是個(gè)常駐進(jìn)程,查看其對(duì)應(yīng)的Run函數(shù) func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error { // 根據(jù)入?yún)?,返回配置cc與調(diào)度sched cc是completedConfig cc, sched, err := Setup(ctx, opts, registryOptions...) // 運(yùn)行 return Run(ctx, cc, sched) } // 運(yùn)行調(diào)度策略 func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error { // 將配置注冊(cè)到configz中,會(huì)保存在一個(gè)全局map里 叫configs if cz, err := configz.New("componentconfig"); err == nil { cz.Set(cc.ComponentConfig) } else { return fmt.Errorf("unable to register configz: %s", err) } // 事件廣播管理器,涉及到k8s里的一個(gè)核心資源 - Event事件,暫時(shí)不細(xì)講 cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) // 健康監(jiān)測(cè)的服務(wù) var checks []healthz.HealthChecker // 異步各個(gè)Informer。Informer是kube-scheduler的一個(gè)重點(diǎn) go cc.PodInformer.Informer().Run(ctx.Done()) cc.InformerFactory.Start(ctx.Done()) cc.InformerFactory.WaitForCacheSync(ctx.Done()) // 選舉Leader的工作,因?yàn)镸aster節(jié)點(diǎn)可以存在多個(gè),選舉一個(gè)作為L(zhǎng)eader if cc.LeaderElection != nil { cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{ // 兩個(gè)鉤子函數(shù),開啟Leading時(shí)運(yùn)行調(diào)度,結(jié)束時(shí)打印報(bào)錯(cuò) OnStartedLeading: sched.Run, OnStoppedLeading: func() { klog.Fatalf("leaderelection lost") }, } leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection) if err != nil { return fmt.Errorf("couldn't create leader elector: %v", err) } // 參與選舉的會(huì)持續(xù)通信 leaderElector.Run(ctx) return fmt.Errorf("lost lease") } // 不參與選舉的,也就是單節(jié)點(diǎn)的情況時(shí),在這里運(yùn)行 sched.Run(ctx) return fmt.Errorf("finished without leader elect") } /* 到這里,我們已經(jīng)接觸了kube-scheduler的2個(gè)核心概念: 1. scheduler:正如程序名kube-scheduler,這個(gè)進(jìn)程的核心作用是進(jìn)行調(diào)度,會(huì)涉及到多種調(diào)度策略 2. Informer:k8s中有各種類型的資源,包括自定義的。而Informer的實(shí)現(xiàn)就將調(diào)度和資源結(jié)合了起來 */
Scheduler
// 在創(chuàng)建scheduler的函數(shù) runcommand() func Setup() { // 創(chuàng)建scheduler,包括多個(gè)選項(xiàng) sched, err := scheduler.New(cc.Client, cc.InformerFactory, cc.PodInformer, recorderFactory, ctx.Done(), scheduler.WithProfiles(cc.ComponentConfig.Profiles...), scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource), scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore), scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry), scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds), scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds), scheduler.WithExtenders(cc.ComponentConfig.Extenders...), ) return &cc, sched, nil } // 我們?cè)倏匆幌翹ew這個(gè)函數(shù) func New() (*Scheduler, error) { // 先注冊(cè)了所有的算法,保存到一個(gè) map[string]PluginFactory 中 registry := frameworkplugins.NewInTreeRegistry() //NewInTreeRegistry里面的一些調(diào)度插件 /* return runtime.Registry{ selectorspread.Name: selectorspread.New, imagelocality.Name: imagelocality.New, tainttoleration.Name: tainttoleration.New, nodename.Name: nodename.New, nodeports.Name: nodeports.New, nodeaffinity.Name: nodeaffinity.New, podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New), ... */ // 重點(diǎn)看一下Scheduler的創(chuàng)建過程 var sched *Scheduler source := options.schedulerAlgorithmSource switch { // 根據(jù)Provider創(chuàng)建,重點(diǎn)看這里 case source.Provider != nil: sc, err := configurator.createFromProvider(*source.Provider) if err != nil { return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err) } sched = sc // 根據(jù)用戶設(shè)置創(chuàng)建,來自文件或者ConfigMap case source.Policy != nil: policy := &schedulerapi.Policy{} switch { case source.Policy.File != nil: if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil { return nil, err } case source.Policy.ConfigMap != nil: if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil { return nil, err } } configurator.extenders = policy.Extenders sc, err := configurator.createFromConfig(*policy) if err != nil { return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err) } sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } } // 創(chuàng)建 func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName) // 實(shí)例化算法的Registry r := algorithmprovider.NewRegistry() defaultPlugins, exist := r[providerName] if !exist { return nil, fmt.Errorf("algorithm provider %q is not registered", providerName) } // 將各種算法作為plugin進(jìn)行設(shè)置 for i := range c.profiles { prof := &c.profiles[i] plugins := &schedulerapi.Plugins{} plugins.Append(defaultPlugins) plugins.Apply(prof.Plugins) prof.Plugins = plugins } return c.create() } // 從這個(gè)初始化中可以看到,主要分為2類:默認(rèn)與ClusterAutoscaler兩種算法 func NewRegistry() Registry { // 默認(rèn)算法包括過濾、打分、綁定等,有興趣的去源碼中逐個(gè)閱讀 defaultConfig := getDefaultConfig() applyFeatureGates(defaultConfig) // ClusterAutoscaler 是集群自動(dòng)擴(kuò)展的算法,被單獨(dú)拎出來 caConfig := getClusterAutoscalerConfig() applyFeatureGates(caConfig) return Registry{ schedulerapi.SchedulerDefaultProviderName: defaultConfig, ClusterAutoscalerProvider: caConfig, } } /* 在這里,熟悉k8s的朋友會(huì)有個(gè)疑問:以前聽說kubernets的調(diào)度有個(gè)Predicate和Priority兩個(gè)算法,這里怎么沒有分類? 這個(gè)疑問,我們?cè)诤竺婢唧w場(chǎng)景時(shí)再進(jìn)行分析。 在新的版本中,這部分代碼邏輯是由拓展buildExtenders和nodelist,podQueue,維護(hù)了一個(gè)調(diào)度隊(duì)列,其余都是與上面差別不大的 */
NodeName
// 為了加深大家對(duì)Plugin的印象,我選擇一個(gè)最簡(jiǎn)單的示例:根據(jù)Pod的spec字段中的NodeName,分配到指定名稱的節(jié)點(diǎn) package nodename import ( "context" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" ) type NodeName struct{} var _ framework.FilterPlugin = &NodeName{} // 這個(gè)調(diào)度算法的名稱和錯(cuò)誤信息 const ( Name = "NodeName" ErrReason = "node(s) didn't match the requested hostname" ) // 調(diào)度算法的明明 func (pl *NodeName) Name() string { return Name } // 過濾功能,這個(gè)就是NodeName算法的實(shí)現(xiàn) func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { // 找不到Node if nodeInfo.Node() == nil { return framework.NewStatus(framework.Error, "node not found") } // 匹配不到,返回錯(cuò)誤 if !Fits(pod, nodeInfo) { return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason) } return nil } /* 匹配的算法,兩種條件滿足一個(gè)就認(rèn)為成功 1. spec沒有填NodeName 2.spec的NodeName和節(jié)點(diǎn)匹配 */ func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool { return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name } // 初始化 func New(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) { return &NodeName{}, nil }
了解Informer
是如何從kube-apiserver監(jiān)聽資源變化的情況
Informer
什么是Informer
?先重點(diǎn)講一下這個(gè)Informer,因?yàn)樗抢斫鈑8s運(yùn)行機(jī)制的核心概念。
簡(jiǎn)單概況下,Informer
的核心功能是 獲取并監(jiān)聽(ListAndWatch)對(duì)應(yīng)資源的增刪改,觸發(fā)相應(yīng)的事件操作(ResourceEventHandler)
在Setup()中有個(gè)Config,里面有個(gè)scheduler.NewInformerFactory()在這里進(jìn)入,代碼在k8s.io/client-go/informers/factory.go中
Shared Informer
/* client 是連接到 kube-apiserver 的客戶端。 我們要理解k8s的設(shè)計(jì): 1. etcd是核心的數(shù)據(jù)存儲(chǔ),對(duì)資源的修改會(huì)進(jìn)行持久化 2. 只有kube-apiserver可以訪問etcd 所以,kube-scheduler要了解資源的變化情況,只能通過kube-apiserver */ // 定義了 Shared Informer,其中這個(gè)client是用來連接kube-apiserver的 c.InformerFactory = informers.NewSharedInformerFactory(client, 0) // 這里解答了為什么叫shared:一個(gè)資源會(huì)對(duì)應(yīng)多個(gè)Informer,會(huì)導(dǎo)致效率低下,所以讓一個(gè)資源對(duì)應(yīng)一個(gè)sharedInformer,而一個(gè)sharedInformer內(nèi)部自己維護(hù)多個(gè)Informer type sharedInformerFactory struct { client kubernetes.Interface namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration // 這個(gè)map就是維護(hù)多個(gè)Informer的關(guān)鍵實(shí)現(xiàn) informers map[reflect.Type]cache.SharedIndexInformer startedInformers map[reflect.Type]bool } // 運(yùn)行函數(shù) func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { // goroutine異步處理 go informer.Run(stopCh) // 標(biāo)記為已經(jīng)運(yùn)行,這樣即使下次Start也不會(huì)重復(fù)運(yùn)行 f.startedInformers[informerType] = true } } } // 查找對(duì)應(yīng)的informer func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() // 找到就直接返回 informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } // 沒找到就會(huì)新建Informer informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer } // SharedInformerFactory 是 sharedInformerFactory 的接口定義,點(diǎn)進(jìn)func NewSharedInformerFactoryWithOptions的返回值 type SharedInformerFactory interface { // 我們這一階段關(guān)注的Pod的Informer,屬于核心資源 Core() core.Interface } // core.Interface的定義 type Interface interface { // V1 provides access to shared informers for resources in V1. V1() v1.Interface } // v1.Interface 的定義 type Interface interface { // Pod的定義 ... Pods() PodInformer ... } // PodInformer 是對(duì)應(yīng)的接口 type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister } // podInformer 是具體的實(shí)現(xiàn) type podInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string } // 最后,我們可以看到podInformer調(diào)用了InformerFor函數(shù)進(jìn)行了添加 func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) }
PodInformer
// 實(shí)例化PodInformer,把對(duì)應(yīng)的List/Watch操作方法傳入到實(shí)例化函數(shù),生成統(tǒng)一的SharedIndexInformer接口 func NewFilteredPodInformer() cache.SharedIndexInformer { return cache.NewSharedIndexInformer( // List和Watch實(shí)現(xiàn)從PodInterface里面查詢 &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } // 點(diǎn)進(jìn)List,在這個(gè)文件中 // 我們先看看Pod基本的List和Watch是怎么定義的 // Pod基本的增刪改查等操作 type PodInterface interface { List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) ... } // pods 是PodInterface的實(shí)現(xiàn) type pods struct { client rest.Interface ns string } // List 和 Watch 是依賴客戶端,也就是從kube-apiserver中查詢的 func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) { err = c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Do(ctx). Into(result) return } func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { return c.client.Get(). Namespace(c.ns). Resource("pods"). VersionedParams(&opts, scheme.ParameterCodec). Timeout(timeout). Watch(ctx) } // 在func NewPodInformer中找到他的返回值 點(diǎn)進(jìn)去cache.SharedIndexInformer這是Informer的統(tǒng)一接口 在這個(gè)文件的里面找到下面的代碼 // 在上面,我們看到了異步運(yùn)行Informer的代碼 go informer.Run(stopCh),我們看看是怎么run的 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { // 這里有個(gè) DeltaFIFO 的對(duì)象, fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) // 傳入這個(gè)fifo到cfg cfg := &Config{ Queue: fifo, ... } // 新建controller func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // 運(yùn)行controller s.controller.Run(stopCh) } // 點(diǎn)進(jìn)New看Controller的運(yùn)行 func (c *controller) Run(stopCh <-chan struct{}) { // r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock if c.config.WatchErrorHandler != nil { r.watchErrorHandler = c.config.WatchErrorHandler } c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group // 生產(chǎn),往Queue里放數(shù)據(jù) wg.StartWithChannel(stopCh, r.Run) // 消費(fèi),從Queue消費(fèi)數(shù)據(jù) wait.Until(c.processLoop, time.Second, stopCh) wg.Wait() }
Reflect
點(diǎn)進(jìn)r.Run() Reflect監(jiān)聽事件放到FIFO中然后處理循環(huán) 取出事件消費(fèi)
// 我們?cè)倩仡^看看這個(gè)Reflect結(jié)構(gòu) r := NewReflector( // ListerWatcher 我們已經(jīng)有了解,就是通過client監(jiān)聽kube-apiserver暴露出來的Resource c.config.ListerWatcher, c.config.ObjectType, // Queue 是我們前文看到的一個(gè) DeltaFIFOQueue,認(rèn)為這是一個(gè)先進(jìn)先出的隊(duì)列 c.config.Queue, c.config.FullResyncPeriod, ) func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { // 調(diào)用了ListAndWatch if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) } func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // watchHandler顧名思義,就是Watch到對(duì)應(yīng)的事件,調(diào)用對(duì)應(yīng)的Handler if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } } return nil } } } func (r *Reflector) watchHandler() error { loop: for { // 一個(gè)經(jīng)典的GO語(yǔ)言select監(jiān)聽多channel的模式 select { // 整體的step channel case <-stopCh: return errorStopRequested // 錯(cuò)誤相關(guān)的error channel case err := <-errc: return err // 接收事件event的channel case event, ok := <-w.ResultChan(): // channel被關(guān)閉,退出loop if !ok { break loop } // 一系列的資源驗(yàn)證代碼跳過 switch event.Type { // 增刪改三種Event,分別對(duì)應(yīng)到去store,即DeltaFIFO中,操作object case watch.Added: err := r.store.Add(event.Object) case watch.Modified: err := r.store.Update(event.Object) case watch.Deleted: err := r.store.Delete(event.Object) case watch.Bookmark: default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } } } return nil }
站在前人的肩膀上,向前輩致敬,Respect!
Summary
- kube-scheduler也是插件化的調(diào)度策略,通過配置在啟動(dòng)的時(shí)候注冊(cè)上
plugins
,通過Informer
來監(jiān)聽資源的狀態(tài)和變化,進(jìn)行調(diào)度 Informer
依賴于Reflector
模塊,它的組件為 xxxInformer,如podInformer
- 具體資源的
Informer
包含了一個(gè)連接到kube-apiserver
的client
,通過List
和Watch
接口查詢資源變更情況 - 檢測(cè)到資源發(fā)生變化后,通過
Controller
將數(shù)據(jù)放入隊(duì)列DeltaFIFOQueue
里,生產(chǎn)階段完成,交給對(duì)應(yīng)的handler處理函數(shù)進(jìn)行下一步的操作
以上就是Kubernetes scheduler啟動(dòng)監(jiān)控資源變化解析的詳細(xì)內(nèi)容,更多關(guān)于Kubernetes scheduler啟動(dòng)監(jiān)控的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Kubernetes(K8S)入門基礎(chǔ)內(nèi)容介紹
這篇文章介紹了Kubernetes(K8S)的入門基礎(chǔ)內(nèi)容,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-03-03kubernetes中的namespace、node、pod介紹
這篇文章介紹了kubernetes中的namespace、node、pod,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-03-03Kubernetes kubectl中Pod創(chuàng)建流程源碼解析
這篇文章主要為大家介紹了Kubernetes kubectl中Pod創(chuàng)建流程源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11Kubernetes(K8S)容器集群管理環(huán)境完整部署詳細(xì)教程-上篇
本系列文章主要介紹了Kubernetes(K8S)容器集群管理環(huán)境完整部署的詳細(xì)教程,分為上中下三篇文章,此為上篇,需要的朋友可以參考下2022-01-01Dashboard管理Kubernetes集群與API訪問配置
這篇文章介紹了Dashboard管理Kubernetes集群與API訪問配置的方法,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04Kubernetes Informer數(shù)據(jù)存儲(chǔ)Index與Pod分配流程解析
這篇文章主要為大家介紹了Kubernetes Informer數(shù)據(jù)存儲(chǔ)Index與Pod分配流程解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11了解Kubernetes中的Service和Endpoint
這篇文章介紹了Kubernetes中的Service和Endpoint,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04