How the Go graceful-restart library overseer works
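overseer provides three features:
1. lossless connection shutdown; 2. seamless (graceful) restart of connections; 3. automatic restart when the program file changes.
Each is covered in turn below.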
I. Lossless connection shutdown
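The Serve loop in Go's standard net/http package does not shut connections down losslessly on its own. When the main accept loop exits, it does not wait for the per-connection worker goroutines to finish. (http.Server.Shutdown, added in Go 1.8, is the standard library's separate answer to this. The mechanism described below works at the listener level instead.)
Here is the relevant standard-library code: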
Go/src/net/http/server.go
func (srv *Server) Serve(l net.Listener) error {
	if fn := testHookServerServe; fn != nil {
		fn(srv, l) // call hook with unwrapped listener
	}
	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()
	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}
	if !srv.trackListener(&l, true) {
		return ErrServerClosed
	}
	defer srv.trackListener(&l, false)
	baseCtx := context.Background()
	if srv.BaseContext != nil {
		baseCtx = srv.BaseContext(origListener)
		if baseCtx == nil {
			panic("BaseContext returned a nil context")
		}
	}
	var tempDelay time.Duration // how long to sleep on accept failure
	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			if srv.shuttingDown() {
				return ErrServerClosed
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}
When the listening socket is closed, l.Accept() returns an error and the loop exits. However, Serve does not wait for the goroutines started with go c.serve(connCtx) to finish.
overseer wraps both the listening socket and the connection sockets. It uses a sync.WaitGroup so that the main goroutine can wait for the worker connections to finish.
The overseer code:
overseer-v1.1.6/graceful.go
func (l *overseerListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
	if err != nil {
		return nil, err
	}
	conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
	conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
	uconn := overseerConn{
		Conn:   conn,
		wg:     &l.wg,
		closed: make(chan bool),
	}
	go func() {
		//connection watcher
		select {
		case <-l.closeByForce:
			uconn.Close()
		case <-uconn.closed:
			//closed manually
		}
	}()
	l.wg.Add(1)
	return uconn, nil
}

//non-blocking trigger close
func (l *overseerListener) release(timeout time.Duration) {
	//stop accepting connections - release fd
	l.closeError = l.Listener.Close()
	//start timer, close by force if deadline not met
	waited := make(chan bool)
	go func() {
		l.wg.Wait()
		waited <- true
	}()
	go func() {
		select {
		case <-time.After(timeout):
			close(l.closeByForce)
		case <-waited:
			//no need to force close
		}
	}()
}

//blocking wait for close
func (l *overseerListener) Close() error {
	l.wg.Wait()
	return l.closeError
}

func (o overseerConn) Close() error {
	err := o.Conn.Close()
	if err == nil {
		o.wg.Done()
		o.closed <- true
	}
	return err
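}
(l *overseerListener) Accept counts every new worker connection with l.wg.Add(1). (o overseerConn) Close calls o.wg.Done() for every connection that closes successfully.
Both shutdown paths call l.wg.Wait() to wait for the worker connections: the asynchronous one, (l *overseerListener) release, and the synchronous one, (l *overseerListener) Close. release does its waiting in a background goroutine. If the connections have not finished within timeout, it closes l.closeByForce, and each connection's watcher goroutine then closes that connection by force.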
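Listener shutdown sequence:
1. The worker process receives the restart signal. Alternatively, the master process receives it and forwards it to the worker.
2. The worker's signal handler calls (l *overseerListener) release.
3. release closes the listening socket and runs l.wg.Wait() in a background goroutine.
4. In the standard library's (srv *Server) Serve (net/http/server.go), l.Accept() returns an error and the accept loop exits. The deferred l.Close() then runs, which ends up in (l *overseerListener) Close.
5. (l *overseerListener) Close calls l.wg.Wait() synchronously and blocks until the worker connections finish.
6. As each worker connection finishes, (o overseerConn) Close is called, and it calls o.wg.Done().
7. The worker does not wait for step 6 before signalling. Right after the non-blocking release calls, it sends SIGUSR1 to the master, so a new worker can start while the old one is still draining its connections.
8. When the master receives SIGUSR1, it writes true to the mp.descriptorsReleased channel.
9. In (mp *master) fork, the master receives from mp.descriptorsReleased, returns from the current fork, and moves on to the next fork.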
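II. Seamless restart of connections
A seamless restart does not disconnect clients and is invisible to them. For example, connections already queued on the listener are not dropped. For this reason, the listening socket is handed from the old worker to the new one through the master process, instead of each new worker creating its own listener.
The listening sockets are created by the master process: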
overseer-v1.1.6/proc_master.go
func (mp *master) retreiveFileDescriptors() error {
	mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
	for i, addr := range mp.Config.Addresses {
		a, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return fmt.Errorf("Invalid address %s (%s)", addr, err)
		}
		l, err := net.ListenTCP("tcp", a)
		if err != nil {
			return err
		}
		f, err := l.File()
		if err != nil {
			return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
		}
		mp.slaveExtraFiles[i] = f
	}
	return nil
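}
The master takes each address from mp.Config.Addresses, opens a listener on it, and stores the resulting file handle in mp.slaveExtraFiles.
Along the way it calls (l *TCPListener) Close, but this does not affect the worker process. l.File() returns a duplicate of the socket's file descriptor, so l.Close() only releases the master's original descriptor. The socket itself stays open through f; the only effect is that the master no longer accepts on l itself.
As a reminder, here is the difference between close and shutdown on a network socket:
close: releases this process's descriptor (socket id). The connection itself stays open, and any other process or descriptor still holding the socket can keep reading from and writing to it.
shutdown: tears down the connection itself. A later read may see EOF, and a write may raise SIGPIPE, possibly only once the socket buffer has filled. shutdown also takes a how argument: 0 disables further reads, 1 disables further writes, and 2 disables both.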
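These close semantics are what make the hand-off work, and they can be checked in a single process. The sketch assumes a Unix-like system with loopback TCP available, and detach, revive and roundTrip are hypothetical helpers. detach does what retreiveFileDescriptors does: it listens, takes a duplicate with File(), and closes the listener. revive does what the worker's initFileDescriptors does with net.FileListener. The revived listener still accepts connections, even though the original listener was closed.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"os"
)

// detach mirrors the master side: listen, duplicate the fd, close the listener.
func detach(addr string) (*os.File, error) {
	l, err := net.Listen("tcp", addr)
	if err != nil {
		return nil, err
	}
	f, err := l.(*net.TCPListener).File() // f holds a duplicate descriptor
	if err != nil {
		l.Close()
		return nil, err
	}
	if err := l.Close(); err != nil { // closes only the original descriptor
		f.Close()
		return nil, err
	}
	return f, nil
}

// revive mirrors the worker side: rebuild a listener from the descriptor.
func revive(f *os.File) (net.Listener, error) {
	defer f.Close() // FileListener made its own copy, so f is no longer needed
	return net.FileListener(f)
}

// roundTrip dials the listener and returns what the accepted side sends.
func roundTrip(l net.Listener) (string, error) {
	go func() {
		c, err := l.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		io.WriteString(c, "still listening")
	}()
	c, err := net.Dial("tcp", l.Addr().String())
	if err != nil {
		return "", err
	}
	defer c.Close()
	b, err := io.ReadAll(c)
	return string(b), err
}

func main() {
	f, err := detach("127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	l, err := revive(f)
	if err != nil {
		panic(err)
	}
	defer l.Close()
	msg, err := roundTrip(l)
	fmt.Println(msg, err) // still listening <nil>
}
```

Per the net.FileListener documentation, closing f does not affect the returned listener. That is why revive can close f right away.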
mp.slaveExtraFiles is then passed to the child process, i.e. the worker:
overseer-v1.1.6/proc_master.go
func (mp *master) fork() error {
	mp.debugf("starting %s", mp.binPath)
	cmd := exec.Command(mp.binPath)
	//mark this new process as the "active" slave process.
	//this process is assumed to be holding the socket files.
	mp.slaveCmd = cmd
	mp.slaveID++
	//provide the slave process with some state
	e := os.Environ()
	e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
	e = append(e, envBinPath+"="+mp.binPath)
	e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
	e = append(e, envIsSlave+"=1")
	e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
	cmd.Env = e
	//inherit master args/stdfiles
	cmd.Args = os.Args
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	//include socket files
	cmd.ExtraFiles = mp.slaveExtraFiles
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("Failed to start slave process: %s", err)
	}
	//was scheduled to restart, notify success
	if mp.restarting {
		mp.restartedAt = time.Now()
		mp.restarting = false
		mp.restarted <- true
	}
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
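}
The statement cmd.ExtraFiles = mp.slaveExtraFiles hands the sockets to the child. The descriptors are passed through the fork/exec that starts the child, and the child inherits them. As documented for os/exec, entry i of ExtraFiles becomes file descriptor 3+i in the new process, right after stdin, stdout and stderr (0 to 2).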
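The fd 3+i convention can be observed directly. This sketch assumes a Unix system with sh on the PATH. The write end of a pipe is passed as ExtraFiles[0], a child shell writes to descriptor 3, and the parent reads back what the child wrote. readFromInheritedFD is a hypothetical name.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"os/exec"
)

// readFromInheritedFD runs script in a child shell whose fd 3 is the write
// end of a pipe (ExtraFiles[0]) and returns what the child wrote there.
func readFromInheritedFD(script string) (string, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return "", err
	}
	defer r.Close()

	cmd := exec.Command("sh", "-c", script)
	cmd.ExtraFiles = []*os.File{w} // ExtraFiles[0] becomes fd 3 in the child
	startErr := cmd.Start()
	w.Close() // drop the parent's copy so the read below can reach EOF
	if startErr != nil {
		return "", startErr
	}
	out, readErr := io.ReadAll(r)
	waitErr := cmd.Wait()
	if readErr != nil {
		return "", readErr
	}
	return string(out), waitErr
}

func main() {
	s, err := readFromInheritedFD("printf inherited >&3")
	fmt.Printf("%q %v\n", s, err) // "inherited" <nil>
}
```

The parent closes its own copy of the write end right after Start. Otherwise io.ReadAll would never see EOF, because the pipe would still have an open writer in the parent.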
The child, i.e. the worker, then takes over the inherited sockets:
overseer-v1.1.6/proc_slave.go
func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")
	sp.state.Enabled = true
	sp.state.ID = os.Getenv(envBinID)
	sp.state.StartedAt = time.Now()
	sp.state.Address = sp.Config.Address
	sp.state.Addresses = sp.Config.Addresses
	sp.state.GracefulShutdown = make(chan bool, 1)
	sp.state.BinPath = os.Getenv(envBinPath)
	if err := sp.watchParent(); err != nil {
		return err
	}
	if err := sp.initFileDescriptors(); err != nil {
		return err
	}
	sp.watchSignal()
	//run program with state
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}

func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
	if err != nil {
		return fmt.Errorf("invalid %s integer", envNumFDs)
	}
	sp.listeners = make([]*overseerListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d", i)
		}
		u := newOverseerListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
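}
The child does not create any listener of its own. It opens the inherited descriptors 3+i, rebuilds listeners from them with net.FileListener, and wraps each one as u := newOverseerListener(l). These listening sockets are finally passed to sp.Config.Program(sp.state), i.e. the user's program: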
overseer-v1.1.6/example/main.go
// convert your 'main()' into a 'prog(state)'
// 'prog()' is run in a child process
func prog(state overseer.State) {
	fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
	http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		d, _ := time.ParseDuration(r.URL.Query().Get("d"))
		time.Sleep(d)
		fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
	}))
	http.Serve(state.Listener, nil)
	fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
}

// then create another 'main' which runs the upgrades
// 'main()' is run in the initial process
func main() {
	overseer.Run(overseer.Config{
		Program:          prog,
		Address:          ":5001",
		Fetcher:          &fetcher.File{Path: "my_app_next"},
		Debug:            true, //display log of overseer actions
		TerminateTimeout: 10 * time.Minute,
	})
}
In the user program, the call http.Serve(state.Listener, nil) means that:
1. connections are accepted through the wrapped (l *overseerListener) Accept();
2. the deferred l.Close() also goes through the wrapped (l *overseerListener) Close();
3. every worker connection created by (l *overseerListener) Accept() is wrapped as an overseerConn, so closing it calls (o overseerConn) Close().
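Points 1 and 2 can be checked without overseer. The sketch wraps a loopback listener in a hypothetical countingListener that overrides Accept and Close, as overseerListener does. It then makes Accept fail by closing the inner listener directly, roughly what release does. http.Serve returns the Accept error, and its deferred Close goes through the wrapper exactly once.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"sync/atomic"
)

// countingListener records how often http.Serve calls Accept and Close on
// it, standing in for overseerListener.
type countingListener struct {
	net.Listener
	accepts, closes int32
}

func (l *countingListener) Accept() (net.Conn, error) {
	atomic.AddInt32(&l.accepts, 1)
	return l.Listener.Accept()
}

func (l *countingListener) Close() error {
	atomic.AddInt32(&l.closes, 1)
	return l.Listener.Close()
}

// serveUntilListenerFails runs http.Serve on the wrapper, then closes the
// inner listener behind the wrapper's back so that Accept fails.
func serveUntilListenerFails() (accepts, closes int32, err error) {
	inner, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, 0, err
	}
	cl := &countingListener{Listener: inner}
	done := make(chan error, 1)
	go func() { done <- http.Serve(cl, http.NotFoundHandler()) }()
	inner.Close() // Accept now returns an error and Serve returns
	err = <-done
	return atomic.LoadInt32(&cl.accepts), atomic.LoadInt32(&cl.closes), err
}

func main() {
	accepts, closes, err := serveUntilListenerFails()
	fmt.Println(accepts, closes, err != nil) // 1 1 true
}
```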
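III. Automatic restart on file changes
overseer can watch for changes to the program file and start the restart sequence automatically when a new version appears.
When the master process starts, it checks its configuration. If mp.Config.Fetcher is set, it runs fetchLoop: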
overseer-v1.1.6/proc_master.go
// fetchLoop is run in a goroutine
func (mp *master) fetchLoop() {
	min := mp.Config.MinFetchInterval
	time.Sleep(min)
	for {
		t0 := time.Now()
		mp.fetch()
		//duration fetch of fetch
		diff := time.Now().Sub(t0)
		if diff < min {
			delay := min - diff
			//ensures at least MinFetchInterval delay.
			//should be throttled by the fetcher!
			time.Sleep(delay)
		}
	}
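}
mp.Config.MinFetchInterval defaults to 1 second, so the loop checks for updates at most once per second. Each iteration is padded with a sleep until at least MinFetchInterval has passed since the fetch started. Because the setting is a time.Duration, a finer interval can be configured.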
The fetchers provided are fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.
fetcher_file.go is used as the example below.
1. Detecting a change:
overseer-v1.1.6/proc_master.go
//tee off to sha1
hash := sha1.New()
reader = io.TeeReader(reader, hash)
//write to a temp file
_, err = io.Copy(tmpBin, reader)
if err != nil {
	mp.warnf("failed to write temp binary: %s", err)
	return
}
//compare hash
newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) {
	mp.debugf("hash match - skip")
	return
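}
Change detection relies on SHA-1. The master hashes the fetched binary while writing it to a temp file, then compares the new hash with the current one. File timestamps play no part in this comparison.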
2. Checking that the file is executable and was built with overseer:
overseer-v1.1.6/proc_master.go
tokenIn := token()
cmd := exec.Command(tmpBinPath)
cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
cmd.Args = os.Args
returned := false
go func() {
	time.Sleep(5 * time.Second)
	if !returned {
		mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
	}
}()
tokenOut, err := cmd.CombinedOutput()
returned = true
if err != nil {
	mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
	return
}
if tokenIn != string(tokenOut) {
	mp.warnf("sanity check failed")
	return
}
This check relies on code that overseer builds into every program:
overseer-v1.1.6/overseer.go
//sanityCheck returns true if a check was performed
func sanityCheck() bool {
	//sanity check
	if token := os.Getenv(envBinCheck); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	//legacy sanity check using old env var
	if token := os.Getenv(envBinCheckLegacy); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	return false
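}
sanityCheck is called from overseer.Run when main starts. The master runs the candidate binary with a fixed environment variable, envBinCheck, set to the token tokenIn. A binary built with overseer prints that token to stdout, and the check passes only if the output matches tokenIn exactly. A binary that does not answer within 5 seconds is killed.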
3. Overwriting the old file and triggering a restart:
overseer-v1.1.6/proc_master.go
//overwrite!
if err := overwrite(mp.binPath, tmpBinPath); err != nil {
	mp.warnf("failed to overwrite binary: %s", err)
	return
}
mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash
//binary successfully replaced
if !mp.Config.NoRestartAfterFetch {
	mp.triggerRestart()
}
(mp *master) triggerRestart then starts the restart sequence:
overseer-v1.1.6/proc_master.go
func (mp *master) triggerRestart() {
	if mp.restarting {
		mp.debugf("already graceful restarting")
		return //skip
	} else if mp.slaveCmd == nil || mp.restarting {
		mp.debugf("no slave process")
		return //skip
	}
	mp.debugf("graceful restart triggered")
	mp.restarting = true
	mp.awaitingUSR1 = true
	mp.signalledAt = time.Now()
	mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
	select {
	case <-mp.restarted:
		//success
		mp.debugf("restart success")
	case <-time.After(mp.TerminateTimeout):
		//times up mr. process, we did ask nicely!
		mp.debugf("graceful timeout, forcing exit")
		mp.sendSignal(os.Kill)
	}
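}
The master sends mp.Config.RestartSignal to the child and then waits on mp.restarted, which fork signals once the new worker has started. If that does not happen within TerminateTimeout, the master sends os.Kill. When the child receives the restart signal, it releases its listening sockets and then sends SIGUSR1 to the parent: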
overseer-v1.1.6/proc_slave.go
if len(sp.listeners) > 0 {
	//perform graceful shutdown
	for _, l := range sp.listeners {
		l.release(sp.Config.TerminateTimeout)
	}
	//signal release of held sockets, allows master to start
	//a new process before this child has actually exited.
	//early restarts not supported with restarts disabled.
	if !sp.NoRestart {
		sp.masterProc.Signal(SIGUSR1)
	}
	//listeners should be waiting on connections to close...
}
When the parent receives SIGUSR1, it notifies the mp.descriptorsReleased channel that the listening sockets have been released:
overseer-v1.1.6/proc_master.go
//**during a restart** a SIGUSR1 signals
//to the master process that, the file
//descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 {
	mp.debugf("signaled, sockets ready")
	mp.awaitingUSR1 = false
	mp.descriptorsReleased <- true
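} else
Control finally returns to (mp *master) fork. fork has been waiting either for a notification on mp.descriptorsReleased or for the child to exit (cmd.Wait). Once the channel notification arrives, fork returns and the next round of the fork loop begins.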
overseer-v1.1.6/proc_master.go
func (mp *master) fork() error {
	//... ...
	//... ...
	//... ...
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}