詳解Go語言中的監(jiān)視器模式與配置熱更新
上篇介紹 GO 的 GUI 庫 Fyne 時(shí),提到 Fyne 的數(shù)據(jù)綁定用到了監(jiān)聽器模式。本文就展開說下我對(duì) Go 中監(jiān)聽器模式的理解和應(yīng)用吧。
監(jiān)聽器模式簡(jiǎn)介
監(jiān)聽器模式,或稱觀察者模式,它主要涉及兩個(gè)組件:主題(Subject)和監(jiān)聽器(Listener)。
Subject 負(fù)責(zé)維護(hù)一系列的監(jiān)聽器,在所觀測(cè)主題狀態(tài)變化,將這個(gè)事件通知給所有注冊(cè)的監(jiān)聽器。我統(tǒng)一將其定義為注冊(cè)中心 Registry。而監(jiān)聽器 Listener 則是實(shí)現(xiàn)了特定接口的對(duì)象,用于響應(yīng)事件消息,執(zhí)行處理邏輯。
對(duì)具體應(yīng)用而言,通常還會(huì)分出一個(gè) Watcher
或者 Monitor
用于檢測(cè)變化并推送給 Registry
。從而實(shí)現(xiàn)將檢測(cè)目標(biāo)從系統(tǒng)解耦,無視監(jiān)控組件類別。
這個(gè)模式在組件之間建立一種松散耦合的關(guān)系。將特定事件通知到關(guān)心它的其他組件,無需它們直接相互引用。看起來這個(gè)不也是發(fā)布-訂閱模式嗎?差不多一個(gè)意思。
之前工作中,用它最多的是配置的熱更新場(chǎng)景,這篇文章也會(huì)簡(jiǎn)單介紹基于它的 ETCD 配置熱更新。
Go 實(shí)現(xiàn)監(jiān)聽器模式
如何用 Go 實(shí)現(xiàn)監(jiān)聽模式?我將定義兩個(gè)新類型分別是注冊(cè)中心(Registry)和監(jiān)聽器接口(Listener)。
首先是 Listener
,它是一個(gè)接口,用于實(shí)現(xiàn)事件的響應(yīng)邏輯。
type Listener interface { OnTrigger() }
先將其定義為一個(gè)接口,它的實(shí)現(xiàn)類型要求支持 OnTrigger
方法,會(huì)在事件發(fā)生時(shí)被執(zhí)行。
type Registry struct { listeners []Listener } func (r *Registry) Register(l Listener) { r.listeners = append(r.listeners, l) } func (r *Registry) NotifyAll() { for _, listener := range r.listeners { listener.OnTrigger(key, value) } }
Registry
是所有監(jiān)聽器的注冊(cè)地,當(dāng)特定事件發(fā)生,我們通過 Registry.NotifyAll
將事件傳遞給所有 Listener
。
我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的案例,當(dāng)監(jiān)聽到某個(gè)事件發(fā)生,打印 "A specified event accured"。
為了模擬效果,本案例沒有 watcher,直接通過主函數(shù)調(diào)用 NotifyAll
模擬觸發(fā)事件。
為了打印事件消息,我們實(shí)現(xiàn) Listener
接口,創(chuàng)建新類型 EventPrinter,如下所示:
type EventPrinter struct { } func (printer *EventPrinter) OnTrigger() { fmt.Println("A specified event accured!") }
寫個(gè)主函數(shù)觸發(fā)下事件,測(cè)試看看是否符合預(yù)期,代碼如下所示:
func main() { r := &Registry{} r.Reigster(&EventPrinter{}) // 模擬接收到消息,觸發(fā)事件通知 r.NotifyAll() }
執(zhí)行測(cè)試,內(nèi)容如下所示:
$ go run main.go A specified event occurred
如果希望自定義處理函數(shù),只需讓 Listener
支持自定義事件回調(diào)函數(shù)即可。
修改代碼如下所示:
type EventHandler struct { callback func() } func NewEventHandler(callback func()) *EventHandler { return &EventHandler{callback: callback} } func (e *EventHandler) OnTrigger() { e.callback() }
我們注冊(cè)一個(gè) EventHandler
到 Registry
,主函數(shù)代碼:
func main() { r := &Registry{} r.Reigster(&EventPrinter{}) r.Reigster(NewEventHandler(func() { fmt.Println("Custom Print: a specified event occurred!") })) r.NotifyAll() }
測(cè)試執(zhí)行:
$ go run main.go A specified event occurred! Custom Print: a specified event occurred!
基于 Go Channel 實(shí)現(xiàn)并發(fā)處理
前面的示例中 NotifyAll
是通過 for 循環(huán)依次調(diào)用 listener.OnTrigger
將消息發(fā)送給 Listener
,處理效率低下。
如何加速呢?
最直接的方法是通過 goroutine
運(yùn)行 listener.OnTrigger
方法。
func (r *Registry) NotifyAll() { for _, listener := range r.listeners { go listener.OnTrigger() } }
還有一種方法,通過 Channel 傳遞事件消息,這樣每個(gè) Listener
有獨(dú)立的 goroutine 監(jiān)聽和處理。
如下是 Listener
的實(shí)現(xiàn)代碼:
type Listener struct { EventChannel chan struct{} Callback func() } func NewListener(callback func()) *Listener { return &Listener{ EventChannel: make(chan struct{}, 1), // 帶緩沖的 channel,防止阻塞 Callback: callback, } } func (l *Listener) Start() { go func() { for range l.EventChannel { l.Callback() } }() }
這里 Listener
的事件處理函數(shù)在單獨(dú)的 goroutine 中運(yùn)行。而相應(yīng)的 Registry 實(shí)現(xiàn)也需要修改,代碼變更如下所示:
type Registry struct { listeners []*Listener } func (r *Registry) Register(listener *Listener) { r.listeners = append(r.listeners, listener) listener.Start() // 啟動(dòng)監(jiān)聽器的 goroutine } func (r *Registry) NotifyAll(message string) { for _, listener := range r.listeners { listener.EventChannel <- struct{}{} // 發(fā)送事件到監(jiān)聽器 } } func (r *Registry) Close() { for _, listener := range r.listeners { close(listener.EventChannel) // 關(guān)閉 channel,停止監(jiān)聽器 goroutine } }
整體上的變化不大,在 listner.Register
方法中啟動(dòng) Listener
事件處理 goroutine 等待事件消息。
實(shí)際案例:ETCD 配置熱更新
讓我們實(shí)踐一個(gè)具體的應(yīng)用場(chǎng)景:實(shí)現(xiàn)配置的動(dòng)態(tài)更新以及組件的自動(dòng)重連機(jī)制。
我們將針對(duì)包括 MySQL、Redis 在內(nèi)的各種組件,實(shí)現(xiàn)它們?cè)谂渲米兏鼤r(shí)能夠自動(dòng)重連。這些組件的配置信息將以 JSON 格式存儲(chǔ)于 ETCD 的多個(gè)鍵(Key)中。
假設(shè),配置結(jié)構(gòu)如下所示:
type MySQLConfig struct { Host string Port int User string Password string } type RedisConfig struct { Host string Port int }
這些配置被保存在 ETCD 中,我們要實(shí)時(shí)監(jiān)控配置的變化并據(jù)此更新配置和執(zhí)行重連操作。
示例用法如下所示:
registry.Register("/config/mysql", func(data) { // unmarshal data // reconnect mysql })
讓我們基于監(jiān)聽器模式簡(jiǎn)單設(shè)計(jì)一個(gè)模塊,實(shí)現(xiàn) ETCD 熱更新:
- 每個(gè)監(jiān)聽器可以訂閱特定的 key 或 key 前綴的更新事件。
- 使用
channel
通知配置變更,觸發(fā)對(duì)應(yīng)的監(jiān)聽回調(diào)。
這個(gè)示例,函數(shù)回調(diào)和輪詢其實(shí)已經(jīng)滿足需求,此處只是為了演示,而是否使用 channel 要具體分析。
我們這個(gè)設(shè)計(jì)要涉及到三個(gè)部分。分別是 Watcher、Listener 和 Registry。
- Watcher 責(zé)監(jiān)聽 ETCD 中的 key 變更事件。
- Listener 定義了當(dāng)特定 key 發(fā)生變化時(shí)需要執(zhí)行的回調(diào)邏輯。
- Registry 管理所有 Listener,將 ETCD 變更事件分發(fā)給對(duì)應(yīng) Listener。
先定義 Event
類型,一個(gè)簡(jiǎn)單的結(jié)構(gòu)體,表示 ETCD 中 key 的變更事件:
type Event struct { Key string Value string }
Listener
Listener
實(shí)現(xiàn)如下所示:
type Listener struct { EventChannel chan *Event Callback func(*Event) } func NewListner(callback func(*Event)) *Listener { l := &Listener{ EventChannel: make(chan *Event), Callback: callback, } return l } func (l *Listener) Start() { go func() { for event := range l.EventChannel { l.Callback(event) } }() }
基本之前的沒太大差別,從 EventChannel
中拿到事件消息,調(diào)用回調(diào)函數(shù)。
實(shí)現(xiàn) Registry
Registry
負(fù)責(zé)維護(hù) Listener
的注冊(cè),并在接收到 key 變更事件時(shí)通知相關(guān)的 Listener
:
type Registry struct { listeners map[string][]*Listener } func NewRegistry() *Registry { return &Registry{ listeners: make(map[string][]*Listener), } } func (r *Registry) Register(key string, listener *Listener) { r.listeners[key] = append(r.listeners[key], listener) listener.Start() } func (r *Registry) Notify(event *Event) { if listeners, ok := r.listeners[event.Key]; ok { for _, listener := range listeners { listener.EventChannel <- event } } }
注冊(cè) Listener
到 Registry
中,通過 map
將 key
與 Listener
關(guān)聯(lián)起來。
實(shí)現(xiàn) Watcher
Watcher
負(fù)責(zé)從 ETCD 訂閱 key 的變更事件,并將這些事件發(fā)送到 Registry
的 eventChannel
上:
func WatchEtcdKeys(client *clientv3.Client, registry *Registry, watchKeys ...string) { for _, key := range watchKeys { go func(key string) { watchChan := client.Watch(context.Background(), key, clientv3.WithPrefix()) for wresp := range watchChan { for _, ev := range wresp.Events { event := &Event{ Key: string(ev.Kv.Key), Value: string(ev.Kv.Value), } registry.Notify(event) } } }(key) } }
使用示例
讓我們實(shí)際在 main 函數(shù)上使用一下,觀察行為是否正常。
func main() { client, err := clientv3.New(clientv3.Config{ Endpoints: []string{"localhost:2379"}, }) if err != nil { log.Fatal(err) } defer client.Close() registry := NewRegistry() // 注冊(cè)監(jiān)聽器 registry.Register("/config/mysql", NewListener(func(event * Event) { fmt.Println(event) // 執(zhí)行數(shù)據(jù)重連之類的操作 })) // 開始監(jiān)聽 ETCD key 變更 WatchEtcdKeys(client, registry, "/config/") time.Sleep(10 * time.Minute) }
這個(gè)示例創(chuàng)建了一個(gè) ETCD 客戶端,初始化了一個(gè) Registry
,并為特定的 key 注冊(cè)了一個(gè) Listener
。然后,通過 WatchEtcdKeys
函數(shù)開始監(jiān)聽 /config/
前綴下的所有 key 的變更。
這種設(shè)計(jì)支持對(duì)特定 key 或 key 前綴的監(jiān)聽。當(dāng)相關(guān) key 變更時(shí),通過 channel
通知 Listener
,而收到更新事件后的具體操作。視場(chǎng)景而定,這里是執(zhí)行重連操作。
特別說明,示例僅作為概念驗(yàn)證,實(shí)際應(yīng)用中需要更多的錯(cuò)誤處理和優(yōu)化。
到此這篇關(guān)于詳解Go語言中的監(jiān)視器模式與配置熱更新的文章就介紹到這了,更多相關(guān)Go監(jiān)視器模式與配置熱更新內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化
這篇文章主要介紹了golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12深入學(xué)習(xí)Golang并發(fā)編程必備利器之sync.Cond類型
Go?語言的?sync?包提供了一系列同步原語,其中?sync.Cond?就是其中之一。本文將深入探討?sync.Cond?的實(shí)現(xiàn)原理和使用方法,幫助大家更好地理解和應(yīng)用?sync.Cond,需要的可以參考一下2023-05-05golang中單機(jī)鎖的具體實(shí)現(xiàn)詳解
這篇文章主要為大家詳細(xì)介紹了golang中單機(jī)鎖的具體實(shí)現(xiàn)的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-03-03Go語言leetcode題解953驗(yàn)證外星語詞典示例詳解
這篇文章主要為大家介紹了Go語言leetcode題解953驗(yàn)證外星語詞典示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12