Golang分布式注冊(cè)中心實(shí)現(xiàn)流程講解
動(dòng)手實(shí)現(xiàn)一個(gè)分布式注冊(cè)中心
以一個(gè)日志微服務(wù)為例,將日志服務(wù)注冊(cè)到注冊(cè)中心展開!

日志服務(wù)
log/Server.go
其實(shí)這一個(gè)日志類的功能就是有基本的寫文件功能,然后就是注冊(cè)一個(gè)http的接口去寫日志進(jìn)去
package log
import (
"io/ioutil"
stlog "log"
"net/http"
"os"
)
var log *stlog.Logger
type fileLog string
// 編寫日志的方法
func (fl fileLog) Write(data []byte) (int, error) {
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return 0, err
}
defer f.Close()
return f.Write(data)
}
// 啟動(dòng)一個(gè)日志對(duì)象 參數(shù)為日志文件名
func Run(destination string) {
log = stlog.New(fileLog(destination), "[go] - ", stlog.LstdFlags)
}
// 自身注冊(cè)的一個(gè)服務(wù)方法
func RegisterHandlers() {
http.HandleFunc("/log", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodPost:
msg, err := ioutil.ReadAll(r.Body)
if err != nil || len(msg) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
write(string(msg))
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
})
}
func write(message string) {
log.Printf("%v\n", message)
}log/Client.go
提供給外部服務(wù)的接口,定義好日志的命名格式,來(lái)顯示調(diào)用接口去使用已經(jīng)注冊(cè)好的日志接口并且返回狀態(tài)
package log
import (
"bytes"
"distributed/registry"
"fmt"
"net/http"
stlog "log"
)
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}
type clientLogger struct {
url string
}
func (cl clientLogger) Write(data []byte) (int, error) {
b := bytes.NewBuffer([]byte(data))
res, err := http.Post(cl.url+"/log", "text/plain", b)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
}
return len(data), nil
}主啟動(dòng)程序LogService
啟動(dòng)服務(wù)Logservice,主要執(zhí)行start方法,里面有細(xì)節(jié)實(shí)現(xiàn)服務(wù)注冊(cè)與服務(wù)發(fā)現(xiàn)
package main
import (
"context"
"distributed/log"
"distributed/registry"
"distributed/service"
"fmt"
stlog "log"
)
func main() {
// 初始化啟動(dòng)一個(gè)日志文件對(duì)象
log.Run("./distributed.log")
// 日志服務(wù)注冊(cè)的端口和地址
host, port := "localhost", "4000"
serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
// 初始化注冊(cè)對(duì)象
r := registry.Registration{
ServiceName: registry.LogService, // 自身服務(wù)名
ServiceURL: serviceAddress, // 自身服務(wù)地址
RequiredServices: make([]registry.ServiceName, 0),// 依賴服務(wù)
ServiceUpdateURL: serviceAddress + "/services", // 服務(wù)列表
HeartbeatURL: serviceAddress + "/heartbeat", // 心跳
}
// 啟動(dòng)日志服務(wù)包含服務(wù)注冊(cè),發(fā)現(xiàn)等細(xì)節(jié)
ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandlers,
)
// 異常寫入到日志中
if err != nil {
stlog.Fatalln(err)
}
// 超時(shí)停止退出服務(wù)
<-ctx.Done()
fmt.Println("Shutting down log service.")
}服務(wù)啟動(dòng)與注冊(cè)
service/service.go
Start 啟動(dòng)服務(wù)的主方法
/*
host: 地址
port: 端口號(hào)
reg: 注冊(cè)的服務(wù)對(duì)象
registerHandlersFunc: 注冊(cè)方法
*/
func Start(ctx context.Context, host, port string,
reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
registerHandlersFunc() // 啟動(dòng)注冊(cè)方法
// 啟動(dòng)服務(wù)
ctx = startService(ctx, reg.ServiceName, host, port)
// 注冊(cè)服務(wù)
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}
return ctx, nil
}startService
func startService(ctx context.Context, serviceName registry.ServiceName,
host, port string) context.Context {
ctx, cancel := context.WithCancel(ctx)
var srv http.Server
srv.Addr = ":" + port
// 該協(xié)程為監(jiān)聽http服務(wù),并且停止服務(wù)的時(shí)候cancel
go func() {
log.Println(srv.ListenAndServe())
// 刪除對(duì)應(yīng)的服務(wù)
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
cancel()
}()
// 該協(xié)程為監(jiān)聽手動(dòng)停止服務(wù)的信號(hào)
go func() {
fmt.Printf("%v started. Press any key to stop. \n", serviceName)
var s string
fmt.Scanln(&s)
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
srv.Shutdown(ctx)
cancel()
}()
return ctx
}服務(wù)注冊(cè)與發(fā)現(xiàn)
registry/client.go
注冊(cè)服務(wù)的時(shí)候會(huì)連著心跳以及服務(wù)更新的方法一起注冊(cè)!
而服務(wù)更新里面的細(xì)節(jié)就是自己自定義了一個(gè)Handler然后ServeHttp方法里面去update全局的服務(wù)提供對(duì)象,
update主要是更新服務(wù)和刪除服務(wù)的最新消息
然后就是提供一個(gè)注銷服務(wù)的方法
package registry
import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
)
// 注冊(cè)服務(wù)
func RegisterService(r Registration) error {
// 獲得心跳地址并注冊(cè)
heartbeatURL, err := url.Parse(r.HeartbeatURL)
if err != nil {
return err
}
http.HandleFunc(heartbeatURL.Path, func (w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
// 獲得服務(wù)更新地址,并且自定義http服務(wù)的handler,因?yàn)槊看胃路?wù)的時(shí)候,可以在ServeHttp方法里面去維護(hù)
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.Handle(serviceUpdateURL.Path, &serviceUpdateHanlder{})
// 寫入buf值將服務(wù)對(duì)象發(fā)送給注冊(cè)中心的services地址
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err = enc.Encode(r)
if err != nil {
return err
}
res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to register service. Registry service "+
"responded with code %v", res.StatusCode)
}
return nil
}
type serviceUpdateHanlder struct{}
func (suh serviceUpdateHanlder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
dec := json.NewDecoder(r.Body)
var p patch
err := dec.Decode(&p)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
fmt.Printf("Updated received %v\n", p)
prov.Update(p) // 更新服務(wù)提供對(duì)象
}
// 刪除對(duì)應(yīng)注冊(cè)中心的服務(wù)地址
func ShutdownService(url string) error {
req, err := http.NewRequest(http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url)))
if err != nil {
return err
}
req.Header.Add("Content-Type", "text/plain")
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to deregister service. Registry "+
"service responded with code %v", res.StatusCode)
}
return nil
}
// 更新服務(wù)列表
func (p *providers) Update(pat patch) {
p.mutex.Lock()
defer p.mutex.Unlock()
// 將patch中有新增的進(jìn)行添加
for _, patchEntry := range pat.Added {
if _, ok := p.services[patchEntry.Name]; !ok {
p.services[patchEntry.Name] = make([]string, 0)
}
p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
patchEntry.URL)
}
// 將patch中被標(biāo)記刪除的
for _, patchEntry := range pat.Removed {
if providerURLs, ok := p.services[patchEntry.Name]; ok {
for i := range providerURLs {
if providerURLs[i] == patchEntry.URL {
p.services[patchEntry.Name] = append(providerURLs[:i],
providerURLs[i+1:]...)
}
}
}
}
}
// 根據(jù)服務(wù)名負(fù)載均衡隨機(jī)獲取服務(wù)地址
func (p providers) get(name ServiceName) (string, error) {
providers, ok := p.services[name]
if !ok {
return "", fmt.Errorf("No providers available for service %v", name)
}
idx := int(rand.Float32() * float32(len(providers)))
return providers[idx], nil
}
// 對(duì)外暴露生產(chǎn)者的方法
func GetProvider(name ServiceName) (string, error) {
return prov.get(name)
}
type providers struct {
services map[ServiceName][]string
mutex *sync.RWMutex
}
// 服務(wù)提供對(duì)象
var prov = providers{
services: make(map[ServiceName][]string), // 服務(wù)列表 服務(wù)名->集群地址集合
mutex: new(sync.RWMutex), // 鎖 防止服務(wù)注冊(cè)更新時(shí)的并發(fā)情況
}registry/registration.go
主要是一些關(guān)于服務(wù)使用到的參數(shù)以及對(duì)象!
package registry
type Registration struct {
ServiceName ServiceName // 服務(wù)名
ServiceURL string // 服務(wù)地址
RequiredServices []ServiceName // 依賴的服務(wù)
ServiceUpdateURL string // 服務(wù)更新的地址
HeartbeatURL string // 心跳地址
}
type ServiceName string
// 服務(wù)名集合
const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
PortalService = ServiceName("Portald")
)
// 服務(wù)對(duì)象參數(shù)
type patchEntry struct {
Name ServiceName
URL string
}
// 更新的服務(wù)對(duì)象參數(shù)
type patch struct {
Added []patchEntry
Removed []patchEntry
}registry/server.go
服務(wù)端的注冊(cè)中心服務(wù)的增刪改查管理以及心跳檢測(cè),及時(shí)將最新的更新的服務(wù)消息通知回給客戶端
package registry
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"
)
const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services" // 注冊(cè)中心地址
// 服務(wù)對(duì)象集合
type registry struct {
registrations []Registration
mutex *sync.RWMutex
}
// 添加服務(wù)
func (r *registry) add(reg Registration) error {
r.mutex.Lock()
r.registrations = append(r.registrations, reg)
r.mutex.Unlock()
err := r.sendRequiredServices(reg)
r.notify(patch{
Added: []patchEntry{
patchEntry{
Name: reg.ServiceName,
URL: reg.ServiceURL,
},
},
})
return err
}
// 通知服務(wù)接口請(qǐng)求去刷新改變后到服務(wù)
func (r registry) notify(fullPatch patch) {
r.mutex.RLock()
defer r.mutex.RUnlock()
for _, reg := range r.registrations {
go func(reg Registration) {
for _, reqService := range reg.RequiredServices {
p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
sendUpdate := false
for _, added := range fullPatch.Added {
if added.Name == reqService {
p.Added = append(p.Added, added)
sendUpdate = true
}
}
for _, removed := range fullPatch.Removed {
if removed.Name == reqService {
p.Removed = append(p.Removed, removed)
sendUpdate = true
}
}
if sendUpdate {
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
log.Println(err)
return
}
}
}
}(reg)
}
}
// 更新每個(gè)服務(wù)的依賴服務(wù)
func (r registry) sendRequiredServices(reg Registration) error {
r.mutex.RLock()
defer r.mutex.RUnlock()
var p patch
for _, serviceReg := range r.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}
// 告訴客戶端更新,最新的服務(wù)列表是這個(gè)
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}
// 注冊(cè)中心刪除服務(wù)對(duì)象
func (r *registry) remove(url string) error {
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
// 通知客戶端更新對(duì)象信息
r.notify(patch{
Removed: []patchEntry{
{
Name: r.registrations[i].ServiceName,
URL: r.registrations[i].ServiceURL,
},
},
})
r.mutex.Lock()
reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
r.mutex.Unlock()
return nil
}
}
return fmt.Errorf("Service at URL %s not found", url)
}
// 心跳檢測(cè)
func (r *registry) heartbeat(freq time.Duration) {
for {
var wg sync.WaitGroup
for _, reg := range r.registrations {
wg.Add(1)
go func(reg Registration) {
defer wg.Done()
success := true
for attemps := 0; attemps < 3; attemps++ {
res, err := http.Get(reg.HeartbeatURL)
if err != nil {
log.Println(err)
} else if res.StatusCode == http.StatusOK {
log.Printf("Heartbeat check passed for %v", reg.ServiceName)
// 如果心跳恢復(fù)了,把服務(wù)重新注冊(cè)回來(lái)
if !success {
r.add(reg)
}
break;
}
// 如果執(zhí)行到這就代表著心跳沒(méi)有響應(yīng),那就代表著需要回收注銷該服務(wù)了
log.Printf("Heartbeat check failed for %v", reg.ServiceName)
if success {
success = false
r.remove(reg.ServiceURL)
}
time.Sleep(1 * time.Second)
}
}(reg)
wg.Wait()
time.Sleep(freq)
}
}
}
var once sync.Once
func SetupRegistryService() {
// 保證執(zhí)行一次進(jìn)行服務(wù)到心跳 每三秒循環(huán)一遍
once.Do(func() {
go reg.heartbeat(3 * time.Second)
})
}
var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.RWMutex),
}
type RegistryService struct{}
func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("Request received")
switch r.Method {
case http.MethodPost:
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
log.Printf("Adding service: %v with URL: %s\n", r.ServiceName,
r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("Removing service at URL: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}到此這篇關(guān)于Golang分布式注冊(cè)中心實(shí)現(xiàn)流程講解的文章就介紹到這了,更多相關(guān)Golang分布式注冊(cè)中心內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語(yǔ)言實(shí)現(xiàn)可選參數(shù)的方法小結(jié)
這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言實(shí)現(xiàn)可選參數(shù)的一些常見(jiàn)方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-02-02
Golang Gin框架實(shí)現(xiàn)多種數(shù)據(jù)格式返回結(jié)果詳解
這篇文章主要介紹了Golang Gin框架實(shí)現(xiàn)多種數(shù)據(jù)格式返回結(jié)果,我們都知道,一個(gè)完整的請(qǐng)求包含請(qǐng)求和處理請(qǐng)求以及結(jié)果返回三個(gè)步驟,在服務(wù)器端對(duì)請(qǐng)求處理完成以后,會(huì)將結(jié)果返回給客戶端,在gin框架中,支持返回多種請(qǐng)求數(shù)據(jù)格式,下面我們一起來(lái)看看2023-05-05
go語(yǔ)言數(shù)組及結(jié)構(gòu)體繼承和初始化示例解析
這篇文章主要為大家介紹了go語(yǔ)言數(shù)組及結(jié)構(gòu)體繼承和初始化示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04
關(guān)于Golang變量初始化/類型推斷/短聲明的問(wèn)題
這篇文章主要介紹了關(guān)于Golang變量初始化/類型推斷/短聲明的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項(xiàng)目(打包)全過(guò)程
最近在配置idea開發(fā)go語(yǔ)言時(shí)碰到很多問(wèn)題,所以這里給大家總結(jié)下,這篇文章主要給大家介紹了關(guān)于intelliJ?idea安裝go開發(fā)環(huán)境并搭建go項(xiàng)目(打包)的相關(guān)資料,需要的朋友可以參考下2023-10-10
Golang基礎(chǔ)教程之字符串string實(shí)例詳解
這篇文章主要給大家介紹了關(guān)于Golang基礎(chǔ)教程之字符串string的相關(guān)資料,需要的朋友可以參考下2022-07-07

