欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何通過memberlist庫實(shí)現(xiàn)gossip管理集群及集群數(shù)據(jù)交互問題

 更新時間:2022年07月12日 09:49:00   作者:charlieroro  
這篇文章主要介紹了通過memberlist庫實(shí)現(xiàn)gossip管理集群以及集群數(shù)據(jù)交互,本文介紹了memberlist庫的簡單用法,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下

通過memberlist庫實(shí)現(xiàn)gossip管理集群以及集群數(shù)據(jù)交互

概述

memberlist庫的簡單用法如下,注意下面使用for循環(huán)來執(zhí)行 list.Join ,原因是一開始各節(jié)點(diǎn)都沒有runing,直接執(zhí)行 Join 會出現(xiàn)連接拒絕的錯誤。

package main
import (
	"fmt"
	"github.com/hashicorp/memberlist"
	"time"
)
func main() {
	/* Create the initial memberlist from a safe configuration.
	   Please reference the godoc for other default config types.
	   http://godoc.org/github.com/hashicorp/memberlist#Config
	*/
	list, err := memberlist.Create(memberlist.DefaultLocalConfig())
	if err != nil {
		panic("Failed to create memberlist: " + err.Error())
	}
	t := time.NewTicker(time.Second * 5)
	for {
		select {
		case <-t.C:
			// Join an existing cluster by specifying at least one known member.
			n, err := list.Join([]string{"192.168.80.129"})
			if err != nil {
				fmt.Println("Failed to join cluster: " + err.Error())
				continue
			}
			fmt.Println("member number is:", n)
			goto END
		}
	}
END:
	for {
		select {
		case <-t.C:
			// Ask for members of the cluster
			for _, member := range list.Members() {
				fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
			}
		}
	}
	// Continue doing whatever you need, memberlist will maintain membership
	// information in the background. Delegates can be used for receiving
	// events when members join or leave.
}

memberlist的兩個主要接口如下:

  • Create:根據(jù)入?yún)⑴渲脛?chuàng)建一個 Memberlist ,初始化階段 Memberlist 僅包含本節(jié)點(diǎn)狀態(tài)。注意此時并不會連接到其他節(jié)點(diǎn),執(zhí)行成功之后就可以允許其他節(jié)點(diǎn)加入該memberlist。

  • Join:使用已有的 Memberlist 來嘗試連接給定的主機(jī),并與之同步狀態(tài),以此來加入某個cluster。執(zhí)行該操作可以讓其他節(jié)點(diǎn)了解到本節(jié)點(diǎn)的存在。最后返回成功建立連接的節(jié)點(diǎn)數(shù)以及錯誤信息,如果沒有與任何節(jié)點(diǎn)建立連接,則返回錯誤。

    注意當(dāng)join一個cluster時,至少需要指定集群中的一個已知成員,后續(xù)會通過gossip同步整個集群的成員信息。

memberlist提供的功能主要分為兩塊:維護(hù)成員狀態(tài)(gossip)以及數(shù)據(jù)同步(boardcast、SendReliable)。下面看幾個相關(guān)接口。

接口

memberlist.Create 的入?yún)⒁蠼o出相應(yīng)的 配置 信息, DefaultLocalConfig() 給出了通用的配置信息,但還需要實(shí)現(xiàn)相關(guān)接口來實(shí)現(xiàn)成員狀態(tài)的同步以及用戶數(shù)據(jù)的收發(fā)。注意下面有些接口是必選的,有些則可選:

type Config struct {
	// ...
	// Delegate and Events are delegates for receiving and providing
	// data to memberlist via callback mechanisms. For Delegate, see
	// the Delegate interface. For Events, see the EventDelegate interface.
	//
	// The DelegateProtocolMin/Max are used to guarantee protocol-compatibility
	// for any custom messages that the delegate might do (broadcasts,
	// local/remote state, etc.). If you don't set these, then the protocol
	// versions will just be zero, and version compliance won't be done.
	Delegate                Delegate
	Events                  EventDelegate
	Conflict                ConflictDelegate
	Merge                   MergeDelegate
	Ping                    PingDelegate
	Alive                   AliveDelegate
	//...
}

memberlist使用如下 類型 的消息來同步集群狀態(tài)和處理用戶消息:

const (
	pingMsg messageType = iota
	indirectPingMsg
	ackRespMsg
	suspectMsg
	aliveMsg
	deadMsg
	pushPullMsg
	compoundMsg
	userMsg // User mesg, not handled by us
	compressMsg
	encryptMsg
	nackRespMsg
	hasCrcMsg
	errMsg
)

Delegate

如果要使用memberlist的gossip協(xié)議,則必須實(shí)現(xiàn)該接口。所有這些方法都必須是線程安全的。

type Delegate interface {
	// NodeMeta is used to retrieve meta-data about the current node
	// when broadcasting an alive message. It's length is limited to
	// the given byte size. This metadata is available in the Node structure.
	NodeMeta(limit int) []byte

	// NotifyMsg is called when a user-data message is received.
	// Care should be taken that this method does not block, since doing
	// so would block the entire UDP packet receive loop. Additionally, the byte
	// slice may be modified after the call returns, so it should be copied if needed
	NotifyMsg([]byte)

	// GetBroadcasts is called when user data messages can be broadcast.
	// It can return a list of buffers to send. Each buffer should assume an
	// overhead as provided with a limit on the total byte size allowed.
	// The total byte size of the resulting data to send must not exceed
	// the limit. Care should be taken that this method does not block,
	// since doing so would block the entire UDP packet receive loop.
	GetBroadcasts(overhead, limit int) [][]byte

	// LocalState is used for a TCP Push/Pull. This is sent to
	// the remote side in addition to the membership information. Any
	// data can be sent here. See MergeRemoteState as well. The `join`
	// boolean indicates this is for a join instead of a push/pull.
	LocalState(join bool) []byte

	// MergeRemoteState is invoked after a TCP Push/Pull. This is the
	// state received from the remote side and is the result of the
	// remote side's LocalState call. The 'join'
	// boolean indicates this is for a join instead of a push/pull.
	MergeRemoteState(buf []byte, join bool)
}

主要方法如下:

  • NotifyMsg:用于接收用戶消息( userMsg )。注意不能阻塞該方法,否則會阻塞整個UDP/TCP報(bào)文接收循環(huán)。此外由于數(shù)據(jù)可能在方法調(diào)用時被修改,因此應(yīng)該事先拷貝數(shù)據(jù)。

    該方法用于接收通過UDP/TCP方式發(fā)送的用戶消息( userMsg ):

    注意UDP方式并不是立即發(fā)送的,它會隨gossip周期性發(fā)送或在處理 pingMsg 等消息時發(fā)送從GetBroadcasts獲取到的用戶消息。

    //使用UDP方式將用戶消息傳輸?shù)浇o定節(jié)點(diǎn),消息大小受限于memberlist的UDPBufferSize配置。沒有使用gossip機(jī)制
    func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error
    //與SendBestEffort機(jī)制相同,只不過一個指定了Node,一個指定了Node地址
    func (m *Memberlist) SendToAddress(a Address, msg []byte) error
    //使用TCP方式將用戶消息傳輸?shù)浇o定節(jié)點(diǎn),消息沒有大小限制。沒有使用gossip機(jī)制
    func (m *Memberlist) SendReliable(to *Node, msg []byte) error
  • GetBroadcasts:用于在gossip周期性調(diào)度或處理處理 pingMsg 等消息時攜帶用戶消息,因此并不是即時的。通常會把需要發(fā)送的消息通過 TransmitLimitedQueue.QueueBroadcast 保存起來,然后在發(fā)送時通過 TransmitLimitedQueue.GetBroadcasts 獲取需要發(fā)送的消息。見下面 TransmitLimitedQueue 的描述。

  • LocalState:用于TCP Push/Pull,用于向遠(yuǎn)端發(fā)送除成員之外的信息(可以發(fā)送任意數(shù)據(jù)),用于定期同步成員狀態(tài)。參數(shù) join 用于表示將該方法用于join階段,而非push/pull。

  • MergeRemoteState:TCP Push/Pull之后調(diào)用,接收到遠(yuǎn)端的狀態(tài)(即遠(yuǎn)端調(diào)用LocalState的結(jié)果)。參數(shù) join 用于表示將該方法用于join階段,而非push/pull。

定期(PushPullInterval)調(diào)用pushPull來隨機(jī)執(zhí)行一次完整的狀態(tài)交互。但由于pushPull會與其他節(jié)點(diǎn)同步本節(jié)點(diǎn)的所有狀態(tài),因此代價也比較大。

EventDelegate

僅用于接收成員的joining 和leaving通知,可以用于更新本地的成員狀態(tài)信息。

type EventDelegate interface {
	// NotifyJoin is invoked when a node is detected to have joined.
	// The Node argument must not be modified.
	NotifyJoin(*Node)

	// NotifyLeave is invoked when a node is detected to have left.
	// The Node argument must not be modified.
	NotifyLeave(*Node)

	// NotifyUpdate is invoked when a node is detected to have
	// updated, usually involving the meta data. The Node argument
	// must not be modified.
	NotifyUpdate(*Node)
}

ChannelEventDelegate 實(shí)現(xiàn)了簡單的 EventDelegate 接口:

type ChannelEventDelegate struct {
  Ch chan<- NodeEvent
}

ConflictDelegate

用于通知某個client在執(zhí)行join時產(chǎn)生了命名沖突。通常是因?yàn)閮蓚€client配置了相同的名稱,但使用了不同的地址??梢杂糜诮y(tǒng)計(jì)錯誤信息。

type ConflictDelegate interface {
	// NotifyConflict is invoked when a name conflict is detected
	NotifyConflict(existing, other *Node)
}

MergeDelegate

在集群執(zhí)行merge操作時調(diào)用。 NotifyMerge 方法的參數(shù) peers 提供了對端成員信息。 可以不實(shí)現(xiàn)該接口。

type MergeDelegate interface {
	// NotifyMerge is invoked when a merge could take place.
	// Provides a list of the nodes known by the peer. If
	// the return value is non-nil, the merge is canceled.
	NotifyMerge(peers []*Node) error
}

PingDelegate

用于通知觀察者完成一個ping消息( pingMsg )要花費(fèi)多長時間??梢栽?nbsp;NotifyPingComplete 中(使用histogram)統(tǒng)計(jì)ping的執(zhí)行時間。

type PingDelegate interface {
	// AckPayload is invoked when an ack is being sent; the returned bytes will be appended to the ack
	AckPayload() []byte
	// NotifyPing is invoked when an ack for a ping is received
	NotifyPingComplete(other *Node, rtt time.Duration, payload []byte)
}

AliveDelegate

當(dāng)接收到 aliveMsg 消息時調(diào)用的接口,可以用于添加日志和指標(biāo)等信息。

type AliveDelegate interface {
	// NotifyAlive is invoked when a message about a live
	// node is received from the network.  Returning a non-nil
	// error prevents the node from being considered a peer.
	NotifyAlive(peer *Node) error
}

Broadcast

可以隨gossip將數(shù)據(jù)廣播到memberlist集群。

// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast
	Invalidates(b Broadcast) bool

	// Returns a byte form of the message
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or to the
	// transmit limit being reached
	Finished()
}

Broadcast 接口通常作為 TransmitLimitedQueue.QueueBroadcast 的入?yún)ⅲ?/p>

func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.queueBroadcast(b, 0)
}

alertmanager中的實(shí)現(xiàn)如下:

type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()

TransmitLimitedQueue

TransmitLimitedQueue主要用于處理廣播消息。有兩個主要的方法: QueueBroadcast 和 GetBroadcasts ,前者用于保存廣播消息,后者用于在發(fā)送的時候獲取需要廣播的消息。隨gossip周期性調(diào)度或在處理 pingMsg 等消息時調(diào)用 GetBroadcasts 方法。

// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	mu    sync.Mutex
	tq    *btree.BTree // stores *limitedBroadcast as btree.Item
	tm    map[string]*limitedBroadcast
	idGen int64
}

小結(jié)

memberlist中的消息分為兩種,一種是內(nèi)部用于同步集群狀態(tài)的消息,另一種是用戶消息。

GossipInterval 周期性調(diào)度的有兩個方法:

  • gossip :用于同步 aliveMsg 、 deadMsg 、 suspectMsg 消息
  • probe :用于使用 pingMsg 消息探測節(jié)點(diǎn)狀態(tài)
// GossipInterval and GossipNodes are used to configure the gossip
	// behavior of memberlist.
	//
	// GossipInterval is the interval between sending messages that need
	// to be gossiped that haven't been able to piggyback on probing messages.
	// If this is set to zero, non-piggyback gossip is disabled. By lowering
	// this value (more frequent) gossip messages are propagated across
	// the cluster more quickly at the expense of increased bandwidth.
	//
	// GossipNodes is the number of random nodes to send gossip messages to
	// per GossipInterval. Increasing this number causes the gossip messages
	// to propagate across the cluster more quickly at the expense of
	// increased bandwidth.
	//
	// GossipToTheDeadTime is the interval after which a node has died that
	// we will still try to gossip to it. This gives it a chance to refute.
	GossipInterval      time.Duration
	GossipNodes         int
	GossipToTheDeadTime time.Duration

用戶消息又分為兩種:

  • 周期性同步:
    • 以 PushPullInterval 為周期,使用 Delegate.LocalState 和 Delegate.MergeRemoteState 以TCP方式同步用戶信息;
    • 使用 Delegate.GetBroadcasts 隨gossip發(fā)送用戶信息。
  • 主動發(fā)送:使用 SendReliable 等方法實(shí)現(xiàn)主動發(fā)送用戶消息。

alertmanager的處理

alertmanager通過兩種方式發(fā)送用戶消息,即UDP方式和TCP方式。在alertmanager中,當(dāng)要發(fā)送的數(shù)據(jù)大于 MaxGossipPacketSize/2 將采用TCP方式( SendReliable 方法),否則使用UDP方式( Broadcast 接口)。

func (c *Channel) Broadcast(b []byte) {
	b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
	if err != nil {
		return
	}

	if OversizedMessage(b) {
		select {
		case c.msgc <- b: //從c.msgc 接收數(shù)據(jù),并使用SendReliable發(fā)送
		default:
			level.Debug(c.logger).Log("msg", "oversized gossip channel full")
			c.oversizeGossipMessageDroppedTotal.Inc()
		}
	} else {
		c.send(b)
	}
}

func OversizedMessage(b []byte) bool {
	return len(b) > MaxGossipPacketSize/2
}

demo

這里 實(shí)現(xiàn)了一個簡單的基于gossip管理集群信息,并通過TCP給集群成員發(fā)送信息的例子。

到此這篇關(guān)于通過memberlist庫實(shí)現(xiàn)gossip管理集群以及集群數(shù)據(jù)交互的文章就介紹到這了,更多相關(guān)memberlist庫gossip集群內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • chatGPT使用及注冊過程中常見的一些錯誤解決方法(所有報(bào)錯匯總)

    chatGPT使用及注冊過程中常見的一些錯誤解決方法(所有報(bào)錯匯總)

    這篇文章主要介紹了chatGPT注冊報(bào)錯及使用過程中報(bào)錯匯總及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-02-02
  • 301重定向代碼合集(iis,asp,php,asp.net,apache)

    301重定向代碼合集(iis,asp,php,asp.net,apache)

    腳本之家將SEO工作中所需要的301轉(zhuǎn)向代碼進(jìn)行了整理,收藏并分享,以備查閱。
    2011-02-02
  • Git下載、安裝與環(huán)境配置的詳細(xì)教程

    Git下載、安裝與環(huán)境配置的詳細(xì)教程

    這篇文章主要介紹了Git下載、安裝與環(huán)境配置,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • 如何正確使用開源項(xiàng)目?

    如何正確使用開源項(xiàng)目?

    開源項(xiàng)目廣受大家喜愛,其實(shí)我們在使用開源項(xiàng)目的過程中有不少注意的事項(xiàng),今天就來給大家介紹下如何正確的使用開源項(xiàng)目。需要的朋友可以參考一下
    2018-09-09
  • 在PB中如何讓用戶只能修改新增的數(shù)據(jù)

    在PB中如何讓用戶只能修改新增的數(shù)據(jù)

    一些數(shù)據(jù)庫系統(tǒng)或者某些數(shù)據(jù)表只允許用戶添加數(shù)據(jù),而不能修改或者刪除以往的記錄,此時我們就必須在程序進(jìn)行控制。有些程序員通過不顯示以往的數(shù)據(jù)來保證,下面我們介紹一種既可以看到原始記錄,有不容許用戶修改這些記錄的方法
    2008-11-11
  • 對Web開發(fā)人員有用的8個網(wǎng)站小結(jié)

    對Web開發(fā)人員有用的8個網(wǎng)站小結(jié)

    本文是由比利時的Web開發(fā)人員Jean-Baptiste Jung分享的,Jung還在《Web開發(fā)/設(shè)計(jì)人員應(yīng)當(dāng)知道的15個網(wǎng)站》這篇文章中推薦了15個相關(guān)網(wǎng)站
    2011-05-05
  • 微信 小程序Demo導(dǎo)入詳細(xì)介紹

    微信 小程序Demo導(dǎo)入詳細(xì)介紹

    這篇文章主要介紹了微信 小程序Demo導(dǎo)入詳細(xì)介紹的相關(guān)資料,需要的朋友可以參考下
    2016-09-09
  • Git?Commitizen提交規(guī)范化自動生成changelog文件

    Git?Commitizen提交規(guī)范化自動生成changelog文件

    這篇文章主要為大家介紹了Git?Commitizen提交規(guī)范化自動生成changelog文件詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-09-09
  • HTTP長連接與短連接使用方法及測試詳解

    HTTP長連接與短連接使用方法及測試詳解

    這篇文章主要介紹了HTTP長連接與短連接使用方法及測試,需要的朋友可以參考下
    2020-02-02
  • windows下Git安裝教程(圖文)

    windows下Git安裝教程(圖文)

    這篇文章主要介紹了windows下Git安裝教程(圖文),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07

最新評論