How the Go graceful-restart library overseer works
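overseer provides three main features: (1) lossless connection shutdown, (2) graceful restart of connections, and (3) automatic restart when the program file changes.
Each is covered in turn below.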
I. Lossless connection shutdown
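Go's standard net/http package does not drain connections on its own when its listener goes away: when the accept loop in (srv *Server) Serve exits, it does not wait for the per-connection worker goroutines to finish. (http.Server.Shutdown, added in Go 1.8, can drain connections, but only when the caller invokes it on its own *http.Server; overseer works at the listener level instead.)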
Here is the relevant code from the Go standard library:
Go/src/net/http/server.go
```go
func (srv *Server) Serve(l net.Listener) error {
	if fn := testHookServerServe; fn != nil {
		fn(srv, l) // call hook with unwrapped listener
	}

	origListener := l
	l = &onceCloseListener{Listener: l}
	defer l.Close()

	if err := srv.setupHTTP2_Serve(); err != nil {
		return err
	}

	if !srv.trackListener(&l, true) {
		return ErrServerClosed
	}
	defer srv.trackListener(&l, false)

	baseCtx := context.Background()
	if srv.BaseContext != nil {
		baseCtx = srv.BaseContext(origListener)
		if baseCtx == nil {
			panic("BaseContext returned a nil context")
		}
	}

	var tempDelay time.Duration // how long to sleep on accept failure

	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
	for {
		rw, err := l.Accept()
		if err != nil {
			if srv.shuttingDown() {
				return ErrServerClosed
			}
			if ne, ok := err.(net.Error); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
				time.Sleep(tempDelay)
				continue
			}
			return err
		}
		connCtx := ctx
		if cc := srv.ConnContext; cc != nil {
			connCtx = cc(connCtx, rw)
			if connCtx == nil {
				panic("ConnContext returned nil")
			}
		}
		tempDelay = 0
		c := srv.newConn(rw)
		c.setState(c.rwc, StateNew, runHooks) // before Serve can return
		go c.serve(connCtx)
	}
}
```
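When the listening socket is closed, l.Accept() returns an error and Serve leaves its loop, but nothing waits for the goroutines started with go c.serve(connCtx) to finish.
overseer's approach is to wrap Go's listening socket and connection sockets and use a sync.WaitGroup, so that the main goroutine can wait, synchronously or asynchronously, for the worker goroutines to finish.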
The overseer code is as follows:
overseer-v1.1.6/graceful.go
```go
func (l *overseerListener) Accept() (net.Conn, error) {
	conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
	if err != nil {
		return nil, err
	}
	conn.SetKeepAlive(true)                  // see http.tcpKeepAliveListener
	conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
	uconn := overseerConn{
		Conn:   conn,
		wg:     &l.wg,
		closed: make(chan bool),
	}
	go func() {
		//connection watcher
		select {
		case <-l.closeByForce:
			uconn.Close()
		case <-uconn.closed:
			//closed manually
		}
	}()
	l.wg.Add(1)
	return uconn, nil
}

//non-blocking trigger close
func (l *overseerListener) release(timeout time.Duration) {
	//stop accepting connections - release fd
	l.closeError = l.Listener.Close()
	//start timer, close by force if deadline not met
	waited := make(chan bool)
	go func() {
		l.wg.Wait()
		waited <- true
	}()
	go func() {
		select {
		case <-time.After(timeout):
			close(l.closeByForce)
		case <-waited:
			//no need to force close
		}
	}()
}

//blocking wait for close
func (l *overseerListener) Close() error {
	l.wg.Wait()
	return l.closeError
}

func (o overseerConn) Close() error {
	err := o.Conn.Close()
	if err == nil {
		o.wg.Done()
		o.closed <- true
	}
	return err
}
```
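In (l *overseerListener) Accept, l.wg.Add(1) is called for every new worker connection; in (o overseerConn) Close, o.wg.Done() is called for every worker connection that is closed.
Both the asynchronous path, (l *overseerListener) release, and the synchronous path, (l *overseerListener) Close, call l.wg.Wait() to wait for the worker goroutines to finish. release does so in a background goroutine and closes l.closeByForce if the timeout expires first, which makes each connection watcher close its connection by force.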
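The sequence for closing a listening socket:
1. The worker process receives the restart signal, or the master process receives it and forwards it to the worker.
2. The worker's signal handler calls (l *overseerListener) release.
3. release closes the listening socket and starts l.wg.Wait() asynchronously, together with the force-close timer.
4. In (srv *Server) Serve in the standard net/http/server.go, l.Accept() returns an error, the accept loop exits, and the deferred l.Close() runs; through the onceCloseListener wrapper this calls (l *overseerListener) Close.
5. (l *overseerListener) Close calls l.wg.Wait() synchronously, waiting for the worker connections to finish.
6. As each worker connection finishes, (o overseerConn) Close() is called, which in turn calls o.wg.Done().
7. In parallel with steps 4 to 6, right after release returns, the worker sends SIGUSR1 to the master. Because release is non-blocking, this signal means "listening sockets released", not "all connections finished" (see the proc_slave.go excerpt in part III).
8. When the master process receives SIGUSR1, it writes true to the mp.descriptorsReleased channel.
9. In (mp *master) fork, the master receives from mp.descriptorsReleased, ends the current fork, and moves on to the next one.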
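II. Graceful restart of connections
Graceful restart means that restarting does not disconnect clients and is invisible to them; for example, connections already waiting in the listen queue are not dropped. That is why the listening sockets are created once, kept by the master process, and handed to each new worker process, instead of every new worker creating its own listener.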
The listening sockets are created by the master process:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) retreiveFileDescriptors() error {
	mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
	for i, addr := range mp.Config.Addresses {
		a, err := net.ResolveTCPAddr("tcp", addr)
		if err != nil {
			return fmt.Errorf("Invalid address %s (%s)", addr, err)
		}
		l, err := net.ListenTCP("tcp", a)
		if err != nil {
			return err
		}
		f, err := l.File()
		if err != nil {
			return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
		}
		if err := l.Close(); err != nil {
			return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
		}
		mp.slaveExtraFiles[i] = f
	}
	return nil
}
```
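The master takes each address from mp.Config.Addresses, creates a listener for it, and stores the resulting file handle in mp.slaveExtraFiles.
(l *TCPListener) Close is called along the way, but this does not affect the worker processes. l.File() returns a duplicate of the listening descriptor (closing l does not affect the returned file, and vice versa), so the socket itself stays open through the copy kept in mp.slaveExtraFiles; the master merely no longer has a listener object of its own to accept on.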
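The descriptor hand-over can be tried out inside a single process. The sketch below (handOver is a hypothetical helper) takes the duplicated *os.File from TCPListener.File, closes the original listener as retreiveFileDescriptors does, rebuilds a listener with net.FileListener, and checks that a client can still connect to the same address.

```go
package main

import (
	"bufio"
	"net"
)

// handOver (hypothetical) shows that a listening socket survives closing
// the original listener as long as a duplicated descriptor is still open.
func handOver() (string, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	addr := l.Addr().String()
	f, err := l.(*net.TCPListener).File() // dup of the listening fd
	if err != nil {
		return "", err
	}
	defer f.Close()
	if err := l.Close(); err != nil { // closes only the original fd
		return "", err
	}
	l2, err := net.FileListener(f) // rebuild a listener from the copy
	if err != nil {
		return "", err
	}
	defer l2.Close()

	go func() {
		c, err := l2.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		c.Write([]byte("still listening\n"))
	}()

	c, err := net.Dial("tcp", addr) // same address as before the Close
	if err != nil {
		return "", err
	}
	defer c.Close()
	return bufio.NewReader(c).ReadString('\n')
}
```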
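Here is a quoted summary of the difference between close and shutdown on a network socket:
close ---- closes this process's socket descriptor, but the connection itself stays open: other processes holding a descriptor for the same socket can still read from and write to it. The kernel only tears the socket down when the last descriptor referring to it is closed.
shutdown ---- tears down the socket connection itself: a read may detect EOF, and a write may raise SIGPIPE, possibly not until the socket buffer has filled. shutdown also takes an argument that selects how to shut down: 0 (SHUT_RD) no more reads, 1 (SHUT_WR) no more writes, 2 (SHUT_RDWR) neither.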
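Go exposes the shutdown half of this distinction on TCP connections: (*net.TCPConn).CloseWrite shuts down only the write side (SHUT_WR). In the sketch below (halfClose is a hypothetical helper), the server sees EOF on its read while the client can still read the reply afterwards.

```go
package main

import (
	"io"
	"net"
)

// halfClose (hypothetical) sends "ping", shuts down the client's write
// half, and reads the server's reply over the still-open read half.
func halfClose() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer ln.Close()
	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		req, _ := io.ReadAll(c) // returns once the client's SHUT_WR shows up as EOF
		c.Write(append([]byte("got: "), req...))
	}()

	c, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return "", err
	}
	defer c.Close()
	tc := c.(*net.TCPConn)
	if _, err := tc.Write([]byte("ping")); err != nil {
		return "", err
	}
	if err := tc.CloseWrite(); err != nil { // shutdown(fd, SHUT_WR)
		return "", err
	}
	resp, err := io.ReadAll(tc) // the read half still works
	return string(resp), err
}
```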
mp.slaveExtraFiles is then passed to the child process, i.e. the worker process:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) fork() error {
	mp.debugf("starting %s", mp.binPath)
	cmd := exec.Command(mp.binPath)
	//mark this new process as the "active" slave process.
	//this process is assumed to be holding the socket files.
	mp.slaveCmd = cmd
	mp.slaveID++
	//provide the slave process with some state
	e := os.Environ()
	e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
	e = append(e, envBinPath+"="+mp.binPath)
	e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
	e = append(e, envIsSlave+"=1")
	e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
	cmd.Env = e
	//inherit master args/stdfiles
	cmd.Args = os.Args
	cmd.Stdin = os.Stdin
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	//include socket files
	cmd.ExtraFiles = mp.slaveExtraFiles
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("Failed to start slave process: %s", err)
	}
	//was scheduled to restart, notify success
	if mp.restarting {
		mp.restartedAt = time.Now()
		mp.restarting = false
		mp.restarted <- true
	}
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
```
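The statement cmd.ExtraFiles = mp.slaveExtraFiles hands the sockets to the child. os/exec carries them through the fork/exec that starts the child, where entry i of ExtraFiles becomes file descriptor 3+i, so the child inherits the listening sockets.
The child (worker) process then takes over the inherited sockets: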
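The fd 3+i rule can be checked without overseer. In this sketch (extraFileDemo is a hypothetical helper, and a Unix-like system with /bin/sh is assumed), the parent passes the write end of a pipe in ExtraFiles and the child shell writes to descriptor 3.

```go
package main

import (
	"io"
	"os"
	"os/exec"
)

// extraFileDemo (hypothetical) shows that ExtraFiles[0] is fd 3 in the child.
func extraFileDemo() (string, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return "", err
	}
	defer r.Close()
	cmd := exec.Command("/bin/sh", "-c", "echo hello from fd 3 >&3")
	cmd.ExtraFiles = []*os.File{w} // entry 0 becomes fd 3 in the child
	if err := cmd.Start(); err != nil {
		w.Close()
		return "", err
	}
	w.Close() // drop the parent's copy so r sees EOF when the child exits
	out, err := io.ReadAll(r)
	if err != nil {
		return "", err
	}
	if err := cmd.Wait(); err != nil {
		return "", err
	}
	return string(out), nil
}
```

overseer relies on the same mechanism with listening sockets instead of a pipe, which is why initFileDescriptors in the worker starts counting at descriptor 3.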
overseer-v1.1.6/proc_slave.go
```go
func (sp *slave) run() error {
	sp.id = os.Getenv(envSlaveID)
	sp.debugf("run")
	sp.state.Enabled = true
	sp.state.ID = os.Getenv(envBinID)
	sp.state.StartedAt = time.Now()
	sp.state.Address = sp.Config.Address
	sp.state.Addresses = sp.Config.Addresses
	sp.state.GracefulShutdown = make(chan bool, 1)
	sp.state.BinPath = os.Getenv(envBinPath)
	if err := sp.watchParent(); err != nil {
		return err
	}
	if err := sp.initFileDescriptors(); err != nil {
		return err
	}
	sp.watchSignal()
	//run program with state
	sp.debugf("start program")
	sp.Config.Program(sp.state)
	return nil
}

func (sp *slave) initFileDescriptors() error {
	//inspect file descriptors
	numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
	if err != nil {
		return fmt.Errorf("invalid %s integer", envNumFDs)
	}
	sp.listeners = make([]*overseerListener, numFDs)
	sp.state.Listeners = make([]net.Listener, numFDs)
	for i := 0; i < numFDs; i++ {
		f := os.NewFile(uintptr(3+i), "")
		l, err := net.FileListener(f)
		if err != nil {
			return fmt.Errorf("failed to inherit file descriptor: %d", i)
		}
		u := newOverseerListener(l)
		sp.listeners[i] = u
		sp.state.Listeners[i] = u
	}
	if len(sp.state.Listeners) > 0 {
		sp.state.Listener = sp.state.Listeners[0]
	}
	return nil
}
```
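The child does not create any new listener. It opens the inherited descriptors with os.NewFile(uintptr(3+i), ""), starting at the first descriptor after stdin, stdout and stderr, and only re-wraps them as u := newOverseerListener(l). These listeners are finally handed to sp.Config.Program(sp.state), i.e. the user's program: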
overseer-v1.1.6/example/main.go
```go
package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/jpillora/overseer"
	"github.com/jpillora/overseer/fetcher"
)

// BuildID is normally injected at build time (e.g. -ldflags "-X main.BuildID=...")
var BuildID = "0"

// convert your 'main()' into a 'prog(state)'
// 'prog()' is run in a child process
func prog(state overseer.State) {
	fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
	http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		d, _ := time.ParseDuration(r.URL.Query().Get("d"))
		time.Sleep(d)
		fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
	}))
	http.Serve(state.Listener, nil)
	fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
}

// then create another 'main' which runs the upgrades
// 'main()' is run in the initial process
func main() {
	overseer.Run(overseer.Config{
		Program:          prog,
		Address:          ":5001",
		Fetcher:          &fetcher.File{Path: "my_app_next"},
		Debug:            true, //display log of overseer actions
		TerminateTimeout: 10 * time.Minute,
	})
}
```
In the user program's http.Serve(state.Listener, nil) call:
1. Accept is the wrapped (l *overseerListener) Accept().
2. The deferred l.Close() is likewise the wrapped (l *overseerListener) Close().
3. Every worker connection created by (l *overseerListener) Accept() is wrapped as an overseerConn, so closing it calls (o overseerConn) Close().
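III. Automatic restart on file change
overseer can watch for a new version of the program binary and trigger the restart flow automatically when it changes.
When the master process starts, it checks its configuration; if mp.Config.Fetcher is set, it enters fetchLoop: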
overseer-v1.1.6/proc_master.go
```go
// fetchLoop is run in a goroutine
func (mp *master) fetchLoop() {
	min := mp.Config.MinFetchInterval
	time.Sleep(min)
	for {
		t0 := time.Now()
		mp.fetch()
		//duration fetch of fetch
		diff := time.Now().Sub(t0)
		if diff < min {
			delay := min - diff
			//ensures at least MinFetchInterval delay.
			//should be throttled by the fetcher!
			time.Sleep(delay)
		}
	}
}
```
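mp.Config.MinFetchInterval defaults to 1 second, so by default a fetch starts at most about once per second. It is a time.Duration, so a finer granularity can be configured.
The fetchers supported so far are fetcher_file.go, fetcher_github.go, fetcher_http.go and fetcher_s3.go.
fetcher_file.go is used as the example below.
1. Detecting a change: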
overseer-v1.1.6/proc_master.go
```go
//tee off to sha1
hash := sha1.New()
reader = io.TeeReader(reader, hash)
//write to a temp file
_, err = io.Copy(tmpBin, reader)
if err != nil {
	mp.warnf("failed to write temp binary: %s", err)
	return
}
//compare hash
newHash := hash.Sum(nil)
if bytes.Equal(mp.binHash, newHash) {
	mp.debugf("hash match - skip")
	return
}
```
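The check is based on SHA-1: the hash of the newly fetched binary is compared with the hash of the current one. File timestamps are not consulted at all.
2. Verifying that the file is an executable that supports overseer: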
overseer-v1.1.6/proc_master.go
```go
tokenIn := token()
cmd := exec.Command(tmpBinPath)
cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
cmd.Args = os.Args
returned := false
go func() {
	time.Sleep(5 * time.Second)
	if !returned {
		mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
		if cmd.Process != nil {
			cmd.Process.Kill()
		}
	}
}()
tokenOut, err := cmd.CombinedOutput()
returned = true
if err != nil {
	mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
	return
}
if tokenIn != string(tokenOut) {
	mp.warnf("sanity check failed")
	return
}
```
This relies on a hook that overseer builds into every program that uses it:
overseer-v1.1.6/overseer.go
```go
//sanityCheck returns true if a check was performed
func sanityCheck() bool {
	//sanity check
	if token := os.Getenv(envBinCheck); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	//legacy sanity check using old env var
	if token := os.Getenv(envBinCheckLegacy); token != "" {
		fmt.Fprint(os.Stdout, token)
		return true
	}
	return false
}
```
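overseer.Run calls sanityCheck when main starts. The master runs the fetched binary with the environment variable envBinCheck set to a random token; a program built with overseer prints that token to stdout, and the check passes only if the output matches the token exactly. A binary that has not finished within 5 seconds is killed.
3. Overwriting the old file and triggering a restart.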
overseer-v1.1.6/proc_master.go
```go
//overwrite!
if err := overwrite(mp.binPath, tmpBinPath); err != nil {
	mp.warnf("failed to overwrite binary: %s", err)
	return
}
mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
mp.binHash = newHash
//binary successfully replaced
if !mp.Config.NoRestartAfterFetch {
	mp.triggerRestart()
}
```
(mp *master) triggerRestart starts the restart flow:
overseer-v1.1.6/proc_master.go
```go
func (mp *master) triggerRestart() {
	if mp.restarting {
		mp.debugf("already graceful restarting")
		return //skip
	} else if mp.slaveCmd == nil || mp.restarting {
		mp.debugf("no slave process")
		return //skip
	}
	mp.debugf("graceful restart triggered")
	mp.restarting = true
	mp.awaitingUSR1 = true
	mp.signalledAt = time.Now()
	mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
	select {
	case <-mp.restarted:
		//success
		mp.debugf("restart success")
	case <-time.After(mp.TerminateTimeout):
		//times up mr. process, we did ask nicely!
		mp.debugf("graceful timeout, forcing exit")
		mp.sendSignal(os.Kill)
	}
}
```
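triggerRestart sends mp.Config.RestartSignal to the child. On receiving it, the child calls the non-blocking release on each of its listening sockets and then immediately sends SIGUSR1 to the parent, without waiting for its connections to drain: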
overseer-v1.1.6/proc_slave.go
```go
if len(sp.listeners) > 0 {
	//perform graceful shutdown
	for _, l := range sp.listeners {
		l.release(sp.Config.TerminateTimeout)
	}
	//signal release of held sockets, allows master to start
	//a new process before this child has actually exited.
	//early restarts not supported with restarts disabled.
	if !sp.NoRestart {
		sp.masterProc.Signal(SIGUSR1)
	}
	//listeners should be waiting on connections to close...
}
```
When the parent receives SIGUSR1, it uses the mp.descriptorsReleased channel to report that the listening sockets have been released:
overseer-v1.1.6/proc_master.go
```go
//**during a restart** a SIGUSR1 signals
//to the master process that, the file
//descriptors have been released
if mp.awaitingUSR1 && s == SIGUSR1 {
	mp.debugf("signaled, sockets ready")
	mp.awaitingUSR1 = false
	mp.descriptorsReleased <- true
} else // ...
```
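Control finally returns to (mp *master) fork, which has been blocked waiting for either a notification on mp.descriptorsReleased or the exit of the child (cmd.Wait). When the channel notification arrives, fork returns and the master moves on to the next fork, so the new worker starts while the old one is still draining its connections.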
overseer-v1.1.6/proc_master.go
```go
func (mp *master) fork() error {
	//... ...
	//... ...
	//... ...
	//convert wait into channel
	cmdwait := make(chan error)
	go func() {
		cmdwait <- cmd.Wait()
	}()
	//wait....
	select {
	case err := <-cmdwait:
		//program exited before releasing descriptors
		//proxy exit code out to master
		code := 0
		if err != nil {
			code = 1
			if exiterr, ok := err.(*exec.ExitError); ok {
				if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
					code = status.ExitStatus()
				}
			}
		}
		mp.debugf("prog exited with %d", code)
		//if a restarts are disabled or if it was an
		//unexpected crash, proxy this exit straight
		//through to the main process
		if mp.NoRestart || !mp.restarting {
			os.Exit(code)
		}
	case <-mp.descriptorsReleased:
		//if descriptors are released, the program
		//has yielded control of its sockets and
		//a parallel instance of the program can be
		//started safely. it should serve state.Listeners
		//to ensure downtime is kept at <1sec. The previous
		//cmd.Wait() will still be consumed though the
		//result will be discarded.
	}
	return nil
}
```
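The wait at the end of fork, together with the SIGUSR1 handling before it, can be reproduced with an ordinary child process. In this sketch (forkAndWait is a hypothetical helper; a Unix-like OS and /bin/sh are assumed), receiving SIGUSR1 stands in for mp.descriptorsReleased, and exec.ExitError.ExitCode replaces the syscall.WaitStatus conversion.

```go
package main

import (
	"errors"
	"os"
	"os/exec"
	"os/signal"
	"syscall"
)

// forkAndWait (hypothetical) starts a worker and returns as soon as either
// the worker exits (reporting its exit code) or it sends SIGUSR1 to say its
// listening sockets are released.
func forkAndWait(name string, args ...string) (bool, int, error) {
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGUSR1) // before Start, so an early signal is not dropped
	defer signal.Stop(sigs)

	cmd := exec.Command(name, args...)
	if err := cmd.Start(); err != nil {
		return false, 0, err
	}
	// Convert Wait into a channel; buffered so the goroutine can finish
	// even if the result is never read.
	cmdwait := make(chan error, 1)
	go func() { cmdwait <- cmd.Wait() }()

	select {
	case err := <-cmdwait:
		// Exited before releasing its descriptors: report the exit code.
		var exitErr *exec.ExitError
		if errors.As(err, &exitErr) {
			return false, exitErr.ExitCode(), nil
		}
		return false, 0, err // nil for a clean exit
	case <-sigs:
		// Plays the role of mp.descriptorsReleased: a new worker could be
		// started now while this one drains.
		return true, 0, nil
	}
}
```

For instance, forkAndWait("/bin/sh", "-c", "kill -s USR1 $PPID; sleep 1") returns true before the child exits, while forkAndWait("/bin/sh", "-c", "exit 3") reports exit code 3. Notify is installed before Start because a Go program that has not asked for SIGUSR1 catches it in the runtime and drops it.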
That concludes this detailed look at how the Go graceful-restart library overseer works. For more on overseer, see the other related articles on 脚本之家 (Script Home).