Go語言結(jié)合grpc和protobuf實現(xiàn)去中心化的聊天室
介紹
傳統(tǒng)的聊天室主要是基于c/s架構(gòu),需要有一個服務(wù)端完成各個客戶端的聊天轉(zhuǎn)發(fā)。今天我們使用golang+grpc+protobuf,設(shè)計一個去中心化、局域網(wǎng)自發(fā)現(xiàn)的聊天客戶端。
完整代碼地址在 github.com/AlpsMonaco/proximity-chat
模塊
協(xié)議
我們先定義proto消息格式 message/message.proto
syntax = "proto3";
option go_package = "proximity-chat/message";
package message;
service Chat {
rpc NewNode (stream NodeRequest) returns (stream NodeReply){ }
}
message NodeRequest {
string msg = 1;
}
message NodeReply {
string msg = 1;
}
聊天軟件一般需要全雙工保證時效性,所以這邊使用了 stream NodeRequest 和 stream NodeReply。 這邊消息只有兩個,請求和回復(fù)直接透傳string就行。
執(zhí)行
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative message\message.proto
會在相同目錄下生成相關(guān)的go代碼文件。在文件 message_grpc.pb.go 中會包含rpc的interface
type ChatServer interface {
NewNode(Chat_NewNodeServer) error
mustEmbedUnimplementedChatServer()
}
我們需要實現(xiàn)這個接口中的 NewNode 服務(wù)。
交互
在 service/message.go 中實現(xiàn) NewNode(Chat_NewNodeServer) error
type MessageWriter interface {
Write(string)
}
type Message struct {
Writer MessageWriter
message.UnimplementedChatServer
}
...
func (m *Message) NewNode(ss message.Chat_NewNodeServer) error {
head, err := ss.Recv()
if err != nil {
m.Writer.Write(fmt.Sprint(err))
return err
}
addr := head.GetMsg()
if controller.IsChatNodeExist(addr) {
return nil
}
if !controller.AddChatNode(&ServerChatNode{s: ss}, addr) {
return nil
}
err = ss.Send(&message.NodeReply{Msg: "ok"})
if err != nil {
return err
}
m.Writer.Write("new node " + addr + " has joined")
for {
msg, err := ss.Recv()
if err != nil {
controller.RemoveNode(addr)
fmt.Println(err)
return err
}
m.Writer.Write(msg.GetMsg())
}
}由于是去中心化,所以沒有客戶端服務(wù)端的概念,我們將它稱為一個節(jié)點 node。在同一個局域網(wǎng)內(nèi),node監(jiān)聽的ip+port做唯一key,用于避免重復(fù)進入聊天室。
上面的代碼中 controller 模塊主要是用來控制和管理斷點的,后續(xù)會講。
整體流程是先接收其他node發(fā)來的 ip+port ,判斷是否已經(jīng)加入過這個端點,如果沒加入過就用controller綁定節(jié)點,進行后續(xù)的聊天請求,否則中止交互。
控制
在 controller/node.go ,我們使用map和讀寫鎖來維護node的唯一性。
package controller
import (
"sync"
)
type ChatNode interface {
SendChatMsg(string) error
RecvChatMsg() (string, error)
}
var nodeMap map[string]ChatNode = make(map[string]ChatNode)
var nodeMapLock sync.RWMutex
func AddChatNode(node ChatNode, addr string) bool {
nodeMapLock.Lock()
defer nodeMapLock.Unlock()
_, ok := nodeMap[addr]
if !ok {
nodeMap[addr] = node
return true
}
return false
}
func RemoveNode(addr string) {
nodeMapLock.Lock()
defer nodeMapLock.Unlock()
delete(nodeMap, addr)
}
func IsChatNodeExist(addr string) bool {
nodeMapLock.RLock()
defer nodeMapLock.RUnlock()
_, ok := nodeMap[addr]
return ok
}
func Publish(s string) {
nodeMapLock.RLock()
defer nodeMapLock.RUnlock()
for _, n := range nodeMap {
n.SendChatMsg(s)
}
}發(fā)現(xiàn)
discover/discover.go 下定義如何發(fā)現(xiàn)相同網(wǎng)段上的其他服務(wù)。
這邊使用 ipnetgen 庫來獲取相同網(wǎng)段下的所有IP。定期去遍歷其他網(wǎng)段上的相同服務(wù)。 將自己的監(jiān)聽ip+端口發(fā)送給其他node,若返回'ok'則建立通訊。
func BeginDiscoverService() {
minPort := config.GetConfig().GetMinPort()
maxPort := config.GetConfig().GetMaxPort()
if minPort > maxPort {
minPort = maxPort
}
for {
time.Sleep(time.Second)
gen, err := ipnetgen.New(config.GetConfig().GetCIDR())
if err != nil {
panic(err)
}
for ip := gen.Next(); ip != nil; ip = gen.Next() {
for i := minPort; i <= maxPort; i++ {
addr := fmt.Sprintf("%s:%d", ip.String(), i)
if addr == GetAddr() {
continue
}
if controller.IsChatNodeExist(addr) {
continue
}
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
fmt.Printf("did not connect: %v\n", err)
continue
}
client := message.NewChatClient(conn)
cli, err := client.NewNode(context.Background())
if err != nil {
continue
}
err = cli.Send(&message.NodeRequest{Msg: GetAddr()})
if err != nil {
writer.Write(fmt.Sprint(err))
continue
}
resp, err := cli.Recv()
if err != nil {
cli.CloseSend()
writer.Write(fmt.Sprint(err))
continue
}
if resp.GetMsg() != "ok" {
cli.CloseSend()
continue
}
if !controller.AddChatNode(&service.ClientChatNode{C: cli}, addr) {
cli.CloseSend()
continue
}
writer.Write("discover " + addr)
go func() {
for {
msg, err := cli.Recv()
if err != nil {
writer.Write(fmt.Sprint(err))
controller.RemoveNode(addr)
return
}
writer.Write(msg.GetMsg())
}
}()
}
}
}
}配置
我們定義配置的獲取方式,配置文件格式為json,定義配置獲取的方式 config.go 。
package config
type NetworkConfig struct {
CIDR string `json:"cidr"`
MaxPort int `json:"max_port"`
MinPort int `json:"min_port"`
}
func DefaultNetworkConfig() *NetworkConfig {
return &NetworkConfig{
"127.0.0.1/32", 4569, 4565,
}
}
type ConstNetworkConfig struct {
c *NetworkConfig
}
func (c *ConstNetworkConfig) GetCIDR() string { return c.c.CIDR }
func (c *ConstNetworkConfig) GetMaxPort() int { return c.c.MaxPort }
func (c *ConstNetworkConfig) GetMinPort() int { return c.c.MinPort }
var config = &ConstNetworkConfig{DefaultNetworkConfig()}
func GetConfig() *ConstNetworkConfig { return config }
func SetConfig(nc *NetworkConfig) { config = &ConstNetworkConfig{nc} }這邊最主要定義三個字段,內(nèi)網(wǎng)的ip網(wǎng)段,服務(wù)的最小到最大的端口范圍。這個配置主要用于搜尋同網(wǎng)段同端口上的相同服務(wù)。為了方便調(diào)試我們加一個 DefaultNetworkConfig(),監(jiān)聽127.0.0.1上的4565~4569。 同時還加了一個 ConstNetworkConfig 類,供其他模塊訪問全局配置,同時保護配置不被修改。
運行實例
編譯后直接運行,會在指定的端口范圍內(nèi)嘗試監(jiān)聽,無需指定端口。主線程中scanf阻塞獲取輸入。我們直接打開三個進程,在一個終端中輸入數(shù)據(jù)發(fā)送,其他兩個終端都能獲取聊天數(shù)據(jù)。

以上就是Go語言結(jié)合grpc和protobuf實現(xiàn)去中心化的聊天室的詳細內(nèi)容,更多關(guān)于Go聊天室的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang官方限流器time/rate的使用與實現(xiàn)詳解
限流器是后臺服務(wù)中十分重要的組件,在實際的業(yè)務(wù)場景中使用居多。time/rate?包基于令牌桶算法實現(xiàn)限流,本文主要為大家介紹了time/rate的使用與實現(xiàn),需要的可以參考一下2023-04-04
gin框架Context如何獲取Get?Query?Param函數(shù)數(shù)據(jù)
這篇文章主要為大家介紹了gin框架Context?Get?Query?Param函數(shù)獲取數(shù)據(jù),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-03-03

