Golang實(shí)現(xiàn)簡易的rpc調(diào)用
RPC(Remote Procedure Call Protocol)遠(yuǎn)程過程調(diào)用協(xié)議。 一個(gè)通俗的描述是:客戶端在不知道調(diào)用細(xì)節(jié)的情況下,調(diào)用存在于遠(yuǎn)程計(jì)算機(jī)上的某個(gè)對象,就像調(diào)用本地應(yīng)用程序中的對象一樣。 比較正式的描述是:一種通過網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)程序上請求服務(wù),而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議 從使用的方面來說,服務(wù)端和客戶端通過TCP/UDP/HTTP等通訊協(xié)議通訊,在通訊的時(shí)候客戶端指定好服務(wù)端的方法、參數(shù)等信息通過序列化傳送到服務(wù)端,服務(wù)端可以通過已有的元信息找到需要調(diào)用的方法,然后完成一次調(diào)用后序列化返回給客戶端(rpc更多的是指服務(wù)與服務(wù)之間的通信,可以使用效率更高的協(xié)議和序列化格式去進(jìn)行,并且可以進(jìn)行有效的負(fù)載均衡和熔斷超時(shí)等,因此跟前后端之間的web的交互概念上是有點(diǎn)不一樣的) 用一張簡單的圖來表示

開始
本文只實(shí)現(xiàn)一個(gè)rpc框架基本的功能,不對性能做保證,因此盡量使用go原生自帶的net/json庫等進(jìn)行操作,對使用方面不做stub(偷懶,只使用簡單的json格式指定需要調(diào)用的方法),用最簡單的方式實(shí)現(xiàn)一個(gè)簡易rpc框架,也不保證超時(shí)調(diào)用和服務(wù)發(fā)現(xiàn)等集成的邏輯,服務(wù)發(fā)現(xiàn)可以參考下文 本文代碼地址(https://github.com/wuhuZhao/rpc_demo)
實(shí)現(xiàn)兩點(diǎn)之間的通訊(transport)
本段先實(shí)現(xiàn)兩端之間的通訊,只確保兩個(gè)端之間能互相通訊即可 server.go
package server
import (
"fmt"
"log"
"net"
)
// Server: transport底層實(shí)現(xiàn),通過Server去接受客戶端的字節(jié)流
type Server struct {
ls net.Listener
port int
}
// NewServer: 根據(jù)端口創(chuàng)建一個(gè)server
func NewServer(port int) *Server {
s := &Server{port: port}
s.init()
return s
}
// init: 初始化服務(wù)端連接
func (s *Server) init() {
l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port))
if err != nil {
panic(err)
}
s.ls = l
}
// Start: 啟動服務(wù)端的端口監(jiān)聽,采取一個(gè)conn一個(gè)g的模型,沒有使用reactor等高性能模型
func (s *Server) Start() {
go func() {
log.Printf("server [%s] start....", s.ls.Addr().String())
for {
conn, err := s.ls.Accept()
if err != nil {
panic(err)
}
go func() {
buf := make([]byte, 1024)
for {
idx, err := conn.Read(buf)
if err != nil {
panic(err)
}
if len(buf) == 0 {
continue
}
// todo 等序列化的信息
log.Printf("[conn: %v] get data: %v\n", conn.RemoteAddr(), string(buf[:idx]))
}
}()
}
}()
}
// Close: 關(guān)閉服務(wù)監(jiān)聽
func (s *Server) Close() error {
return s.ls.Close()
}
// Close: 關(guān)閉服務(wù)監(jiān)聽
func (s *Server) Close() error {
return s.ls.Close()
}client.go
package client
import (
"fmt"
"log"
"net"
"unsafe"
)
type Client struct {
port int
conn net.Conn
}
func NewClient(port int) *Client {
c := &Client{port: port}
c.init()
return c
}
// init: initialize tcp client
func (c *Client) init() {
conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", c.port))
if err != nil {
panic(err)
}
c.conn = conn
}
func (c *Client) Send(statement string) error {
_, err := c.conn.Write(*(*[]byte)(unsafe.Pointer(&statement)))
if err != nil {
panic(err)
}
return nil
}
// Close: use to close connection
func (c *Client) Close() error {
return c.conn.Close()
}
使用main.go做測試 main.go
package main
import (
"rpc_demo/internal/client"
"rpc_demo/internal/server"
"time"
)
func main() {
s := server.NewServer(9999)
s.Start()
time.Sleep(5 * time.Second)
c := client.NewClient(9999)
c.Send("this is a test\n")
time.Sleep(5 * time.Second)
}執(zhí)行一次main.go, go run main.go
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test
可以證明第一部分的任務(wù)已經(jīng)完成,可以實(shí)現(xiàn)兩端之間的通訊了
實(shí)現(xiàn)反射調(diào)用已注冊的方法
實(shí)現(xiàn)了雙端的通信以后,我們在internal.go里實(shí)現(xiàn)兩個(gè)方法,一個(gè)是注冊,一個(gè)是調(diào)用,因?yàn)間o有運(yùn)行時(shí)的反射,所以我們使用反射去注冊每一個(gè)需要調(diào)用到的方法,然后提供全局唯一的函數(shù)名,讓client端可以實(shí)現(xiàn)指定方法的調(diào)用
internal.go
package internal
import (
"errors"
"fmt"
"reflect"
"runtime"
"strings"
)
// 全局唯一
var GlobalMethod = &Method{methods: map[string]reflect.Value{}}
type Method struct {
methods map[string]reflect.Value
}
func (m *Method) register(impl interface{}) error {
pl := reflect.ValueOf(impl)
if pl.Kind() != reflect.Func {
return errors.New("impl should be function")
}
// 獲取函數(shù)名
methodName := runtime.FuncForPC(pl.Pointer()).Name()
if len(strings.Split(methodName, ".")) < 1 {
return errors.New("invalid function name")
}
lastFuncName := strings.Split(methodName, ".")[1]
m.methods[lastFuncName] = pl
fmt.Printf("methods: %v\n", m.methods)
return nil
}
func (m *Method) call(methodName string, callParams ...interface{}) ([]interface{}, error) {
fn, ok := m.methods[methodName]
if !ok {
return nil, errors.New("impl method not found! Please Register first")
}
in := make([]reflect.Value, len(callParams))
for i := 0; i < len(callParams); i++ {
in[i] = reflect.ValueOf(callParams[i])
}
res := fn.Call(in)
out := make([]interface{}, len(res))
for i := 0; i < len(res); i++ {
out[i] = res[i].Interface()
}
return out, nil
}
func Call(methodName string, callParams ...interface{}) ([]interface{}, error) {
return GlobalMethod.call(methodName, callParams...)
}
func Register(impl interface{}) error {
return GlobalMethod.register(impl)
}
在單測里測試一下這個(gè)注冊和調(diào)用的功能internal_test.go
package internal
import (
"testing"
)
func Sum(a, b int) int {
return a + b
}
func TestRegister(t *testing.T) {
err := Register(Sum)
if err != nil {
t.Fatalf("err: %v\n", err)
}
t.Logf("test success\n")
}
func TestCall(t *testing.T) {
TestRegister(t)
result, err := Call("Sum", 1, 2)
if err != nil {
t.Fatalf("err: %v\n", err)
}
if len(result) != 1 {
t.Fatalf("len(result) is not equal to 1\n")
}
t.Logf("Sum(1,2) = %d\n", result[0].(int))
if err := recover(); err != nil {
t.Fatalf("%v\n", err)
}
}執(zhí)行調(diào)用
/usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v
Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestCall$ rpc_demo/internal -v
=== RUN TestCall
methods: map[Sum:<func(int, int) int Value>]
/root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:15: test success
/root/go/src/juejin_demo/rpc_demo/internal/internal_test.go:27: Sum(1,2) = 3
--- PASS: TestCall (0.00s)
PASS
ok rpc_demo/internal 0.002s
可以看到這個(gè)注冊和調(diào)用的過程已經(jīng)實(shí)現(xiàn)并且達(dá)到指定方法調(diào)用的作用
設(shè)計(jì)struct完整表達(dá)一次完整的rpc調(diào)用,并且封裝json庫中的Decoder和Encoder,完成序列化和反序列化
internal.go
type RpcRequest struct {
MethodName string
Params []interface{}
}
type RpcResponses struct {
Returns []interface{}
Err error
}
transport.go考慮可以對接更多的格式,所以抽象了一層進(jìn)行使用(demo肯定沒有更多格式了)
package transport
// Transport: 序列化格式的抽象層,從connection中讀取數(shù)據(jù)序列化并且反序列化到connection中
type Transport interface {
Decode(v interface{}) error
Encode(v interface{}) error
Close()
}
json_transport.go
package transport
import (
"encoding/json"
"net"
)
var _ Transport = (*JSONTransport)(nil)
type JSONTransport struct {
encoder *json.Encoder
decoder *json.Decoder
}
// NewJSONTransport: 負(fù)責(zé)讀取和寫入conn
func NewJSONTransport(conn net.Conn) *JSONTransport {
return &JSONTransport{json.NewEncoder(conn), json.NewDecoder(conn)}
}
// Decode: use json package to decode
func (t *JSONTransport) Decode(v interface{}) error {
if err := t.decoder.Decode(v); err != nil {
return err
}
return nil
}
// Encode: use json package to encode
func (t *JSONTransport) Encode(v interface{}) error {
if err := t.encoder.Encode(v); err != nil {
return err
}
return nil
}
// Close: not implement
func (dec *JSONTransport) Close() {
}
然后我們將服務(wù)端和客戶端的邏輯進(jìn)行修改,改成通過上面兩個(gè)結(jié)構(gòu)體進(jìn)行通信,然后返回一次調(diào)用 server.go
//...
for {
conn, err := s.ls.Accept()
if err != nil {
panic(err)
}
tsp := transport.NewJSONTransport(conn)
go func() {
for {
request := &internal.RpcRequest{}
err := tsp.Decode(request)
if err != nil {
panic(err)
}
log.Printf("[server] get request: %v\n", request)
result, err := internal.Call(request.MethodName, request.Params...)
log.Printf("[server] invoke method: %v\n", result)
if err != nil {
response := &internal.RpcResponses{Returns: nil, Err: err}
tsp.Encode(response)
continue
}
response := &internal.RpcResponses{Returns: result, Err: err}
if err := tsp.Encode(response); err != nil {
log.Printf("[server] encode response err: %v\n", err)
continue
}
}
}()
}
//...
client.go
// ...
// Call: remote invoke
func (c *Client) Call(methodName string, params ...interface{}) (res *internal.RpcResponses) {
request := internal.RpcRequest{MethodName: methodName, Params: params}
log.Printf("[client] create request to invoke server: %v\n", request)
err := c.tsp.Encode(request)
if err != nil {
panic(err)
}
res = &internal.RpcResponses{}
if err := c.tsp.Decode(res); err != nil {
panic(err)
}
log.Printf("[client] get response from server: %v\n", res)
return res
}
// ...main.go
package main
import (
"log"
"rpc_demo/internal"
"rpc_demo/internal/client"
"rpc_demo/internal/server"
"strings"
"time"
)
// Rpc方法的一個(gè)簡易實(shí)現(xiàn)
func Join(a ...string) string {
res := &strings.Builder{}
for i := 0; i < len(a); i++ {
res.WriteString(a[i])
}
return res.String()
}
func main() {
internal.Register(Join)
s := server.NewServer(9999)
s.Start()
time.Sleep(5 * time.Second)
c := client.NewClient(9999)
res := c.Call("Join", "aaaaa", "bbbbb", "ccccccccc", "end")
if res.Err != nil {
log.Printf("[main] get an error from server: %v\n", res.Err)
return
}
log.Printf("[main] get a response from server: %v\n", res.Returns[0].(string))
time.Sleep(5 * time.Second)
}
接下來我們運(yùn)行一下main
[root@hecs-74066 rpc_demo]# go run main.go
2023/03/05 14:39:11 server [127.0.0.1:9999] start....
2023/03/05 14:39:16 [conn: 127.0.0.1:59126] get data: this is a test
[root@hecs-74066 rpc_demo]# go run main.go
2023/03/05 21:53:41 server [127.0.0.1:9999] start....
2023/03/05 21:53:46 [client] create request to invoke server: {Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] get request: &{Join [aaaaa bbbbb ccccccccc end]}
2023/03/05 21:53:46 [server] invoke method: [aaaaabbbbbcccccccccend]
2023/03/05 21:53:46 [client] get response from server: &{[aaaaabbbbbcccccccccend] <nil>}
2023/03/05 21:53:46 [main] get a response from server: aaaaabbbbbcccccccccend
總結(jié)(自我pua)
這樣我們就實(shí)現(xiàn)了一個(gè)簡單的rpc框架了,符合最簡單的架構(gòu)圖,從client->序列化請求->transport -> 反序列化 ->server然后從server->序列化請求->transport->反序列化請求->client。當(dāng)然從可用性的角度來說是差遠(yuǎn)了,沒有實(shí)現(xiàn)stub代碼,也沒有idl的實(shí)現(xiàn),導(dǎo)致所有的注冊方法都是硬編碼,可用性不高,而且沒有集成服務(wù)發(fā)現(xiàn)(可以參考我的另一篇文章去集成)和熔斷等功能,也沒用中間件(也是我的另一篇文章)和超時(shí)等豐富的功能在里面,并且最近看了不少rpc框架的源碼,感覺這個(gè)demo的設(shè)計(jì)也差遠(yuǎn)了。不過因?yàn)闀r(shí)間問題和代碼的復(fù)雜性問題(單純懶),起碼算是實(shí)現(xiàn)了一個(gè)簡單的rpc框架。
推薦一些比較好的框架實(shí)現(xiàn)
- kitex
- dubbo
- grpc
- thrift
到此這篇關(guān)于Golang實(shí)現(xiàn)簡易的rpc調(diào)用的文章就介紹到這了,更多相關(guān)Golang rpc調(diào)用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中int類型和字符串類型相互轉(zhuǎn)換的實(shí)現(xiàn)方法
在日常開發(fā)中,經(jīng)常需要將數(shù)字轉(zhuǎn)換為字符串或者將字符串轉(zhuǎn)換為數(shù)字,在 Golang 中,有一些很簡便的方法可以實(shí)現(xiàn)這個(gè)功能,接下來就詳細(xì)講解一下如何實(shí)現(xiàn) int 類型和字符串類型之間的互相轉(zhuǎn)換,需要的朋友可以參考下2023-09-09
GO語言標(biāo)準(zhǔn)錯(cuò)誤處理機(jī)制error用法實(shí)例
這篇文章主要介紹了GO語言標(biāo)準(zhǔn)錯(cuò)誤處理機(jī)制error用法,實(shí)例分析了錯(cuò)誤處理機(jī)制的具體用法,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2014-12-12
Golang使用Channel組建高并發(fā)HTTP服務(wù)器
Golang 作為一門高效的語言,在網(wǎng)絡(luò)編程方面表現(xiàn)也非常出色,這篇文章主要介紹了如何使用 Golang 和 Channel 組建高并發(fā) HTTP 服務(wù)器,感興趣的可以了解一下2023-06-06
golang有用的庫及工具 之 zap.Logger包的使用指南
這篇文章主要介紹了golang有用的庫及工具 之 zap.Logger包的使用指南,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
golang 生成對應(yīng)的數(shù)據(jù)表struct定義操作
這篇文章主要介紹了golang 生成對應(yīng)的數(shù)據(jù)表struct定義操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-04-04

