Golang分布式注冊(cè)中心實(shí)現(xiàn)流程講解
動(dòng)手實(shí)現(xiàn)一個(gè)分布式注冊(cè)中心
以一個(gè)日志微服務(wù)為例,將日志服務(wù)注冊(cè)到注冊(cè)中心展開!
日志服務(wù)
log/Server.go
其實(shí)這一個(gè)日志類的功能就是有基本的寫文件功能,然后就是注冊(cè)一個(gè)http的接口去寫日志進(jìn)去
package log import ( "io/ioutil" stlog "log" "net/http" "os" ) var log *stlog.Logger type fileLog string // 編寫日志的方法 func (fl fileLog) Write(data []byte) (int, error) { f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600) if err != nil { return 0, err } defer f.Close() return f.Write(data) } // 啟動(dòng)一個(gè)日志對(duì)象 參數(shù)為日志文件名 func Run(destination string) { log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags) } // 自身注冊(cè)的一個(gè)服務(wù)方法 func RegisterHandlers() { http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: msg, err := ioutil.ReadAll(r.Body) if err != nil || len(msg) == 0 { w.WriteHeader(http.StatusBadRequest) return } write(string(msg)) default: w.WriteHeader(http.StatusMethodNotAllowed) return } }) } func write(message string) { log.Printf("%v\n", message) }
log/Client.go
提供給外部服務(wù)的接口,定義好日志的命名格式,來(lái)顯示調(diào)用接口去使用已經(jīng)注冊(cè)好的日志接口并且返回狀態(tài)
package log import ( "bytes" "distributed/registry" "fmt" "net/http" stlog "log" ) func SetClientLogger(serviceURL string, clientService registry.ServiceName) { stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService)) stlog.SetFlags(0) stlog.SetOutput(&clientLogger{url: serviceURL}) } type clientLogger struct { url string } func (cl clientLogger) Write(data []byte) (int, error) { b := bytes.NewBuffer([]byte(data)) res, err := http.Post(cl.url+"/log", "text/plain", b) if err != nil { return 0, err } if res.StatusCode != http.StatusOK { return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status) } return len(data), nil }
主啟動(dòng)程序LogService
啟動(dòng)服務(wù)Logservice
,主要執(zhí)行start方法,里面有細(xì)節(jié)實(shí)現(xiàn)服務(wù)注冊(cè)與服務(wù)發(fā)現(xiàn)
package main import ( "context" "distributed/log" "distributed/registry" "distributed/service" "fmt" stlog "log" ) func main() { // 初始化啟動(dòng)一個(gè)日志文件對(duì)象 log.Run("./distributed.log") // 日志服務(wù)注冊(cè)的端口和地址 host, port := "localhost", "4000" serviceAddress := fmt.Sprintf("http://%s:%s", host, port) // 初始化注冊(cè)對(duì)象 r := registry.Registration{ ServiceName: registry.LogService, // 自身服務(wù)名 ServiceURL: serviceAddress, // 自身服務(wù)地址 RequiredServices: make([]registry.ServiceName, 0),// 依賴服務(wù) ServiceUpdateURL: serviceAddress + "/services", // 服務(wù)列表 HeartbeatURL: serviceAddress + "/heartbeat", // 心跳 } // 啟動(dòng)日志服務(wù)包含服務(wù)注冊(cè),發(fā)現(xiàn)等細(xì)節(jié) ctx, err := service.Start( context.Background(), host, port, r, log.RegisterHandlers, ) // 異常寫入到日志中 if err != nil { stlog.Fatalln(err) } // 超時(shí)停止退出服務(wù) <-ctx.Done() fmt.Println("Shutting down log service.") }
服務(wù)啟動(dòng)與注冊(cè)
service/service.go
Start 啟動(dòng)服務(wù)的主方法
/* host: 地址 port: 端口號(hào) reg: 注冊(cè)的服務(wù)對(duì)象 registerHandlersFunc: 注冊(cè)方法 */ func Start(ctx context.Context, host, port string, reg registry.Registration, registerHandlersFunc func()) (context.Context, error) { registerHandlersFunc() // 啟動(dòng)注冊(cè)方法 // 啟動(dòng)服務(wù) ctx = startService(ctx, reg.ServiceName, host, port) // 注冊(cè)服務(wù) err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil }
startService
func startService(ctx context.Context, serviceName registry.ServiceName, host, port string) context.Context { ctx, cancel := context.WithCancel(ctx) var srv http.Server srv.Addr = ":" + port // 該協(xié)程為監(jiān)聽http服務(wù),并且停止服務(wù)的時(shí)候cancel go func() { log.Println(srv.ListenAndServe()) // 刪除對(duì)應(yīng)的服務(wù) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } cancel() }() // 該協(xié)程為監(jiān)聽手動(dòng)停止服務(wù)的信號(hào) go func() { fmt.Printf("%v started. Press any key to stop. \n", serviceName) var s string fmt.Scanln(&s) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port)) if err != nil { log.Println(err) } srv.Shutdown(ctx) cancel() }() return ctx }
服務(wù)注冊(cè)與發(fā)現(xiàn)
registry/client.go
注冊(cè)服務(wù)的時(shí)候會(huì)連著心跳以及服務(wù)更新的方法一起注冊(cè)!
而服務(wù)更新里面的細(xì)節(jié)就是自己自定義了一個(gè)Handler然后ServeHttp方法里面去update
全局的服務(wù)提供對(duì)象,
update主要是更新服務(wù)和刪除服務(wù)的最新消息
然后就是提供一個(gè)注銷服務(wù)的方法
package registry import ( "bytes" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "sync" ) // 注冊(cè)服務(wù) func RegisterService(r Registration) error { // 獲得心跳地址并注冊(cè) heartbeatURL, err := url.Parse(r.HeartbeatURL) if err != nil { return err } http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) // 獲得服務(wù)更新地址,并且自定義http服務(wù)的handler,因?yàn)槊看胃路?wù)的時(shí)候,可以在ServeHttp方法里面去維護(hù) serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) if err != nil { return err } http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{}) // 寫入buf值將服務(wù)對(duì)象發(fā)送給注冊(cè)中心的services地址 buf := new(bytes.Buffer) enc := json.NewEncoder(buf) err = enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json", buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to register service. Registry service "+ "responded with code %v", res.StatusCode) } return nil } type serviceUpdateHanlder struct{} func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } dec := json.NewDecoder(r.Body) var p patch err := dec.Decode(&p) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } fmt.Printf("Updated received %v\n", p) prov.Update(p) // 更新服務(wù)提供對(duì)象 } // 刪除對(duì)應(yīng)注冊(cè)中心的服務(wù)地址 func ShutdownService(url string) error { req, err := http.NewRequest(http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte(url))) if err != nil { return err } req.Header.Add("Content-Type", "text/plain") res, err := http.DefaultClient.Do(req) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("Failed to deregister service. Registry "+ "service responded with code %v", res.StatusCode) } return nil } // 更新服務(wù)列表 func (p *providers) Update(pat patch) { p.mutex.Lock() defer p.mutex.Unlock() // 將patch中有新增的進(jìn)行添加 for _, patchEntry := range pat.Added { if _, ok := p.services[patchEntry.Name]; !ok { p.services[patchEntry.Name] = make([]string, 0) } p.services[patchEntry.Name] = append(p.services[patchEntry.Name], patchEntry.URL) } // 將patch中被標(biāo)記刪除的 for _, patchEntry := range pat.Removed { if providerURLs, ok := p.services[patchEntry.Name]; ok { for i := range providerURLs { if providerURLs[i] == patchEntry.URL { p.services[patchEntry.Name] = append(providerURLs[:i], providerURLs[i+1:]...) } } } } } // 根據(jù)服務(wù)名負(fù)載均衡隨機(jī)獲取服務(wù)地址 func (p providers) get(name ServiceName) (string, error) { providers, ok := p.services[name] if !ok { return "", fmt.Errorf("No providers available for service %v", name) } idx := int(rand.Float32() * float32(len(providers))) return providers[idx], nil } // 對(duì)外暴露生產(chǎn)者的方法 func GetProvider(name ServiceName) (string, error) { return prov.get(name) } type providers struct { services map[ServiceName][]string mutex *sync.RWMutex } // 服務(wù)提供對(duì)象 var prov = providers{ services: make(map[ServiceName][]string), // 服務(wù)列表 服務(wù)名->集群地址集合 mutex: new(sync.RWMutex), // 鎖 防止服務(wù)注冊(cè)更新時(shí)的并發(fā)情況 }
registry/registration.go
主要是一些關(guān)于服務(wù)使用到的參數(shù)以及對(duì)象!
package registry type Registration struct { ServiceName ServiceName // 服務(wù)名 ServiceURL string // 服務(wù)地址 RequiredServices []ServiceName // 依賴的服務(wù) ServiceUpdateURL string // 服務(wù)更新的地址 HeartbeatURL string // 心跳地址 } type ServiceName string // 服務(wù)名集合 const ( LogService = ServiceName("LogService") GradingService = ServiceName("GradingService") PortalService = ServiceName("Portald") ) // 服務(wù)對(duì)象參數(shù) type patchEntry struct { Name ServiceName URL string } // 更新的服務(wù)對(duì)象參數(shù) type patch struct { Added []patchEntry Removed []patchEntry }
registry/server.go
服務(wù)端的注冊(cè)中心服務(wù)的增刪改查管理以及心跳檢測(cè),及時(shí)將最新的更新的服務(wù)消息通知回給客戶端
package registry import ( "bytes" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" "time" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" // 注冊(cè)中心地址 // 服務(wù)對(duì)象集合 type registry struct { registrations []Registration mutex *sync.RWMutex } // 添加服務(wù) func (r *registry) add(reg Registration) error { r.mutex.Lock() r.registrations = append(r.registrations, reg) r.mutex.Unlock() err := r.sendRequiredServices(reg) r.notify(patch{ Added: []patchEntry{ patchEntry{ Name: reg.ServiceName, URL: reg.ServiceURL, }, }, }) return err } // 通知服務(wù)接口請(qǐng)求去刷新改變后到服務(wù) func (r registry) notify(fullPatch patch) { r.mutex.RLock() defer r.mutex.RUnlock() for _, reg := range r.registrations { go func(reg Registration) { for _, reqService := range reg.RequiredServices { p := patch{Added: []patchEntry{}, Removed: []patchEntry{}} sendUpdate := false for _, added := range fullPatch.Added { if added.Name == reqService { p.Added = append(p.Added, added) sendUpdate = true } } for _, removed := range fullPatch.Removed { if removed.Name == reqService { p.Removed = append(p.Removed, removed) sendUpdate = true } } if sendUpdate { err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { log.Println(err) return } } } }(reg) } } // 更新每個(gè)服務(wù)的依賴服務(wù) func (r registry) sendRequiredServices(reg Registration) error { r.mutex.RLock() defer r.mutex.RUnlock() var p patch for _, serviceReg := range r.registrations { for _, reqService := range reg.RequiredServices { if serviceReg.ServiceName == reqService { p.Added = append(p.Added, patchEntry{ Name: serviceReg.ServiceName, URL: serviceReg.ServiceURL, }) } } } err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { return err } return nil } // 告訴客戶端更新,最新的服務(wù)列表是這個(gè) func (r registry) sendPatch(p patch, url string) error { d, err := json.Marshal(p) if err != nil { return err } _, err = http.Post(url, "application/json", bytes.NewBuffer(d)) if err != nil { return err } return nil } // 注冊(cè)中心刪除服務(wù)對(duì)象 func (r *registry) remove(url string) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { // 通知客戶端更新對(duì)象信息 r.notify(patch{ Removed: []patchEntry{ { Name: r.registrations[i].ServiceName, URL: r.registrations[i].ServiceURL, }, }, }) r.mutex.Lock() reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...) r.mutex.Unlock() return nil } } return fmt.Errorf("Service at URL %s not found", url) } // 心跳檢測(cè) func (r *registry) heartbeat(freq time.Duration) { for { var wg sync.WaitGroup for _, reg := range r.registrations { wg.Add(1) go func(reg Registration) { defer wg.Done() success := true for attemps := 0; attemps < 3; attemps++ { res, err := http.Get(reg.HeartbeatURL) if err != nil { log.Println(err) } else if res.StatusCode == http.StatusOK { log.Printf("Heartbeat check passed for %v", reg.ServiceName) // 如果心跳恢復(fù)了,把服務(wù)重新注冊(cè)回來(lái) if !success { r.add(reg) } break; } // 如果執(zhí)行到這就代表著心跳沒(méi)有響應(yīng),那就代表著需要回收注銷該服務(wù)了 log.Printf("Heartbeat check failed for %v", reg.ServiceName) if success { success = false r.remove(reg.ServiceURL) } time.Sleep(1 * time.Second) } }(reg) wg.Wait() time.Sleep(freq) } } } var once sync.Once func SetupRegistryService() { // 保證執(zhí)行一次進(jìn)行服務(wù)到心跳 每三秒循環(huán)一遍 once.Do(func() { go reg.heartbeat(3 * time.Second) }) } var reg = registry{ registrations: make([]Registration, 0), mutex: new(sync.RWMutex), } type RegistryService struct{} func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Println("Request received") switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("Adding service: %v with URL: %s\n", r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string(payload) log.Printf("Removing service at URL: %s", url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default: w.WriteHeader(http.StatusMethodNotAllowed) return } }
到此這篇關(guān)于Golang分布式注冊(cè)中心實(shí)現(xiàn)流程講解的文章就介紹到這了,更多相關(guān)Golang分布式注冊(cè)中心內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語(yǔ)言實(shí)現(xiàn)可選參數(shù)的方法小結(jié)
這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言實(shí)現(xiàn)可選參數(shù)的一些常見(jiàn)方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-02-02Golang Gin框架實(shí)現(xiàn)多種數(shù)據(jù)格式返回結(jié)果詳解
這篇文章主要介紹了Golang Gin框架實(shí)現(xiàn)多種數(shù)據(jù)格式返回結(jié)果,我們都知道,一個(gè)完整的請(qǐng)求包含請(qǐng)求和處理請(qǐng)求以及結(jié)果返回三個(gè)步驟,在服務(wù)器端對(duì)請(qǐng)求處理完成以后,會(huì)將結(jié)果返回給客戶端,在gin框架中,支持返回多種請(qǐng)求數(shù)據(jù)格式,下面我們一起來(lái)看看2023-05-05go語(yǔ)言數(shù)組及結(jié)構(gòu)體繼承和初始化示例解析
這篇文章主要為大家介紹了go語(yǔ)言數(shù)組及結(jié)構(gòu)體繼承和初始化示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04關(guān)于Golang變量初始化/類型推斷/短聲明的問(wèn)題
這篇文章主要介紹了關(guān)于Golang變量初始化/類型推斷/短聲明的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項(xiàng)目(打包)全過(guò)程
最近在配置idea開發(fā)go語(yǔ)言時(shí)碰到很多問(wèn)題,所以這里給大家總結(jié)下,這篇文章主要給大家介紹了關(guān)于intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項(xiàng)目(打包)的相關(guān)資料,需要的朋友可以參考下2023-10-10Golang基礎(chǔ)教程之字符串string實(shí)例詳解
這篇文章主要給大家介紹了關(guān)于Golang基礎(chǔ)教程之字符串string的相關(guān)資料,需要的朋友可以參考下2022-07-07