欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go語言kube-scheduler深度剖析與開發(fā)之pod調(diào)度

 更新時(shí)間:2023年04月23日 10:26:05   作者:俯仰之間  
這篇文章主要為大家介紹了Go語言kube-scheduler深度剖析與開發(fā),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

正文

為了深入學(xué)習(xí) kube-scheduler,本系從源碼和實(shí)戰(zhàn)角度深度學(xué) 習(xí)kube-scheduler,該系列一共分6篇文章,如下:

  • kube-scheduler 整體架構(gòu)
  • 初始化一個(gè) scheduler
  • 本文: 一個(gè) Pod 是如何調(diào)度的
  • 如何開發(fā)一個(gè)屬于自己的scheduler插件
  • 開發(fā)一個(gè) prefilter 擴(kuò)展點(diǎn)的插件
  • 開發(fā)一個(gè) socre 擴(kuò)展點(diǎn)的插件

上一篇文章我們講了一個(gè) kube-scheduler 是怎么初始化出來的,有了 調(diào)度器之后就得開始讓他干活了 這一篇文章我們來講講一個(gè) Pod 是怎么被調(diào)度到某個(gè) Node 的。

我把調(diào)度一個(gè) Pod 分為3個(gè)階段

  • 獲取需要被調(diào)度的 Pod
  • 運(yùn)行每個(gè)擴(kuò)展點(diǎn)的所有插件,給 Pod 選擇一個(gè)最合適的 Node
  • 將 Pod 綁定到選出來的 Node

感知 Pod

要能夠獲取到 Pod 的前提是:kube-scheduler 能感知到有 Pod 需要被調(diào)度,得知有 Pod 需要被調(diào)度后還需要有地方存放被調(diào)度的 Pod 的信息。為了感知有 Pod 需要被調(diào)度,kube-scheduler 啟動(dòng)時(shí)通過 Informer watch Pod 的變化,它把待調(diào)度的 Pod 分了兩種情況,代碼如下

// pkg/scheduler/eventhandlers.go
func addAllEventHandlers(...) {
  //已經(jīng)調(diào)度過的 Pod 則加到本地緩存,并判斷是加入到調(diào)度隊(duì)列還是加入到backoff隊(duì)列
  informerFactory.Core().V1().Pods().Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
        switch t := obj.(type) {
        case *v1.Pod:
          return assignedPod(t)
        case cache.DeletedFinalStateUnknown:
          if _, ok := t.Obj.(*v1.Pod); ok {
            // The carried object may be stale, so we don't use it to check if
            // it's assigned or not. Attempting to cleanup anyways.
            return true
          }
          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
          return false
        default:
          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
          return false
        }
      },
      Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToCache,
        UpdateFunc: sched.updatePodInCache,
        DeleteFunc: sched.deletePodFromCache,
      },
    },
  )
  // 沒有調(diào)度過的Pod,放到調(diào)度隊(duì)列
  informerFactory.Core().V1().Pods().Informer().AddEventHandler(
    cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
        switch t := obj.(type) {
        case *v1.Pod:
          return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
        case cache.DeletedFinalStateUnknown:
          if pod, ok := t.Obj.(*v1.Pod); ok {
            // The carried object may be stale, so we don't use it to check if
            // it's assigned or not.
            return responsibleForPod(pod, sched.Profiles)
          }
          utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
          return false
        default:
          utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
          return false
        }
      },
      Handler: cache.ResourceEventHandlerFuncs{
        AddFunc:    sched.addPodToSchedulingQueue,
        UpdateFunc: sched.updatePodInSchedulingQueue,
        DeleteFunc: sched.deletePodFromSchedulingQueue,
      },
    },
  )
......
}
  • 已經(jīng)調(diào)度過的 Pod 區(qū)分是不是調(diào)度過的 Pod 是通過:len(pod.Spec.NodeName) != 0 來判斷的,因?yàn)檎{(diào)度過的 Pod 這個(gè)字段總是會(huì)被賦予被選中的 Node 名字。但是,既然是調(diào)度過的 Pod 下面的代碼中為什么還要區(qū)分:sched.addPodToCache 和 sched.updatePodInCache 呢?原因在于我們可以在創(chuàng)建 Pod 的時(shí)候人為給它分配一個(gè) Node(即給 pod.Spec.NodeName 賦值),這樣 kube-scheduler 在監(jiān)聽到該 Pod 后,判斷這個(gè) Pod 的該字段不為空就會(huì)認(rèn)為這個(gè) Pod 已經(jīng)調(diào)度過了,但是這個(gè)字段不為空并不是 kube-scheduler 調(diào)度的結(jié)果,而是人為賦值的,那么 kube-scheduler 的 cache(可以參考上一篇 cache 相關(guān)的內(nèi)容)中沒有這個(gè) Pod 的信息,所以就需要將 Pod 信息加入到 cache 中。至于在監(jiān)聽到 Pod 后 sched.addPodToCache 和 sched.updatePodInCache 哪個(gè)會(huì)被調(diào)用,這是 Informer 決定的,它會(huì)根據(jù)監(jiān)聽到變化的 Pod 和 Informer 的本地緩存做對比,要是緩存中沒有這個(gè) Pod,那么就調(diào)用 add 函數(shù),否則就調(diào)用 update 函數(shù)。

加入或更新緩存后,還需要做一件事:去 unschedulablePods(調(diào)度失敗的Pod) 中獲取 Pod,這些 Pod 的親和性和剛剛加入的這個(gè) Pod 匹配,然后根據(jù)下面的規(guī)則判斷是把 Pod 放入 backoffQ 還是放入 activeQ

  • 根據(jù)這個(gè) Pod 嘗試被調(diào)度的次數(shù)計(jì)算這個(gè) Pod 下次調(diào)度應(yīng)該等待的時(shí)間,計(jì)算規(guī)則為指數(shù)級增長,即按照1s,2s,4s,8s這樣的時(shí)間進(jìn)行等待,但是這個(gè)等待時(shí)間也不會(huì)無限增加,會(huì)受到 podMaxBackoffDuration(默認(rèn)10s) 的限制,這個(gè)參數(shù)表示是一個(gè) Pod 處于 backoff 的最大時(shí)間,如果等待的時(shí)間如果超過了 podMaxBackoffDuration,那么就只等待 podMaxBackoffDuration 就會(huì)再次被調(diào)度;
  • 當(dāng)前時(shí)間 - 上次調(diào)度的時(shí)間 > 根據(jù)(1)獲取到的應(yīng)該等待的時(shí)間,那么就把Pod放到activeQ里面,將會(huì)被調(diào)度,否則Pod被放入 backoff 隊(duì)列里等待。

從上面我們可以看到,一個(gè) Pod 的變化會(huì)觸發(fā)此前調(diào)度失敗的 Pod 重新判斷是否可以被調(diào)度

  • 沒有調(diào)度過的 Pod

len(pod.Spec.NodeName) = 0,那么這個(gè) Pod 沒有被調(diào)度過或者是此前調(diào)度過但是調(diào)度失敗的(用戶修改了 Pod 的配置導(dǎo)致 Pod 發(fā)生變化,又被 kube-scheduler 感知到了),如果是沒有調(diào)度過的 Pod 那么直接加入到 activeQ,如果是調(diào)度失敗的 Pod 則根據(jù)上述規(guī)則判斷是加入 backoffQ 還是 activeQ。加入到 activeQ 會(huì)馬上被取走,然后開始調(diào)度。

那么那些因?yàn)檎{(diào)度失敗而被放入 unscheduleable 的 Pod 還有其他機(jī)會(huì)(上面說的有新 Pod 創(chuàng)建是一個(gè)機(jī)會(huì))重新被調(diào)度么?答案是有的,否則他們就“被餓死了”,有兩種途徑:1. 定期強(qiáng)制將 unscheduleable 的 Pod 放入 backoffQ 或 activeQ,定期將 backoffQ 等待超時(shí)的 Pod 放入 ac activeQ;2. 集群內(nèi)其他相關(guān)資源變化時(shí),判斷 unscheduleable 中的 Pod 是不是要放入 backoffQ 或 activeQ,其實(shí)這跟有 Pod 發(fā)生變化的情況是一樣的。

第一種情況

在 kube-scheduler啟動(dòng)的時(shí)候中會(huì)起兩個(gè)協(xié)程,他們會(huì)定期把 backoffQ 和 unscheduleable 里面的 Pod拿到activeQ里面去

func (p *PriorityQueue) Run() {
   go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
   go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

flushUnschedulablePodsLeftover

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
   p.lock.Lock()
   defer p.lock.Unlock()
   var podsToMove []*framework.QueuedPodInfo
   currentTime := p.clock.Now()
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      lastScheduleTime := pInfo.Timestamp
      if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
         podsToMove = append(podsToMove, pInfo)
      }
   }
   if len(podsToMove) > 0 {
      p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
   }
}
    func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
       activated := false
       for _, pInfo := range podInfoList {
          // If the event doesn't help making the Pod schedulable, continue.
          // Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes
          // either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.
          // In that case, it's desired to move it anyways.
          if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
             continue
          }
          pod := pInfo.Pod
          if p.isPodBackingoff(pInfo) {
             if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
             } else {
                metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
                p.unschedulablePods.delete(pod)
             }
          } else {
             if err := p.activeQ.Add(pInfo); err != nil {
                klog.ErrorS(err, "Error adding pod to the scheduling queue", "pod", klog.KObj(pod))
             } else {
                    metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
                p.unschedulablePods.delete(pod)
             }
          }
       }
       p.moveRequestCycle = p.schedulingCycle
       if activated {
          p.cond.Broadcast()
       }
    }x

將在 unscheduleable 里面停留時(shí)長超過 podMaxInUnschedulablePodsDuration(默認(rèn)是5min)的pod放入到 ActiveQ 或 BackoffQueue,具體是放到哪個(gè)隊(duì)列里面,還是根據(jù)我們上文說的那個(gè)實(shí)際計(jì)算規(guī)則來。這么做的原因就是給那些“問題少年”一次重新做人的機(jī)會(huì),也不能一犯錯(cuò)誤(調(diào)度失?。┚蛷氐状蛉胨览瘟?。

flushBackoffQCompleted

去 backoffQ 獲取等待結(jié)束的 Pod,放入 activeQ

    func (p *PriorityQueue) flushBackoffQCompleted() {
       p.lock.Lock()
       defer p.lock.Unlock()
       activated := false
       for {
          rawPodInfo := p.podBackoffQ.Peek()
          if rawPodInfo == nil {
             break
          }
          pod := rawPodInfo.(*framework.QueuedPodInfo).Pod
          boTime := p.getBackoffTime(rawPodInfo.(*framework.QueuedPodInfo))
          if boTime.After(p.clock.Now()) {
             break
          }
          _, err := p.podBackoffQ.Pop()
          if err != nil {
             klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
             break
          }
          p.activeQ.Add(rawPodInfo)
          metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
          activated = true
       }
       if activated {
          p.cond.Broadcast()
       }
    }

第二種情況

集群內(nèi)資源發(fā)生變化

  • 有新節(jié)點(diǎn)加入集群
  • 節(jié)點(diǎn)配置或狀態(tài)發(fā)生變化
  • 已經(jīng)存在的 Pod 發(fā)生變化
  • 集群內(nèi)有Pod被刪除
informerFactory.Core().V1().Nodes().Informer().AddEventHandler(
   cache.ResourceEventHandlerFuncs{
      AddFunc:    sched.addNodeToCache,
      UpdateFunc: sched.updateNodeInCache,
      DeleteFunc: sched.deleteNodeFromCache,
   },
)

新加入節(jié)點(diǎn)

func (sched *Scheduler) addNodeToCache(obj interface{}) {
   node, ok := obj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Node", "obj", obj)
      return
   }
   nodeInfo := sched.Cache.AddNode(node)
   klog.V(3).InfoS("Add event for node", "node", klog.KObj(node))
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd, preCheckForNode(nodeInfo))
}
func preCheckForNode(nodeInfo *framework.NodeInfo) queue.PreEnqueueCheck {
   // Note: the following checks doesn't take preemption into considerations, in very rare
   // cases (e.g., node resizing), "pod" may still fail a check but preemption helps. We deliberately
   // chose to ignore those cases as unschedulable pods will be re-queued eventually.
   return func(pod *v1.Pod) bool {
      admissionResults := AdmissionCheck(pod, nodeInfo, false)
      if len(admissionResults) != 0 {
         return false
      }
      _, isUntolerated := corev1helpers.FindMatchingUntoleratedTaint(nodeInfo.Node().Spec.Taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
         return t.Effect == v1.TaintEffectNoSchedule
      })
      return !isUntolerated
   }
}

可以看到,當(dāng)有節(jié)點(diǎn)加入集群的時(shí)候,會(huì)把 unscheduleable 里面的Pod 依次拿出來做下面的判斷:

  • Pod 對 節(jié)點(diǎn)的親和性
  • Pod 中 Nodename不為空 那么判斷新加入節(jié)點(diǎn)的Name判斷pod Nodename是否相等
  • 判斷 Pod 中容器對端口的要求是否和新加入節(jié)點(diǎn)已經(jīng)被使用的端口沖突
  • Pod 是否容忍了Node的Pod

只有上述4個(gè)條件都滿足,那么新加入節(jié)點(diǎn)這個(gè)事件才會(huì)觸發(fā)這個(gè)未被調(diào)度的Pod加入到 backoffQ 或者 activeQ,至于是加入哪個(gè)queue,上面已經(jīng)分析過了

節(jié)點(diǎn)更新

func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
   oldNode, ok := oldObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert oldObj to *v1.Node", "oldObj", oldObj)
      return
   }
   newNode, ok := newObj.(*v1.Node)
   if !ok {
      klog.ErrorS(nil, "Cannot convert newObj to *v1.Node", "newObj", newObj)
      return
   }
   nodeInfo := sched.Cache.UpdateNode(oldNode, newNode)
   // Only requeue unschedulable pods if the node became more schedulable.
   if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != nil {
      sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(*event, preCheckForNode(nodeInfo))
   }
}
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) *framework.ClusterEvent {
   if nodeSpecUnschedulableChanged(newNode, oldNode) {
      return &queue.NodeSpecUnschedulableChange
   }
   if nodeAllocatableChanged(newNode, oldNode) {
      return &queue.NodeAllocatableChange
   }
   if nodeLabelsChanged(newNode, oldNode) {
      return &queue.NodeLabelChange
   }
   if nodeTaintsChanged(newNode, oldNode) {
      return &queue.NodeTaintChange
   }
   if nodeConditionsChanged(newNode, oldNode) {
      return &queue.NodeConditionChange
   }
   return nil
}

首先是判斷節(jié)點(diǎn)是何種配置發(fā)生了變化,有如下情況

  • 節(jié)點(diǎn)可調(diào)度情況發(fā)生變化
  • 節(jié)點(diǎn)可分配資源發(fā)生變化
  • 節(jié)點(diǎn)標(biāo)簽發(fā)生變化
  • 節(jié)點(diǎn)污點(diǎn)發(fā)生變化
  • 節(jié)點(diǎn)狀態(tài)發(fā)生變化

如果某個(gè) Pod 調(diào)度失敗的原因可以匹配到上面其中一個(gè)原因,那么節(jié)點(diǎn)更新這個(gè)事件才會(huì)觸發(fā)這個(gè)未被調(diào)度的Pod加入到 backoffQ 或者 activeQ

informerFactory.Core().V1().Pods().Informer().AddEventHandler(
   cache.FilteringResourceEventHandler{
      FilterFunc: func(obj interface{}) bool {
         switch t := obj.(type) {
         case *v1.Pod:
            return assignedPod(t)
         case cache.DeletedFinalStateUnknown:
            if _, ok := t.Obj.(*v1.Pod); ok {
               // The carried object may be stale, so we don't use it to check if
               // it's assigned or not. Attempting to cleanup anyways.
               return true
            }
            utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
            return false
         default:
            utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
            return false
         }
      },
      Handler: cache.ResourceEventHandlerFuncs{
         AddFunc:    sched.addPodToCache,
         UpdateFunc: sched.updatePodInCache,
         DeleteFunc: sched.deletePodFromCache,
      },
   },
)

已經(jīng)存在的 Pod 發(fā)生變化

func (sched *Scheduler) addPodToCache(obj interface{}) {
   pod, ok := obj.(*v1.Pod)
   if !ok {
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", obj)
      return
   }
   klog.V(3).InfoS("Add event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.AddPod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache AddPod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.AssignedPodAdded(pod)
}
func (p *PriorityQueue) AssignedPodAdded(pod *v1.Pod) {
   p.lock.Lock()
   p.movePodsToActiveOrBackoffQueue(p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
   p.lock.Unlock()
}
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
   var nsLabels labels.Set
   nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
   var podsToMove []*framework.QueuedPodInfo
   for _, pInfo := range p.unschedulablePods.podInfoMap {
      for _, term := range pInfo.RequiredAffinityTerms {
         if term.Matches(pod, nsLabels) {
            podsToMove = append(podsToMove, pInfo)
            break
         }
      }
   }
   return podsToMove
}

可以看到,已經(jīng)存在的Pod發(fā)生變化后,會(huì)把這個(gè)Pod親和性配置依次和 unscheduleable 里面的Pod匹配,如果能夠匹配上,那么節(jié)點(diǎn)更新這個(gè)事件才會(huì)觸發(fā)這個(gè)未被調(diào)度的Pod加入到 backoffQ 或者 activeQ。

集群內(nèi)有Pod刪除

func (sched *Scheduler) deletePodFromCache(obj interface{}) {
  var pod *v1.Pod
   switch t := obj.(type) {
   case *v1.Pod:
      pod = t
   case cache.DeletedFinalStateUnknown:
      var ok bool
      pod, ok = t.Obj.(*v1.Pod)
      if !ok {
         klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t.Obj)
         return
      }
   default:
      klog.ErrorS(nil, "Cannot convert to *v1.Pod", "obj", t)
      return
   }
   klog.V(3).InfoS("Delete event for scheduled pod", "pod", klog.KObj(pod))
   if err := sched.Cache.RemovePod(pod); err != nil {
      klog.ErrorS(err, "Scheduler cache RemovePod failed", "pod", klog.KObj(pod))
   }
   sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete, nil)
}

可以看到,Pod刪除時(shí)間不像其他時(shí)間需要做額外的判斷,這個(gè)preCheck函數(shù)是空的,所以所有 unscheduleable 里面的Pod都會(huì)被放到 activeQ 或 backoffQ 中。

從上面的情況,我們可以看到,集群內(nèi)有事件發(fā)生變化,是可以加速調(diào)度失敗的Pod被重新調(diào)度的進(jìn)程的。常規(guī)的是,調(diào)度失敗的 Pod 需要等5min 然后才會(huì)被重新加入 backoffQ 或 activeQ。backoffQ里面的Pod也需要等一段時(shí)間才會(huì)重新調(diào)度。這也就是為什么,當(dāng)你修改節(jié)點(diǎn)配置的時(shí)候,能看到Pod馬上重新被調(diào)度的原因

上面就是一個(gè)Pod調(diào)度失敗后,重新觸發(fā)調(diào)度的情況了。

取出 Pod

Scheduler 中有個(gè)成員 NextPod 會(huì)從 activeQ 隊(duì)列中嘗試獲取一個(gè)待調(diào)度的 Pod,該函數(shù)在 SchedulePod 中被調(diào)用,如下:

// 啟動(dòng) Scheduler
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
	go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	<-ctx.Done()
	sched.SchedulingQueue.Close()
}
// 嘗試調(diào)度一個(gè) Pod,所以 Pod 的調(diào)度入口
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	// 會(huì)一直阻塞,直到獲取到一個(gè)Pod
	......
	podInfo := sched.NextPod()
    ......
}

NextPod 它被賦予如下函數(shù):

// pkg/scheduler/internal/queue/scheduling_queue.go
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
			for plugin := range podInfo.UnschedulablePlugins {
				metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
			}
			return podInfo
		}
		klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
		return nil
	}
}

Pop 會(huì)一直阻塞,直到 activeQ 長度大于0,然后取出一個(gè) Pod 返回

// pkg/scheduler/internal/queue/scheduling_queue.go
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, nil
}

調(diào)度 Pod

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 取出 Pod
    podInfo := sched.NextPod()
    ...
    // 根據(jù) Pod 的調(diào)度名字,獲取之前初始化好的調(diào)度框架(framework)
    fwk, err := sched.frameworkForPod(pod)
    ...
    // 開始執(zhí)行插件,包括 filter, socre 兩個(gè)擴(kuò)展點(diǎn)內(nèi)的所有插件,獲取一個(gè)最合適 Pod 的節(jié)點(diǎn)
    scheduleResult, err := sched.SchedulePod(schedulingCycleCtx, fwk, state, pod)
    // 如果獲取節(jié)點(diǎn)失敗,則開始運(yùn)行 postFilter 開始搶占一個(gè) Pod
    if err != nil {
        result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
    }
    ....
    // 將 Pod 放入 assumedPod 存儲(chǔ),即假設(shè) Pod 已經(jīng)調(diào)度成功
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    // 運(yùn)行 Reserve 插件
    fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    ...
    // 運(yùn)行 Permit 插件
    fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    ...
    // 啟動(dòng)一個(gè)協(xié)程,開始綁定,主流程到了這里就結(jié)束了,然后開始新的一輪調(diào)度;
    go func() {
        // 執(zhí)行 preBind 插件
        fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
        ...
        // 執(zhí)行綁定插件,會(huì)調(diào)用 kube-apiserver 寫入etcd 調(diào)度結(jié)果,就是給 Pod 賦予 Nodename
        err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
        ...
        // 執(zhí)行 postBind
        fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
    }
  • 執(zhí)行 filter 類型擴(kuò)展點(diǎn)(包括preFilter,filter,postFilter)插件,選出所有符合 Pod 的 Node,如果無法找到符合的 Node, 則把 Pod 加入 unscheduleable 中,此次調(diào)度結(jié)束;
  • 執(zhí)行 score 擴(kuò)展點(diǎn)插件,給所有 Node 打分;
  • 拿出得分最高的 Node;
  • assume Pod。這一步就是樂觀假設(shè) Pod 已經(jīng)調(diào)度成功,更新緩存中 Node 和 PodStats 信息,到了這里scheduling cycle就已經(jīng)結(jié)束了,然后會(huì)開啟新的一輪調(diào)度。至于真正的綁定,則會(huì)新起一個(gè)協(xié)程。
  • 執(zhí)行 reserve 插件;
  • 啟動(dòng)協(xié)程綁定 Pod 到 Node上。實(shí)際上就是修改 Pod.spec.nodeName: 選定的node名字,然后調(diào)用 kube-apiserver 接口寫入 etcd。如果綁定失敗了,那么移除緩存中此前加入的信息,然后把 Pod 放入activeQ 中,后續(xù)重新調(diào)度。執(zhí)行 postBinding,該步?jīng)]有實(shí)現(xiàn)的插件沒所以沒有做任何事。

好了,到了這里一個(gè) Pod 如果能夠正常的被調(diào)度的話,那么流程就結(jié)束了。如果調(diào)度失敗的話,Pod會(huì)被放入 unscheduleable 中,后續(xù)還會(huì)對 unscheduleable 中的 Pod 重新調(diào)度。

相關(guān)文章

  • Go語言開發(fā)快速學(xué)習(xí)CGO編程

    Go語言開發(fā)快速學(xué)習(xí)CGO編程

    這篇文章主要為大家介紹了Go語言開發(fā)之快速學(xué)習(xí)CGO編程,看了本文你就會(huì)發(fā)現(xiàn)CGO編程其實(shí)沒有想象的那么難,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Go 互斥鎖和讀寫互斥鎖的實(shí)現(xiàn)

    Go 互斥鎖和讀寫互斥鎖的實(shí)現(xiàn)

    本文主要介紹了Go 互斥鎖和讀寫互斥鎖的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • 深入分析golang多值返回以及閉包的實(shí)現(xiàn)

    深入分析golang多值返回以及閉包的實(shí)現(xiàn)

    相對于C/C++,golang有很多新穎的特性,例如goroutine,channel等等,這些特性其實(shí)從golang源碼是可以理解其實(shí)現(xiàn)的原理。今天這篇文章主要來分析下golang多值返回以及閉包的實(shí)現(xiàn),因?yàn)檫@兩個(gè)實(shí)現(xiàn)golang源碼中并不存在,我們必須從匯編的角度來窺探二者的實(shí)現(xiàn)。
    2016-09-09
  • 構(gòu)建go鏡像實(shí)現(xiàn)過程全面講解

    構(gòu)建go鏡像實(shí)現(xiàn)過程全面講解

    這篇文章主要為大家介紹了構(gòu)建go鏡像實(shí)現(xiàn)過程全面講解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • 簡介Go語言中的select語句的用法

    簡介Go語言中的select語句的用法

    這篇文章主要介紹了簡介Go語言中的select語句的用法,是golang入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-10-10
  • 深入了解Go的HttpClient超時(shí)機(jī)制

    深入了解Go的HttpClient超時(shí)機(jī)制

    在寫?Go?的過程中經(jīng)常對比這Java和GO語言的特性,踩了不少坑,也發(fā)現(xiàn)了不少有意思的地方,今天就來聊聊?Go?自帶的?HttpClient?的超時(shí)機(jī)制
    2022-11-11
  • 一文帶你入門Go語言中定時(shí)任務(wù)庫Cron的使用

    一文帶你入門Go語言中定時(shí)任務(wù)庫Cron的使用

    在平時(shí)的開發(fā)需求中,我們經(jīng)常會(huì)有一些重復(fù)執(zhí)行的操作需要觸發(fā)執(zhí)行,說白了就是定時(shí)任務(wù)。這篇文章主要給大家介紹一下如何在go項(xiàng)目中實(shí)現(xiàn)一個(gè)crontab功能,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助
    2022-08-08
  • Go使用協(xié)程批量獲取數(shù)據(jù)加快接口返回速度

    Go使用協(xié)程批量獲取數(shù)據(jù)加快接口返回速度

    這篇文章主要介紹了Go使用協(xié)程批量獲取數(shù)據(jù)加快接口返回速度,使用Go語言后,可以并發(fā)獲取,極大提升效率,需要的朋友可以參考下
    2023-02-02
  • Golang結(jié)構(gòu)化日志包log/slog的使用詳解

    Golang結(jié)構(gòu)化日志包log/slog的使用詳解

    官方提供的用于打印日志的包是標(biāo)準(zhǔn)庫中的 log 包,該包雖然被廣泛使用,但是缺點(diǎn)也很多,所以Go 1.21新增的 log/slog 完美解決了以上問題,下面我們就來看看log/slog包的使用吧
    2023-09-09
  • Go routine使用方法講解

    Go routine使用方法講解

    goroutine是Go語言提供的語言級別的輕量級線程,在我們需要使用并發(fā)時(shí),我們只需要通過go關(guān)鍵字來開啟goroutine即可。這篇文章主要介紹了GoLang并發(fā)機(jī)制goroutine原理,感興趣的可以了解一下
    2023-01-01

最新評論