Implementing leader election in Kubernetes (k8s) with Go
1. Background
In the Kubernetes world, many components only need a single active instance, for example kube-controller-manager or a third-party controller. For high availability, however, such components run with multiple replicas and must fail over automatically when the active instance dies. Leader election provides exactly this: the component is deployed with multiple replicas, but only one of them is active at a time. An application can rely on external middleware such as ZooKeeper or etcd for leader election. ZooKeeper's implementation is based on ephemeral nodes: an ephemeral node lives only as long as the session between its client and ZooKeeper and is deleted as soon as the session ends; the remaining standby instances then compete to create the ephemeral node, and the client (service instance) that succeeds becomes the leader. This guarantees that only one instance in the cluster is active and allows a fast failover when the leader fails. etcd is a distributed key-value store that keeps its replicas consistent with the Raft protocol; its revision mechanism can be used to build distributed locks, and etcd's concurrency package implements leader election on top of that lock (this article focuses on what Kubernetes itself provides, so etcd's concurrency mechanism is not covered in detail).
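For reference, a minimal sketch of the etcd-based approach using the official concurrency package could look roughly like the following; the endpoint address, election prefix and instance name are placeholder values, not part of the original article:

package main

import (
    "context"
    "log"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
    // Connect to etcd (the endpoint is a placeholder).
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // A session holds a lease; if this client dies, the lease expires and
    // leadership is released automatically.
    session, err := concurrency.NewSession(cli, concurrency.WithTTL(15))
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    // Campaign blocks until this instance becomes the leader for the given
    // election prefix (a placeholder key).
    election := concurrency.NewElection(session, "/my-controller/leader")
    if err := election.Campaign(context.Background(), "instance-1"); err != nil {
        log.Fatal(err)
    }
    log.Println("I am the leader, running business logic...")
}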
Kubernetes itself uses etcd as its underlying store, so it is natural to ask whether the Kubernetes API can be used to implement leader election. In fact the Kubernetes SIGs already provide this capability in client-go: leader election built on ConfigMap, Lease or Endpoints resources.
2. The official example
Kubernetes provides an official usage example; the source code is at github.com/kubernetes/…
During an election, each instance can be in one of the following states:
- Elected as leader -> runs the business code
- Waiting: another instance is currently the leader; when the leader gives up the lock, an instance in this state may become the new leader
- Releasing the leader lock: the running business code exits
In a stable environment, once an instance becomes the leader it normally never releases the lock and keeps running; this keeps the workload stable and lets the controller react quickly to changes in resource state. Only network instability or an accidental deletion of the leader instance triggers a new election.
The official election example, annotated in detail, is as follows:
package main

import (
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/google/uuid"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/tools/leaderelection"
    "k8s.io/client-go/tools/leaderelection/resourcelock"
    "k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
    if kubeconfig != "" {
        cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
        if err != nil {
            return nil, err
        }
        return cfg, nil
    }

    cfg, err := rest.InClusterConfig()
    if err != nil {
        return nil, err
    }
    return cfg, nil
}
func main() {
    klog.InitFlags(nil)

    var kubeconfig string
    var leaseLockName string
    var leaseLockNamespace string
    var id string

    // kubeconfig is the path to the kubeconfig file of the target cluster
    flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
    // identity of the lock holder; if not supplied, a random UUID is generated
    flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
    // name of the lock, i.e. the name of the Kubernetes resource backing it
    flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
    // namespace of the lock resource
    flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
    // parse the command-line flags
    flag.Parse()

    if leaseLockName == "" {
        klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
    }
    if leaseLockNamespace == "" {
        klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
    }

    // leader election uses the Kubernetes API by writing to a
    // lock object, which can be a LeaseLock object (preferred),
    // a ConfigMap, or an Endpoints (deprecated) object.
    // Conflicting writes are detected and each client handles those actions
    // independently.
    config, err := buildConfig(kubeconfig)
    if err != nil {
        klog.Fatal(err)
    }
    // build the Kubernetes client; panic and exit if it cannot be created
    client := clientset.NewForConfigOrDie(config)

    // stand-in for the controller's real logic
    run := func(ctx context.Context) {
        // complete your controller loop here
        klog.Info("Controller loop...")

        // block forever
        select {}
    }

    // use a Go context so we can tell the leaderelection code when we
    // want to step down
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // listen for interrupts or the Linux SIGTERM signal and cancel
    // our context, which the leader election code will observe and
    // step down
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
    go func() {
        <-ch
        klog.Info("Received termination, signaling shutdown")
        cancel()
    }()

    // we use the Lease lock type since edits to Leases are less common
    // and fewer objects in the cluster watch "all Leases".
    // build the lock from the flags; a Lease resource is used here
    lock := &resourcelock.LeaseLock{
        LeaseMeta: metav1.ObjectMeta{
            Name:      leaseLockName,
            Namespace: leaseLockNamespace,
        },
        // bind the lock to the cluster through the client
        Client: client.CoordinationV1(),
        LockConfig: resourcelock.ResourceLockConfig{
            Identity: id,
        },
    }
    // start the leader election code loop
    // note: the election is started with ctx; if ctx's cancel function is
    // called, the election loop terminates as well
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        // the lock used for the election
        Lock: lock,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        // voluntarily give up leadership when ctx is cancelled
        ReleaseOnCancel: true,
        // the lease term: if the leader fails to renew within 60s, the lock
        // is considered expired and a new election can take place
        LeaseDuration: 60 * time.Second,
        // timeout for the leader's renew attempts
        RenewDeadline: 15 * time.Second,
        // interval at which the leader renews its leadership, and at which
        // non-leaders retry acquiring the lock (multiplied by a 1.2 jitter
        // factor, see the code); client-go requires
        // LeaseDuration > RenewDeadline > RetryPeriod * JitterFactor
        RetryPeriod: 5 * time.Second,
        // register the callbacks
        Callbacks: leaderelection.LeaderCallbacks{
            // called once this instance becomes the leader
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                // run the controller logic
                run(ctx)
            },
            OnStoppedLeading: func() {
                // we can do cleanup here
                // this instance is no longer the leader
                klog.Infof("leader lost: %s", id)
                os.Exit(0)
            },
            OnNewLeader: func(identity string) {
                // we're notified when a new leader is elected
                if identity == id {
                    // I just got the lock
                    return
                }
                klog.Infof("new leader elected: %s", identity)
            },
        },
    })
}
Start one instance and watch its log output as well as the Lease resource in the cluster. The start command:
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
The log output shows that the instance with id=1 has acquired the lock:
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
I1023 17:00:21.670298 94227 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:00:21.784234 94227 leaderelection.go:258] successfully acquired lease default/example
I1023 17:00:21.784316 94227 main.go:78] Controller loop...
On the Kubernetes cluster, a Lease object named example now exists in the default namespace, with 1 recorded as the holder identity.
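Assuming kubectl access to the same cluster, the Lease can be inspected directly; its spec records the holder identity, the lease duration, the acquire/renew times and the number of leader transitions:

kubectl -n default get lease example -o yaml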

Next, start a second instance with id=2. Its log output:
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2
I1023 17:05:00.555145 95658 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:05:00.658202 95658 main.go:151] new leader elected: 1
The instance with id=2 does not acquire the lock; it only observes that the lock is held by instance 1. Now terminate the id=1 instance and watch whether the id=2 instance becomes the new leader.

3. How the lock is implemented
Every Kubernetes resource supports Get/Create/Update operations, so in principle any resource could serve as the lock's backing object. client-go ships lock implementations backed by Lease, ConfigMap and Endpoints resources.
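As a sketch (reusing the client and id variables from the example above; the exact constructor signature depends on the client-go version, and recent releases have dropped the ConfigMap and Endpoints lock types), the lock type can also be selected through resourcelock.New instead of building a LeaseLock literal:

// Sketch only: construct a lock of a chosen type via resourcelock.New.
// The namespace ("default") and name ("example") are the illustrative
// values used earlier in this article.
lock, err := resourcelock.New(
    resourcelock.LeasesResourceLock, // lock type backed by a Lease object
    "default",                       // namespace of the lock object
    "example",                       // name of the lock object
    client.CoreV1(),
    client.CoordinationV1(),
    resourcelock.ResourceLockConfig{
        Identity: id,
    },
)
if err != nil {
    klog.Fatal(err)
}
_ = lock // pass it to leaderelection.RunOrDie as in the example above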
The lock moves through a simple set of states: it is created if it does not exist yet, held and periodically renewed by the current leader, and taken over by another candidate once the holder releases it or lets the lease expire.
A lock must implement the following interface:
type Interface interface {
    // Get returns the LeaderElectionRecord
    Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)
    // Create attempts to create a LeaderElectionRecord
    Create(ctx context.Context, ler LeaderElectionRecord) error
    // Update will update an existing LeaderElectionRecord
    Update(ctx context.Context, ler LeaderElectionRecord) error
    // RecordEvent is used to record events
    RecordEvent(string)
    // Identity will return the lock's Identity
    Identity() string
    // Describe is used to convert details on current resource lock
    // into a string
    Describe() string
}
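To make this concrete, here is a simplified sketch of roughly how the Lease-backed lock (resourcelock.LeaseLock) implements Get and Update; it is abridged and paraphrased, not a verbatim copy of the client-go source:

// Abridged sketch of a Lease-backed lock; error handling and the Create /
// RecordEvent / Identity / Describe methods are omitted.
func (ll *LeaseLock) Get(ctx context.Context) (*LeaderElectionRecord, []byte, error) {
    lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ctx, ll.LeaseMeta.Name, metav1.GetOptions{})
    if err != nil {
        return nil, nil, err
    }
    // Remember the object (including its resourceVersion) for later updates.
    ll.lease = lease
    record := LeaseSpecToLeaderElectionRecord(&lease.Spec)
    raw, err := json.Marshal(*record)
    if err != nil {
        return nil, nil, err
    }
    return record, raw, nil
}

func (ll *LeaseLock) Update(ctx context.Context, ler LeaderElectionRecord) error {
    if ll.lease == nil {
        return errors.New("lease not initialized, call get or create first")
    }
    ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler)
    // Because ll.lease still carries the resourceVersion read in Get, a
    // conflicting write by another candidate makes this Update fail instead
    // of silently overwriting the current leader's record.
    lease, err := ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ctx, ll.lease, metav1.UpdateOptions{})
    if err != nil {
        return err
    }
    ll.lease = lease
    return nil
}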
In theory, the three methods Get/Create/Update are all that is needed to build a lock, provided that create and update are applied atomically; that guarantee comes from the Kubernetes API server, which rejects conflicting writes based on the object's resourceVersion. In the official example from section 2, leaderelection.RunOrDie simply wraps the Run method, whose implementation is quite small:
func (le *LeaderElector) Run(ctx context.Context) {
    defer runtime.HandleCrash()
    defer func() {
        le.config.Callbacks.OnStoppedLeading()
    }()

    // try to acquire the lock; this blocks and keeps retrying until it
    // either succeeds or ctx is cancelled
    if !le.acquire(ctx) {
        return // ctx signalled done
    }
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // once the lock is held, invoke the OnStartedLeading callback, which
    // runs the controller code
    go le.config.Callbacks.OnStartedLeading(ctx)

    // while holding the lock, keep renewing it
    le.renew(ctx)
}
The essence of LeaderElector is the pair of operations acquire and renew, shown below:
func (le *LeaderElector) acquire(ctx context.Context) bool {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()
    succeeded := false
    desc := le.config.Lock.Describe()
    klog.Infof("attempting to acquire leader lease %v...", desc)

    // this call blocks: it retries on a timer and keeps looping until the
    // lock is acquired, unless ctx is cancelled first
    wait.JitterUntil(func() {
        // try to take (or renew) the lock
        succeeded = le.tryAcquireOrRenew(ctx)
        le.maybeReportTransition()
        if !succeeded {
            klog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        le.metrics.leaderOn(le.config.Name)
        klog.Infof("successfully acquired lease %v", desc)
        cancel()
    }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
    return succeeded
}
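wait.JitterUntil is the generic retry primitive from k8s.io/apimachinery that acquire() is built on. A tiny standalone illustration of its behaviour (the period and jitter factor below mirror RetryPeriod and the 1.2 JitterFactor; the stop channel plays the role of ctx.Done()):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stop := make(chan struct{})
    // Close the stop channel after a few seconds to end the loop,
    // mimicking ctx.Done() in acquire().
    go func() {
        time.Sleep(3 * time.Second)
        close(stop)
    }()

    // The callback runs roughly once per period; a random jitter of up to
    // 1.2x the period is added each time, so concurrent candidates do not
    // hit the API server in lockstep.
    wait.JitterUntil(func() {
        fmt.Println("trying to acquire the lock...")
    }, time.Second, 1.2, true, stop)
}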
// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    // renew in a loop: as long as renewing succeeds, the inner function
    // simply returns and wait.Until schedules it again
    wait.Until(func() {
        // RenewDeadline is enforced here: if a renew attempt takes longer
        // than RenewDeadline, it fails and the leader steps down
        timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
        defer timeoutCancel()
        err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
            // renew the lock
            return le.tryAcquireOrRenew(timeoutCtx), nil
        }, timeoutCtx.Done())
        le.maybeReportTransition()
        desc := le.config.Lock.Describe()
        if err == nil {
            klog.V(5).Infof("successfully renewed lease %v", desc)
            // renew succeeded
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        le.metrics.leaderOff(le.config.Name)
        klog.Infof("failed to renew lease %v: %v", desc, err)
        cancel()
    }, le.config.RetryPeriod, ctx.Done())

    // if we hold the lease, give it up
    if le.config.ReleaseOnCancel {
        le.release()
    }
}
The core logic lives in tryAcquireOrRenew, which drives the lock's state transitions:
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. obtain or create the ElectionRecord
    // check whether the lock object already exists
    oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
    if err != nil {
        if !errors.IsNotFound(err) {
            klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        // the lock object does not exist yet, so create it
        if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
            klog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        // record ourselves as the leader
        le.setObservedRecord(&leaderElectionRecord)
        return true
    }

    // 2. Record obtained, check the Identity & Time
    if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
        // this check matters: while the leader keeps renewing the lock,
        // oldLeaderElectionRawRecord keeps changing, and every change
        // refreshes le.observedTime
        le.setObservedRecord(oldLeaderElectionRecord)
        le.observedRawRecord = oldLeaderElectionRawRecord
    }
    // if the lease has not expired yet and this instance is not the leader
    // (someone else holds the lock), give up for now
    if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
        le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        !le.IsLeader() {
        klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We're going to try to update. The leaderElectionRecord is set to its default
    // here. Let's correct it before updating.
    // if we are the leader, keep the original AcquireTime and only refresh
    // RenewTime, so the other (non-leader) instances can see the leader is still alive
    if le.IsLeader() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        // we were not the leader, so leadership is being transferred
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // update the lock itself
    if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
        klog.Errorf("Failed to update lock: %v", err)
        return false
    }

    le.setObservedRecord(&leaderElectionRecord)
    return true
}

This concludes the detailed look at using leader election with Go on Kubernetes.