Go語(yǔ)言實(shí)戰(zhàn)之實(shí)現(xiàn)一個(gè)簡(jiǎn)單分布式系統(tǒng)
引子
如今很多云原生系統(tǒng)、分布式系統(tǒng),例如 Kubernetes,都是用 Go 語(yǔ)言寫(xiě)的,這是因?yàn)?Go 語(yǔ)言天然支持異步編程,而且靜態(tài)語(yǔ)言能保證應(yīng)用系統(tǒng)的穩(wěn)定性。筆者的開(kāi)源項(xiàng)目 Crawlab 作為爬蟲(chóng)管理平臺(tái),也應(yīng)用到了分布式系統(tǒng)。本篇文章將介紹如何用 Go 語(yǔ)言編寫(xiě)一個(gè)簡(jiǎn)單的分布式系統(tǒng)。
思路
在開(kāi)始寫(xiě)代碼之前,我們先思考一下需要實(shí)現(xiàn)些什么。
- 主節(jié)點(diǎn)(Master Node):中控系統(tǒng),相當(dāng)于軍隊(duì)中的指揮官,派發(fā)任務(wù)命令
- 工作節(jié)點(diǎn)(Worker Node):執(zhí)行者,相當(dāng)于軍隊(duì)中的士兵,執(zhí)行任務(wù)
除了上面的概念以外,我們需要實(shí)現(xiàn)一些簡(jiǎn)單功能。
- 上報(bào)運(yùn)行狀態(tài)(Report Status):工作節(jié)點(diǎn)向主節(jié)點(diǎn)上報(bào)當(dāng)前狀態(tài)
- 分派任務(wù)(Assign Task):通過(guò) API 向主節(jié)點(diǎn)發(fā)起請(qǐng)求,主節(jié)點(diǎn)再向工作節(jié)點(diǎn)分派任務(wù)
- 運(yùn)行腳本(Execute Script):工作節(jié)點(diǎn)執(zhí)行任務(wù)中的腳本
整個(gè)流程示意圖如下。

實(shí)戰(zhàn)
節(jié)點(diǎn)通信
節(jié)點(diǎn)之間的通信在分布式系統(tǒng)中非常重要,畢竟每個(gè)節(jié)點(diǎn)或機(jī)器如果孤立運(yùn)行,就失去了分布式系統(tǒng)的意義。因此,節(jié)點(diǎn)通信在分布式系統(tǒng)中是核心模塊。
gRPC 協(xié)議
首先,我們來(lái)想一下,如何讓節(jié)點(diǎn)之間進(jìn)行相互通信。最常用的通信方式就是 API,不過(guò)這個(gè)通信方式有個(gè)缺點(diǎn),就是需要將各個(gè)節(jié)點(diǎn)的 IP 地址及端口顯示暴露給其他節(jié)點(diǎn),這在公網(wǎng)中是不太安全的。因此,我們選擇了 gRPC,一種流行的遠(yuǎn)程過(guò)程調(diào)用(Remote Procedure Call,RPC)框架。這里我們不過(guò)多的解釋 RPC 或 gRPC 的原理,簡(jiǎn)而言之,就是能讓調(diào)用者在遠(yuǎn)程機(jī)器上執(zhí)行命令的協(xié)議方式。
為了使用 gRPC 框架,我們先創(chuàng)建 go.mod 并輸入以下內(nèi)容,并執(zhí)行 go mod download。注意:對(duì)于國(guó)內(nèi)的朋友,或許需要添加代理才能正常下載,可以先執(zhí)行 export GOPROXY=goproxy.cn,direct 后再執(zhí)行下載命令。
module go-distributed-system ? go 1.17 ? require ( github.com/golang/protobuf v1.5.0 google.golang.org/grpc v1.27.0 google.golang.org/protobuf v1.27.1 )
然后,我們創(chuàng)建 Protocol Buffers 文件 node.proto(表示節(jié)點(diǎn)對(duì)應(yīng)的 gRPC 協(xié)議文件),并輸入以下內(nèi)容。
syntax = "proto3";
?
package core;
option go_package = ".;core";
?
message Request {
string action = 1;
}
?
message Response {
string data = 1;
}
?
service NodeService {
rpc ReportStatus(Request) returns (Response){}; // Simple RPC
rpc AssignTask(Request) returns (stream Response){}; // Server-Side RPC
}在這里我們創(chuàng)建了兩個(gè) RPC 服務(wù),分別是負(fù)責(zé)上報(bào)狀態(tài)的 Simple RPC ReportStatus 以及 Server-Side RPC AssignTask。Simple RPC 和 Server-Side RPC 的區(qū)別如下圖所示,主要區(qū)別在于 Server-Side RPC 可以從通過(guò)流(Stream)向客戶端(Client)主動(dòng)發(fā)送數(shù)據(jù),而 Simple RPC 只能從客戶端向服務(wù)端(Server)發(fā)請(qǐng)求。

創(chuàng)建好 .proto 文件后,我們需要將這個(gè) gRPC 協(xié)議文件轉(zhuǎn)化為 .go 代碼文件,從而能被 Go 程序引用。在命令行窗口中執(zhí)行如下命令。注意:編譯工具 protoc 不是自帶的,需要單獨(dú)下載,具體可以參考文檔 https://grpc.io/docs/protoc-installation/。
mkdir core
protoc --go_out=./core \
--go-grpc_out=./core \
node.proto
執(zhí)行完后,可以在 core 目錄下看到兩個(gè) Go 代碼文件, node.pb.go 和 node_grpc.pb.go,這相當(dāng)于 Go 程序中對(duì)應(yīng)的 gRPC 庫(kù)。
gRPC 服務(wù)端
現(xiàn)在開(kāi)始編寫(xiě)服務(wù)端邏輯。
咱們先創(chuàng)建一個(gè)新文件 core/node_service_server.go,輸入以下內(nèi)容。主要邏輯就是實(shí)現(xiàn)了之前創(chuàng)建好的 gRPC 協(xié)議中的兩個(gè)調(diào)用方法。其中,暴露了 CmdChannel 這個(gè)通道(Channel)來(lái)獲取需要發(fā)送到工作節(jié)點(diǎn)的命令。
package core
?
import (
"context"
)
?
type NodeServiceGrpcServer struct {
UnimplementedNodeServiceServer
?
// channel to receive command
CmdChannel chan string
}
?
func (n NodeServiceGrpcServer) ReportStatus(ctx context.Context, request *Request) (*Response, error) {
return &Response{Data: "ok"}, nil
}
?
func (n NodeServiceGrpcServer) AssignTask(request *Request, server NodeService_AssignTaskServer) error {
for {
select {
case cmd := <-n.CmdChannel:
// receive command and send to worker node (client)
if err := server.Send(&Response{Data: cmd}); err != nil {
return err
}
}
}
}
?
var server *NodeServiceGrpcServer
?
// GetNodeServiceGrpcServer singleton service
func GetNodeServiceGrpcServer() *NodeServiceGrpcServer {
if server == nil {
server = &NodeServiceGrpcServer{
CmdChannel: make(chan string),
}
}
return server
}gRPC 客戶端
gRPC 客戶端不需要具體實(shí)現(xiàn),我們通常只需要調(diào)用 gRPC 客戶端的方法,程序會(huì)自動(dòng)發(fā)起向服務(wù)端的請(qǐng)求以及獲取后續(xù)的響應(yīng)。
主節(jié)點(diǎn)
編寫(xiě)好了節(jié)點(diǎn)通信的基礎(chǔ)部分,現(xiàn)在我們需要實(shí)現(xiàn)主節(jié)點(diǎn)了,這是整個(gè)中心化分布式系統(tǒng)的核心。
咱們創(chuàng)建一個(gè)新的文件 node.go,輸入以下內(nèi)容。
package core
?
import (
"github.com/gin-gonic/gin"
"google.golang.org/grpc"
"net"
"net/http"
)
?
// MasterNode is the node instance
type MasterNode struct {
api *gin.Engine // api server
ln net.Listener // listener
svr *grpc.Server // grpc server
nodeSvr *NodeServiceGrpcServer // node service
}
?
func (n *MasterNode) Init() (err error) {
// TODO: implement me
panic("implement me")
}
?
func (n *MasterNode) Start() {
// TODO: implement me
panic("implement me")
}
?
var node *MasterNode
?
// GetMasterNode returns the node instance
func GetMasterNode() *MasterNode {
if node == nil {
// node
node = &MasterNode{}
?
// initialize node
if err := node.Init(); err != nil {
panic(err)
}
}
?
return node
}其中,我們創(chuàng)建了兩個(gè)占位方法 Init 和 Start,我們分別實(shí)現(xiàn)。
在初始化方法 Init 中,我們需要做幾件事情:
- 注冊(cè) gRPC 服務(wù)
- 注冊(cè) API 服務(wù)
現(xiàn)在,在 Init 方法中加入如下代碼。
func (n *MasterNode) Init() (err error) {
// grpc server listener with port as 50051
n.ln, err = net.Listen("tcp", ":50051")
if err != nil {
return err
}
?
// grpc server
n.svr = grpc.NewServer()
?
// node service
n.nodeSvr = GetNodeServiceGrpcServer()
?
// register node service to grpc server
RegisterNodeServiceServer(node.svr, n.nodeSvr)
?
// api
n.api = gin.Default()
n.api.POST("/tasks", func(c *gin.Context) {
// parse payload
var payload struct {
Cmd string `json:"cmd"`
}
if err := c.ShouldBindJSON(&payload); err != nil {
c.AbortWithStatus(http.StatusBadRequest)
return
}
?
// send command to node service
n.nodeSvr.CmdChannel <- payload.Cmd
?
c.AbortWithStatus(http.StatusOK)
})
?
return nil
}可以看到,我們新建了一個(gè) gRPC Server,并將之前的 NodeServiceGrpcServer 注冊(cè)了進(jìn)去。另外,我們還用 gin 框架創(chuàng)建了一個(gè)簡(jiǎn)單的 API 服務(wù),可以 POST 請(qǐng)求到 /tasks 向 NodeServiceGrpcServer 中的命令通道 CmdChannel 傳送命令。這樣就將各個(gè)部件串接起來(lái)了!
啟動(dòng)方法 Start 很簡(jiǎn)單,就是啟動(dòng) gRPC Server 以及 API Server。
func (n *MasterNode) Start() {
// start grpc server
go n.svr.Serve(n.ln)
?
// start api server
_ = n.api.Run(":9092")
?
// wait for exit
n.svr.Stop()
}下一步,我們就要實(shí)現(xiàn)實(shí)際做任務(wù)的工作節(jié)點(diǎn)了。
工作節(jié)點(diǎn)
現(xiàn)在,我們創(chuàng)建一個(gè)新文件 core/worker_node.go,輸入以下內(nèi)容。
package core
?
import (
"context"
"google.golang.org/grpc"
"os/exec"
)
?
type WorkerNode struct {
conn *grpc.ClientConn // grpc client connection
c NodeServiceClient // grpc client
}
?
func (n *WorkerNode) Init() (err error) {
// connect to master node
n.conn, err = grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
return err
}
?
// grpc client
n.c = NewNodeServiceClient(n.conn)
?
return nil
}
?
func (n *WorkerNode) Start() {
// log
fmt.Println("worker node started")
?
// report status
_, _ = n.c.ReportStatus(context.Background(), &Request{})
?
// assign task
stream, _ := n.c.AssignTask(context.Background(), &Request{})
for {
// receive command from master node
res, err := stream.Recv()
if err != nil {
return
}
?
// log command
fmt.Println("received command: ", res.Data)
?
// execute command
parts := strings.Split(res.Data, " ")
if err := exec.Command(parts[0], parts[1:]...).Run(); err != nil {
fmt.Println(err)
}
}
}
?
var workerNode *WorkerNode
?
func GetWorkerNode() *WorkerNode {
if workerNode == nil {
// node
workerNode = &WorkerNode{}
?
// initialize node
if err := workerNode.Init(); err != nil {
panic(err)
}
}
?
return workerNode
}其中,我們?cè)诔跏蓟椒?Init 中創(chuàng)建了gRPC 客戶端,并連接了主節(jié)點(diǎn)的 gRPC 服務(wù)端。
在啟動(dòng)方法 Start 中做了幾件事情:
- 調(diào)用上報(bào)狀態(tài)(Report Status)的 Simple RPC 方法
- 調(diào)用分配任務(wù)(Assign Task)的 Server-Side RPC 方法,獲取到了流(Stream)
- 通過(guò)循環(huán)不斷接受流傳輸過(guò)來(lái)的來(lái)自服務(wù)端(也就是主節(jié)點(diǎn))的信息,并執(zhí)行命令
這樣,整個(gè)包含主節(jié)點(diǎn)、工作節(jié)點(diǎn)的分布式系統(tǒng)核心邏輯就寫(xiě)好了!
將它們放在一起
最后,我們需要將這些核心邏輯用命令行工具封裝一下,以便啟用。
創(chuàng)建主程序文件 main.go,并輸入以下內(nèi)容。
package main
?
import (
"go-distributed-system/core"
"os"
)
?
func main() {
nodeType := os.Args[0]
switch nodeType {
case "master":
core.GetMasterNode().Start()
case "worker":
core.GetWorkerNode().Start()
default:
panic("invalid node type")
}
}這樣,整個(gè)簡(jiǎn)單的分布式系統(tǒng)就創(chuàng)建好了!
代碼效果
下面我們來(lái)運(yùn)行一下代碼。
打開(kāi)兩個(gè)命令行窗口,其中一個(gè)輸入 go run main.go master 啟動(dòng)主節(jié)點(diǎn),另一個(gè)輸入 go run main.go worker 啟動(dòng)工作節(jié)點(diǎn)。
如果主節(jié)點(diǎn)啟動(dòng)成功,將會(huì)看到如下日志信息。
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached. ? [GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production. - using env: export GIN_MODE=release - using code: gin.SetMode(gin.ReleaseMode) ? [GIN-debug] POST /tasks --> go-distributed-system/core.(*MasterNode).Init.func1 (3 handlers) [GIN-debug] [WARNING] You trusted all proxies, this is NOT safe. We recommend you to set a value. Please check https://pkg.go.dev/github.com/gin-gonic/gin#readme-don-t-trust-all-proxies for details. [GIN-debug] Listening and serving HTTP on :9092
如果工作節(jié)點(diǎn)啟動(dòng)成功,將會(huì)看到如下日志信息。
worker node started
主節(jié)點(diǎn)、工作節(jié)點(diǎn)都啟動(dòng)成功后,我們?cè)诹硗庖粋€(gè)命令行窗口中輸入如下命令來(lái)發(fā)起 API 請(qǐng)求。
curl -X POST \
-H "Content-Type: application/json" \
-d '{"cmd": "touch /tmp/hello-distributed-system"}' \
http://localhost:9092/tasks
在工作節(jié)點(diǎn)窗口應(yīng)該可以看到日志 received command: touch /tmp/hello-distributed-system。
然后查看文件是否順利生成,執(zhí)行 ls -l /tmp/hello-distributed-system。
-rw-r--r-- 1 marvzhang wheel 0B Oct 26 12:22 /tmp/hello-distributed-system
文件成功生成,表示已經(jīng)通過(guò)工作節(jié)點(diǎn)執(zhí)行成功了!大功告成!
總結(jié)
本篇文章通過(guò) RPC 框架 gRPC 以及 Go 語(yǔ)言自帶的 Channel,將節(jié)點(diǎn)串接起來(lái),開(kāi)發(fā)出了一個(gè)簡(jiǎn)單的分布式系統(tǒng)。所用到的核心庫(kù)和技術(shù):
整個(gè)代碼示例倉(cāng)庫(kù)在 GitHub 上: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system
到此這篇關(guān)于Go語(yǔ)言實(shí)戰(zhàn)之實(shí)現(xiàn)一個(gè)簡(jiǎn)單分布式系統(tǒng)的文章就介紹到這了,更多相關(guān)Go語(yǔ)言分布式系統(tǒng)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang 操作TSV文件的實(shí)戰(zhàn)示例
本文主要介紹了Golang 操作TSV文件的實(shí)戰(zhàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03
一文帶你了解Go語(yǔ)言fmt標(biāo)準(zhǔn)庫(kù)輸出函數(shù)的使用
這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言中 fmt 標(biāo)準(zhǔn)庫(kù)輸出函數(shù)的使用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下2022-12-12
golang中strconv.ParseInt函數(shù)用法示例
這篇文章主要介紹了golang中strconv.ParseInt函數(shù)用法,實(shí)例分析了strconv.ParseInt函數(shù)將字符串轉(zhuǎn)換為數(shù)字的簡(jiǎn)單使用方法,需要的朋友可以參考下2016-07-07
詳解Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心
這篇文章主要為大家為大家介紹了Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05

