欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

go語言K8S?的?informer機(jī)制淺析

 更新時(shí)間:2022年10月28日 11:46:26   作者:熄燈勿語  
這篇文章為大家主要介紹了go語言K8S?的?informer機(jī)制淺析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

正文

Kubernetes的控制器模式是其非常重要的一個(gè)設(shè)計(jì)模式,整個(gè)Kubernetes定義的資源對(duì)象以及其狀態(tài)都保存在etcd數(shù)據(jù)庫中,通過apiserver對(duì)其進(jìn)行增刪查改,而各種各樣的控制器需要從apiserver及時(shí)獲取這些對(duì)象,然后將其應(yīng)用到實(shí)際中,即將這些對(duì)象的實(shí)際狀態(tài)調(diào)整為期望狀態(tài),讓他們保持匹配。

不過這因?yàn)檫@樣,各種控制器需要和apiserver進(jìn)行頻繁交互,需要能夠及時(shí)獲取對(duì)象狀態(tài)的變化,而如果簡(jiǎn)單的通過暴力輪詢的話,會(huì)給apiserver造成很大的壓力,且效率很低,因此,Kubernetes設(shè)計(jì)了Informer這個(gè)機(jī)制,用來作為控制器跟apiserver交互的橋梁,它主要有兩方面的作用:

依賴Etcd的List&Watch機(jī)制,在本地維護(hù)了一份目標(biāo)對(duì)象的緩存。

Etcd的Watch機(jī)制能夠使客戶端及時(shí)獲知這些對(duì)象的狀態(tài)變化,然后通過List機(jī)制,更新本地緩存,這樣就在客戶端為這些API對(duì)象維護(hù)了一份和Etcd數(shù)據(jù)庫中幾乎一致的數(shù)據(jù),然后控制器等客戶端就可以直接訪問緩存獲取對(duì)象的信息,而不用去直接訪問apiserver,這一方面顯著提高了性能,另一方面則大大降低了對(duì)apiserver的訪問壓力;

依賴Etcd的Watch機(jī)制,觸發(fā)控制器等客戶端注冊(cè)到Informer中的事件方法。

客戶端可能會(huì)對(duì)某些對(duì)象的某些事件感興趣,當(dāng)這些事件發(fā)生時(shí),希望能夠執(zhí)行某些操作,比如通過apiserver新建了一個(gè)pod,那么kube-scheduler中的控制器收到了這個(gè)事件,然后將這個(gè)pod加入到其隊(duì)列中,等待進(jìn)行調(diào)度。

Kubernetes的各個(gè)組件本身就內(nèi)置了非常多的控制器,而自定義的控制器也需要通過Informer跟apiserver進(jìn)行交互,因此,Informer在Kubernetes中應(yīng)用非常廣泛,本篇文章就重點(diǎn)分析下Informer的機(jī)制原理,以加深對(duì)其的理解。

使用方法

先來看看Informer是怎么用的,以Endpoint為例,來看下其使用Informer的相關(guān)代碼:

創(chuàng)建Informer工廠

# client-go/informers/factory.go
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

首先創(chuàng)建了一個(gè)SharedInformerFactory,這個(gè)結(jié)構(gòu)主要有兩個(gè)作用:

  • 一個(gè)是用來作為創(chuàng)建Informer的工廠,典型的工廠模式,在Kubernetes中這種設(shè)計(jì)模式也很常用;
  • 一個(gè)是共享Informer,所謂共享,就是多個(gè)Controller可以共用同一個(gè)Informer,因?yàn)椴煌腃ontroller可能對(duì)同一種API對(duì)象感興趣,這樣相同的API對(duì)象,緩存就只有一份,通知機(jī)制也只有一套,大大提高了效率,減少了資源浪費(fèi)。

創(chuàng)建對(duì)象Informer結(jié)構(gòu)體

# client-go/informers/core/v1/endpoints.go
type EndpointsInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.EndpointsLister
}
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()

使用InformerFactory創(chuàng)建出對(duì)應(yīng)版本的對(duì)象的Informer結(jié)構(gòu)體,如Endpoints對(duì)象對(duì)應(yīng)的就是EndpointsInformer結(jié)構(gòu)體,該結(jié)構(gòu)體實(shí)現(xiàn)了兩個(gè)方法:Informer()和Lister()

  • 前者用來構(gòu)建出最終的Informer,即我們本篇文章的重點(diǎn):SharedIndexInformer,
  • 后者用來獲取創(chuàng)建出來的Informer的緩存接口:Indexer,該接口可以用來查詢緩存的數(shù)據(jù),我準(zhǔn)備下一篇文章單獨(dú)介紹其底層如何實(shí)現(xiàn)緩存的。

注冊(cè)事件方法

# Client-go/tools/cache/shared_informer.go
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    onAdd,
    UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
    DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
  })
func onAdd(obj interface{}) {
  node := obj.(*corev1.Endpoint)
  fmt.Println("add a endpoint:", endpoint.Name)
}

這里,首先調(diào)用Infomer()創(chuàng)建出來SharedIndexInformer,然后向其中注冊(cè)事件方法,這樣當(dāng)有對(duì)應(yīng)的事件發(fā)生時(shí),就會(huì)觸發(fā)這里注冊(cè)的方法去做相應(yīng)的事情。其次調(diào)用Lister()獲取到緩存接口,就可以通過它來查詢Informer中緩存的數(shù)據(jù)了,而且Informer中緩存的數(shù)據(jù),是可以有索引的,這樣可以加快查詢的速度。

啟動(dòng)Informer

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
controllerContext.InformerFactory.Start(controllerContext.Stop)

這里InformerFactory的啟動(dòng),會(huì)遍歷Factory中創(chuàng)建的所有Informer,依次將其啟動(dòng)。

機(jī)制解析

Informer的實(shí)現(xiàn)都是在client-go這個(gè)庫中,通過上述的工廠方法,其實(shí)最終創(chuàng)建出來的是一個(gè)叫做SharedIndexInformer的結(jié)構(gòu)體:

# k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    processor             *sharedProcessor
    cacheMutationDetector MutationDetector
    listerWatcher ListerWatcher
    ......
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

可以看到,在創(chuàng)建SharedIndexInformer時(shí),就創(chuàng)建出了processor, indexer等結(jié)構(gòu),而在Informer啟動(dòng)時(shí),還創(chuàng)建出了controller, fifo queue, reflector等結(jié)構(gòu)。

Reflector

Reflector的作用,就是通過List&Watch的方式,從apiserver獲取到感興趣的對(duì)象以及其狀態(tài),然后將其放到一個(gè)稱為”Delta”的先進(jìn)先出隊(duì)列中。

所謂的Delta FIFO Queue,就是隊(duì)列中的元素除了對(duì)象本身外,還有針對(duì)該對(duì)象的事件類型:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

目前有5種Type: Added, Updated, Deleted, Replaced, Resync,所以,針對(duì)同一個(gè)對(duì)象,可能有多個(gè)Delta元素在隊(duì)列中,表示對(duì)該對(duì)象做了不同的操作,比如短時(shí)間內(nèi),多次對(duì)某一個(gè)對(duì)象進(jìn)行了更新操作,那么就會(huì)有多個(gè)Updated類型的Delta放入到隊(duì)列中。后續(xù)隊(duì)列的消費(fèi)者,可以根據(jù)這些Delta的類型,來回調(diào)注冊(cè)到Informer中的事件方法。

而所謂的List&Watch,就是

  • 先調(diào)用該API對(duì)象的List接口,獲取到對(duì)象列表,將它們添加到隊(duì)列中,Delta元素類型為Replaced,
  • 然后再調(diào)用Watch接口,持續(xù)監(jiān)聽該API對(duì)象的狀態(tài)變化事件,將這些事件按照不同的事件類型,組成對(duì)應(yīng)的Delta類型,添加到隊(duì)列中,Delta元素類型有Added, Updated, Deleted三種。

此外,Informer還會(huì)周期性的發(fā)送Resync類型的Delta元素到隊(duì)列中,目的是為了周期性的觸發(fā)注冊(cè)到Informer中的事件方法UpdateFunc,保證對(duì)象的期望狀態(tài)和實(shí)際狀態(tài)一致,該周期是由一個(gè)叫做resyncPeriod的參數(shù)決定的,在向Informer中添加EventHandler時(shí),可以指定該參數(shù),若為0的話,則關(guān)閉該功能。需要注意的是,Resync類型的Delta元素中的對(duì)象,是通過Indexer從緩存中獲取到的,而不是直接從apiserver中拿的,即這里resync的,其實(shí)是”緩存”的對(duì)象的期望狀態(tài)和實(shí)際狀態(tài)的一致性。

根據(jù)以上Reflector的機(jī)制,依賴Etcd的Watch機(jī)制,通過事件來獲知對(duì)象變化狀態(tài),建立本地緩存。即使在Informer中,也沒有周期性的調(diào)用對(duì)象的List接口,正常情況下,List&Watch只會(huì)執(zhí)行一次,即先執(zhí)行List把數(shù)據(jù)拉過來,放入隊(duì)列中,后續(xù)就進(jìn)入Watch階段。

那什么時(shí)候才會(huì)再執(zhí)行List呢?其實(shí)就是異常的時(shí)候,在List或者Watch的過程中,如果有異常,比如apiserver重啟了,那么Reflector就開始周期性的執(zhí)行List&Watch,直到再次正常進(jìn)入Watch階段。為了在異常時(shí)段,不給apiserver造成壓力,這個(gè)周期是一個(gè)稱為backoff的可變的時(shí)間間隔,默認(rèn)是一個(gè)指數(shù)型的間隔,即越往后重試的間隔越長,到一定時(shí)間又會(huì)重置回一開始的頻率。而且,為了讓不同的apiserver能夠均勻負(fù)載這些Watch請(qǐng)求,客戶端會(huì)主動(dòng)斷開跟apiserver的連接,這個(gè)超時(shí)時(shí)間為60秒,然后重新發(fā)起Watch請(qǐng)求。此外,在控制器重啟過程中,也會(huì)再次執(zhí)行List,所以會(huì)觀察到之前已經(jīng)創(chuàng)建好的API對(duì)象,又重新觸發(fā)了一遍AddFunc方法。

從以上這些點(diǎn),可以看出來,Kubernetes在性能和穩(wěn)定性的提升上,還是下了很多功夫的。

Controller

這里Controller的作用是通過輪詢不斷從隊(duì)列中取出Delta元素,根據(jù)元素的類型,一方面通過Indexer更新本地的緩存,一方面調(diào)用Processor來觸發(fā)注冊(cè)到Informer的事件方法:

# k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    }
}

這里的c.config.Process是定義在shared_informer.go中的HandleDeltas()方法:

# k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                isSync := false
                switch {
                case d.Type == Sync:
                    // Sync events are only propagated to listeners that requested resync
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            // Replaced events that didn't change resourceVersion are treated as resync events
                            // and only propagated to listeners that requested resync
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

Processer & Listener

Processer和Listener則是觸發(fā)事件方法的機(jī)制,在創(chuàng)建Informer時(shí),會(huì)創(chuàng)建一個(gè)Processer,而在向Informer中通過調(diào)用AddEventHandler()注冊(cè)事件方法時(shí),會(huì)為每一個(gè)Handler生成一個(gè)Listener,然后將該Lisener中添加到Processer中,每一個(gè)Listener中有兩個(gè)channel:addCh和nextCh。Listener通過select監(jiān)聽在這兩個(gè)channel上,當(dāng)Controller從隊(duì)列中取出新的元素時(shí),會(huì)調(diào)用processer來給它的listener發(fā)送“通知”,這個(gè)“通知”就是向addCh中添加一個(gè)元素,即add(),然后一個(gè)goroutine就會(huì)將這個(gè)元素從addCh轉(zhuǎn)移到nextCh,即pop(),從而觸發(fā)另一個(gè)goroutine執(zhí)行注冊(cè)的事件方法,即run()。

# k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}

Indexer

Indexer是對(duì)緩存進(jìn)行增刪查改的接口,緩存本質(zhì)上就是用map構(gòu)建的key:value鍵值對(duì),都存在items這個(gè)map中,key為/:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

而為了加速查詢,還可以選擇性的給這些緩存添加索引,索引存儲(chǔ)在indecies中,所謂索引,就是在向緩存中添加記錄時(shí),就將其key添加到索引結(jié)構(gòu)中,在查找時(shí),可以根據(jù)索引條件,快速查找到指定的key記錄,比如默認(rèn)有個(gè)索引是按照namespace進(jìn)行索引,可以根據(jù)快速找出屬于某個(gè)namespace的某種對(duì)象,而不用去遍歷所有的緩存。

Indexer對(duì)外提供了Replace(), Resync(), Add(), Update(), Delete(), List(), Get(), GetByKey(), ByIndex()等接口。

總結(jié)

本篇對(duì)Kubernetes Informer的使用方法和實(shí)現(xiàn)原理,進(jìn)行了深入分析,整體上看,Informer的設(shè)計(jì)是相當(dāng)不錯(cuò)的,基于事件機(jī)制,一方面構(gòu)建本地緩存,一方面觸發(fā)事件方法,使得控制器能夠快速響應(yīng)和快速獲取數(shù)據(jù),此外,還有諸如共享Informer, resync, index, watch timeout等機(jī)制,使得Informer更加高效和穩(wěn)定,有了Informer,控制器模式可以說是如虎添翼。

以上就是go語言K8S 的 informer機(jī)制淺析的詳細(xì)內(nèi)容,更多關(guān)于go K8S informer機(jī)制淺析的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • golang 設(shè)置web請(qǐng)求狀態(tài)碼操作

    golang 設(shè)置web請(qǐng)求狀態(tài)碼操作

    這篇文章主要介紹了golang 設(shè)置web請(qǐng)求狀態(tài)碼操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go?中的空白標(biāo)識(shí)符下劃線

    Go?中的空白標(biāo)識(shí)符下劃線

    這篇文章主要介紹了Go?中的空白標(biāo)識(shí)符下劃線,空白標(biāo)識(shí)符是未使用的值的占位符,由下劃線(_)表示,下文對(duì)其相關(guān)介紹需要的小伙伴可以參考一下
    2022-03-03
  • Golang使用JWT進(jìn)行認(rèn)證和加密的示例詳解

    Golang使用JWT進(jìn)行認(rèn)證和加密的示例詳解

    JWT是一個(gè)簽名的JSON對(duì)象,通常用作Oauth2的Bearer?token,JWT包括三個(gè)用.分割的部分。本文將利用JWT進(jìn)行認(rèn)證和加密,感興趣的可以了解一下
    2023-02-02
  • go語言寫的簡(jiǎn)要數(shù)據(jù)同步工具詳解

    go語言寫的簡(jiǎn)要數(shù)據(jù)同步工具詳解

    作為go-etl工具的作者,想要安利一下這個(gè)小巧的數(shù)據(jù)同步工具,它在同步百萬級(jí)別的數(shù)據(jù)時(shí)表現(xiàn)極為優(yōu)異,基本能在幾分鐘完成數(shù)據(jù)同步,這篇文章主要介紹了go語言寫的簡(jiǎn)要數(shù)據(jù)同步工具,需要的朋友可以參考下
    2024-07-07
  • gORM操作MySQL的實(shí)現(xiàn)

    gORM操作MySQL的實(shí)現(xiàn)

    本文主要介紹了gORM操作MySQL的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-07-07
  • Go語言使用AES加密解密的示例代碼

    Go語言使用AES加密解密的示例代碼

    這篇文章主要介紹了Go語言使用AES加密解密的示例代碼,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-09-09
  • go中string、int、float相互轉(zhuǎn)換方式

    go中string、int、float相互轉(zhuǎn)換方式

    這篇文章主要介紹了go中string、int、float相互轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Golang中interface轉(zhuǎn)string輸出打印方法

    Golang中interface轉(zhuǎn)string輸出打印方法

    這篇文章主要給大家介紹了關(guān)于Golang中interface轉(zhuǎn)string輸出打印的相關(guān)資料,在go語言中interface轉(zhuǎn)string可以直接使用fmt提供的fmt函數(shù),文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-02-02
  • go帶緩沖chan實(shí)現(xiàn)消息隊(duì)列功能

    go帶緩沖chan實(shí)現(xiàn)消息隊(duì)列功能

    本文主要介紹了go帶緩沖chan實(shí)現(xiàn)消息隊(duì)列功能,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • Golang中的自定義函數(shù)詳解

    Golang中的自定義函數(shù)詳解

    函數(shù)構(gòu)成代碼執(zhí)行的邏輯結(jié)構(gòu)。在Go語言中,函數(shù)的基本組成為:關(guān)鍵字func、函數(shù)名、參數(shù)列表、返回值、函數(shù)體和返回語句。
    2018-10-10

最新評(píng)論