Python用內(nèi)置模塊來構(gòu)建REST服務(wù)與RPC服務(wù)實(shí)戰(zhàn)
寫在前面
和小伙伴們分享一些Python 網(wǎng)絡(luò)編程的一些筆記,博文為《Python Cookbook》讀書后筆記整理
博文涉及內(nèi)容包括:
- TCP/UDP服務(wù)構(gòu)建
- 不使用框架創(chuàng)建一個(gè)
REST風(fēng)格的web服務(wù) - 基于
XML-RPC實(shí)現(xiàn)簡(jiǎn)單的RPC - 基于
multiprocessing.connection實(shí)現(xiàn)簡(jiǎn)單的RPC - python實(shí)現(xiàn)作為客戶端與HTTP服務(wù)交互
在Python中,構(gòu)建一個(gè)靜態(tài)Web服務(wù)器,只需要 python3 -m http.server 端口號(hào)( 端口號(hào)不指定默認(rèn)是8000) 這一條命令就可以搞定了,之前也有看到有公司內(nèi)網(wǎng)中,一些安裝包放到服務(wù)器上每次FTP麻煩,用http模塊的方式很方便。
python在網(wǎng)絡(luò)方面封裝一些內(nèi)置模塊,可以用很簡(jiǎn)潔的代碼實(shí)現(xiàn)端到端的通信,比如HTTP、RPC服務(wù)等。
在編寫RPC和REST服務(wù)之前,先來溫習(xí)一下常見的的基于Socket模塊的一些端到端的通信協(xié)議。不管是RPC還是REST都需要底層的通信協(xié)議來支持。
對(duì)于TCP和UPD協(xié)議,在常見的網(wǎng)絡(luò)通信中,瀏覽器,郵件等一般應(yīng)用程序在收發(fā)數(shù)據(jù)時(shí)都是通過TCP協(xié)議的,DNS等收發(fā)較短的控制數(shù)據(jù)時(shí)一般會(huì)使用UDP。
創(chuàng)建TCP服務(wù)
實(shí)現(xiàn)一個(gè)服務(wù)器,通過 TCP 協(xié)議和客戶端通信。
創(chuàng)建一個(gè) TCP 服務(wù)器的一個(gè)簡(jiǎn)單方法是使用socketserver庫。一起來溫習(xí)下面這個(gè)簡(jiǎn)單的TCP服務(wù)器
from socketserver import BaseRequestHandler, TCPServer
class EchoHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
while True:
#接收客戶端發(fā)送的數(shù)據(jù), 這次接收數(shù)據(jù)的最大字節(jié)數(shù)是8192
msg = self.request.recv(8192)
# 接收的到數(shù)據(jù)在發(fā)送回去
if not msg:
break
self.request.send(msg)
if __name__ == '__main__':
# 20000端口,默認(rèn)IP為本地IP,監(jiān)聽到消息交個(gè)EchoHandler處理器
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()代碼很簡(jiǎn)單,指定IP暴露對(duì)應(yīng)的端口,這里通過serv.serve_forever()來保證連接一直存在。
Got connection from ('127.0.0.1', 1675)
建立好服務(wù)端之后看下客戶端:
- AF_INET:表示ipv4
- SOCK_STREAM: tcp傳輸協(xié)議
>>> from socket import socket, AF_INET, SOCK_STREAM
>>> s = socket(AF_INET, SOCK_STREAM)
>>> s.connect(('localhost', 20000))
>>> s.send(b'Hello')
5
>>> s.recv(8192)
b'Hello'
>>>socketserver 默認(rèn)情況下這種服務(wù)器是單線程的,一次只能為一個(gè)客戶端連接服務(wù)。如果想通過多線程處理多個(gè)客戶端,可以初始化一個(gè)ForkingTCPServer 或者是ThreadingTCPServer對(duì)象。
from socketserver import ThreadingTCPServer
if __name__ == '__main__':
serv = ThreadingTCPServer(('', 20000), EchoHandler)
serv.serve_forever()使用 fork 或線程服務(wù)器有個(gè)潛在問題就是它們會(huì)為每個(gè)客戶端連接創(chuàng)建一個(gè)新的進(jìn)程或線程。由于客戶端連接數(shù)是沒有限制的,因此一個(gè)惡意的黑客可以同時(shí)發(fā)送大量的連接讓的服務(wù)器奔潰。
可以創(chuàng)建一個(gè)預(yù)先分配大小的 工作線程池或進(jìn)程池,先創(chuàng)建一個(gè)普通的非線程服務(wù)器,然后在一個(gè)線程池中使用serve forever()方法來啟動(dòng)它們。
if __name__ == '__main__':
from threading import Thread
NWORKERS = 16
serv = TCPServer(('', 20000), EchoHandler)
for n in range(NWORKERS):
t = Thread(target=serv.serve_forever)
t.daemon = True
t.start()
serv.serve_forever()一般來講,一個(gè)TCPServer在實(shí)例化的時(shí)候會(huì)綁定并激活相應(yīng)的socket,如果 bind_and_activate 為真,則構(gòu)造方法會(huì)自動(dòng)調(diào)用server_bind() 和 server_activate()方法。其余參數(shù)傳給基類 BaseServer,有時(shí)候想通過設(shè)置某些選項(xiàng)去調(diào)整底下的socket,可以設(shè)置參數(shù)bind_and_activate=False。
if __name__ == '__main__':
serv = TCPServer(('', 20000), EchoHandler, bind_and_activate=False)
# Set up various socket options
serv.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
# Bind and activate
serv.server_bind()
serv.server_activate()
serv.serve_forever()socket.SO_REUSEADDR允許服務(wù)器重新綁定一個(gè)之前使用過的端口號(hào)。由于要被經(jīng)常使用到,它被放置到類變量中,可以直接在 TCPServer上面設(shè)置
當(dāng)然,也可以不使用一個(gè)工具類,直接使用原生的Socket的編寫TCP服務(wù)端
from socket import socket, AF_INET, SOCK_STREAM
def echo_handler(address, client_sock):
print('Got connection from {}'.format(address))
while True:
msg = client_sock.recv(8192)
if not msg:
break
client_sock.sendall(msg)
client_sock.close()
def echo_server(address, backlog=5):
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(address)
sock.listen(backlog)
while True:
client_sock, client_addr = sock.accept()
echo_handler(client_addr, client_sock)
if __name__ == '__main__':
echo_server(('', 20000))了解了TCP的服務(wù)通信,在來看一下UDP的
創(chuàng)建UDP服務(wù)
實(shí)現(xiàn)一個(gè)基于 UDP 協(xié)議的服務(wù)器來與客戶端通信。
跟 TCP 一樣,UDP 服務(wù)器也可以通過使用socketserver庫很容易的被創(chuàng)建。
例如,下面是一個(gè)簡(jiǎn)單的時(shí)間服務(wù)器:
from socketserver import BaseRequestHandler, UDPServer
import time
class TimeHandler(BaseRequestHandler):
def handle(self):
print('Got connection from', self.client_address)
# Get message and client socket request 屬性是一個(gè)包含了數(shù)據(jù)報(bào)和底層 socket 對(duì)象的元組
msg, sock = self.request
resp = time.ctime()
sock.sendto(resp.encode('ascii'), self.client_address)
if __name__ == '__main__':
serv = UDPServer(('', 20000), TimeHandler)
serv.serve_forever()測(cè)試一下:
>>> from socket import socket, AF_INET, SOCK_DGRAM
>>> s = socket(AF_INET, SOCK_DGRAM)
>>> s.sendto(b'', ('localhost', 20000))
0
>>> s.recvfrom(8192)
(b'Tue May 3 11:48:53 2022', ('127.0.0.1', 20000))
>>>對(duì)于UPD協(xié)議而言,對(duì)于數(shù)據(jù)報(bào)的傳送,應(yīng)該使用 socket 的sendto() 和 recvfrom() 方法,因?yàn)槭敲嫦驘o連接的,沒有建立連接的步驟,但是要在發(fā)生時(shí)跟著接受方
了解基本的通信協(xié)議之后,回到今天要講的ERST接口。REST接口是基于HTTP協(xié)議的,而HTTP是直接依賴TCP的協(xié)議棧,負(fù)責(zé)約束表示層
創(chuàng)建一個(gè)簡(jiǎn)單的REST接口
使用一個(gè)簡(jiǎn)單的 REST 接口通過網(wǎng)絡(luò)遠(yuǎn)程控制或訪問的應(yīng)用程序,但是又不想自己去安裝一個(gè)完整的 web 框架。

可以構(gòu)建一個(gè) REST 風(fēng)格的接口,最簡(jiǎn)單的方法是創(chuàng)建一個(gè)基于 WSGI 標(biāo)準(zhǔn)(Web服務(wù)網(wǎng)關(guān)接口,PEP 3333)的很小的庫。類似支持REST風(fēng)格的Python Web框架 Flask。
"""
@File : app.py
@Author : Li Ruilong
@Version : 1.0
@Desc : None
"""
# here put the import lib
import time
import cgi
def notfound_404(environ, start_response):
start_response('404 Not Found', [('Content-type', 'text/plain')])
return [b'Not Found']
# 核心控制器,用于路由注冊(cè)
class PathDispatcher:
def __init__(self):
# 映射字典
self.pathmap = {}
# 核心控制器的回調(diào)
def __call__(self, environ, start_response):
# 獲取路由
path = environ['PATH_INFO']
# 獲取請(qǐng)求參數(shù)
params = cgi.FieldStorage(environ['wsgi.input'],
environ=environ)
# 獲取請(qǐng)求方法
method = environ['REQUEST_METHOD'].lower()
environ['params'] = {key: params.getvalue(key) for key in params}
# 找到映射的函數(shù)
handler = self.pathmap.get((method, path), notfound_404)
# 返回函數(shù)
return handler(environ, start_response)
def register(self, method, path, function):
# 請(qǐng)求方法和路由作為K,執(zhí)行函數(shù)為V
self.pathmap[method.lower(), path] = function
return function
_hello_resp = "wo jiao {name}"
def hello_world(environ, start_response):
start_response('200 OK', [('Content-type', 'text/html')])
params = environ['params']
resp = _hello_resp.format(name=params.get('name'))
yield resp.encode('utf-8')
_localtime_resp = "dang qian shjian {t}"
# 路由的回調(diào)
def localtime(environ, start_response):
start_response('200 OK', [('Content-type', 'application/xml')])
resp = _localtime_resp.format(t=time.localtime())
yield resp.encode('utf-8')
if __name__ == '__main__':
from wsgiref.simple_server import make_server
# 創(chuàng)建一個(gè)核心控制器,用于路由注冊(cè)
dispatcher = PathDispatcher()
# 注冊(cè)路由,對(duì)應(yīng)的回調(diào)方法
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)
# Launch a basic server 監(jiān)聽8080端口,注入核心控制器
httpd = make_server('', 8080, dispatcher)
print('Serving on port 8080...')
httpd.serve_forever()測(cè)試一下:
┌──[root@liruilongs.github.io]-[~] └─$coproc (./app.py) [2] 130447
curl localhost:8080/hello
┌──[root@liruilongs.github.io]-[~] └─$curl localhost:8080/hello 127.0.0.1 - - [03/May/2022 16:09:12] "GET /hello HTTP/1.1" 200 12 wo jiao None
curl localhost:8080/hello?name=liruilong
┌──[root@liruilongs.github.io]-[~] └─$curl localhost:8080/hello?name=liruilong 127.0.0.1 - - [03/May/2022 16:09:47] "GET /hello?name=liruilong HTTP/1.1" 200 17 wo jiao liruilong ┌──[root@liruilongs.github.io]-[~] └─$jobs .... [2]- 運(yùn)行中 coproc COPROC ( ./app.py ) &
實(shí)現(xiàn)一個(gè)簡(jiǎn)單的 REST 接口,只需讓的程序代碼滿足 Python 的 WSGI標(biāo)準(zhǔn)即可。WSGI 被標(biāo)準(zhǔn)庫支持,同時(shí)也被絕大部分第三方 web 框架支持。
這里感覺Python Web的WSGI標(biāo)準(zhǔn)和Java Web 體系的Servlet規(guī)范特別接近,但是Servlet是侵入式的,同時(shí)需要特定的Web容器(Tomcat)支持,而WSGI好像對(duì)代碼的影響很少…感興趣小伙伴可以研究下.
另一方面,通過上面的代碼,可以對(duì)當(dāng)下這種Web端MVC的設(shè)計(jì)模式流程(Flask,Django,SpringMVC)有一個(gè)基本的認(rèn)識(shí),當(dāng)然實(shí)際的框架要復(fù)雜的多。但是基本構(gòu)建思路一樣。
WSGI標(biāo)準(zhǔn)簡(jiǎn)單分析
以一個(gè)可調(diào)用對(duì)象形式來實(shí)現(xiàn)路由匹配要操作的方法
import cgi
def wsgi_app(environ, start_response):
passenviron 屬性是一個(gè)字典,包含了從 web 服務(wù)器如 Apache[參考 Internet RFC 3875]提供的 CGI 接口中獲取的值。要將這些不同的值提取出來,可以像這么這樣寫:
def wsgi_app(environ, start_response):
method = environ['REQUEST_METHOD']
path = environ['PATH_INFO']
# Parse the query parameters
params = cgi.FieldStorage(environ['wsgi.input'], environ=environ)start_response 參數(shù)是一個(gè)為了初始化一個(gè)請(qǐng)求對(duì)象而必須被調(diào)用的函數(shù)。第一個(gè)參數(shù)是返回的 HTTP 狀態(tài)值,第二個(gè)參數(shù)是一個(gè) (名, 值) 元組列表,用來構(gòu)建返回的 HTTP 頭。
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])為了返回?cái)?shù)據(jù),一個(gè) WSGI 程序必須返回一個(gè)字節(jié)字符串序列??梢韵裣旅孢@樣使用一個(gè)列表來完成
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
resp = []
resp.append(b'Hello World\n')
resp.append(b'Goodbye!\n')
return resp或者,還可以使用 yield
def wsgi_app(environ, start_response):
pass
start_response('200 OK', [('Content-type', 'text/plain')])
yield b'Hello World\n'
yield b'Goodbye!\n'最后返回的必須是字節(jié)字符串。如果返回結(jié)果包含文本字符串,必須先將其編碼成字節(jié)。圖片也是OK的
class WSGIApplication:
def __init__(self):
...
def __call__(self, environ, start_response)
...PathDispatcher 類。這個(gè)分發(fā)器僅僅只是管理一個(gè)字典,將 (方法, 路徑) 對(duì)映射到處理器函數(shù)上面。當(dāng)一個(gè)請(qǐng)求到來時(shí),它的方法和路徑被提取出來,然后被分發(fā)到對(duì)應(yīng)的處理器上面去。
dispatcher = PathDispatcher()
# 注冊(cè)路由,對(duì)應(yīng)的回調(diào)
dispatcher.register('GET', '/hello', hello_world)
dispatcher.register('GET', '/localtime', localtime)任何查詢變量會(huì)被解析后放到一個(gè)字典中,以 environ['params'] 形式存儲(chǔ)。后面這個(gè)步驟太常見,所以建議在分發(fā)器里面完成,這樣可以省掉很多重復(fù)代碼。使用分發(fā)器的時(shí)候,只需簡(jiǎn)單的創(chuàng)建一個(gè)實(shí)例,然后通過它注冊(cè)各種 WSGI 形式的函數(shù)。編寫這些函數(shù)應(yīng)該超級(jí)簡(jiǎn)單了,只要遵循 start_response() 函數(shù)的編寫規(guī)則,并且最后返回字節(jié)字符串即可。
WSGI 本身是一個(gè)很小的標(biāo)準(zhǔn)。因此它并沒有提供一些高級(jí)的特性比如認(rèn)證、cookies、重定向、全局的異常處理等。這些自己實(shí)現(xiàn)起來也不難。不過如果想要更多的支持,可以考慮第三方庫
上面服務(wù)端的構(gòu)建,我們使用了curl工具來訪問,那么作為客戶端Python有哪些交互方式?
作為客戶端與HTTP服務(wù)交互
需要通過 HTTP 協(xié)議以客戶端的方式訪問多種服務(wù)。例如,下載數(shù)據(jù)或者與基于 REST 的 API 進(jìn)行交互。
對(duì)于簡(jiǎn)單的事情來說,通常使用urllib.request模塊就夠了.一個(gè)Get請(qǐng)求的Demo
┌──[root@liruilongs.github.io]-[~]
└─$python3
Python 3.6.8 (default, Nov 16 2020, 16:55:22)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from urllib import request, parse
>>> url = 'http://httpbin.org/get'
>>> parms = {
... 'name1': 'value1',
... 'name2': 'value2'
... }
>>> querystring = parse.urlencode(parms)
>>> querystring
'name1=value1&name2=value2'
>>> request.urlopen(url+'?' + querystring)
<http.client.HTTPResponse object at 0x7ffa0ef0f710>
>>> u = request.urlopen(url+'?' + querystring)
>>> u.read()
b'{
"args": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Host": "httpbin.org",
"User-Agent": "Python-urllib/3.6",
"X-Amzn-Trace-Id": "Root=1-62707b15-41a1169c0897c9001a07f948"
},
"origin": "39.154.13.139",
"url": "http://httpbin.org/get?name1=value1&name2=value2"
}'如果需要使用POST方法在請(qǐng)求主體中發(fā)送查詢參數(shù),可以將參數(shù)編碼后作為可選參數(shù)提供給urlopen()函數(shù),就像這樣:
>>> from urllib import request, parse
>>> url = 'http://httpbin.org/post'
>>> parms = {
... 'name1' : 'value1',
... 'name2' : 'value2'
... }
>>> querystring = parse.urlencode(parms)
>>> querystring.encode('ascii')
b'name1=value1&name2=value2'
>>> u = request.urlopen(url, querystring.encode('ascii'))
>>> resp = u.read()
>>> resp
b'{
"args": {},
"data": "",
"files": {},
"form": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Content-Length": "25",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "httpbin.org",
"User-Agent": "Python-urllib/3.6",
"X-Amzn-Trace-Id": "Root=1-62707d24-15e9944760d3bbaa36c3714a"
},
"json": null,
"origin": "39.154.13.139",
"url": "http://httpbin.org/post"
}'
>>>在發(fā)出的請(qǐng)求中提供一些自定義的 HTTP 請(qǐng)求首部,創(chuàng)建一個(gè) Request 實(shí)例然后將其傳給urlopen()
>>> from urllib import request, parse
>>> headers = {
... 'User-agent' : 'none/ofyourbusiness',
... 'Spam' : 'Eggs'
... }
>>> req = request.Request(url, querystring.encode('ascii'), headers=headers)
>>> u = request.urlopen(req)
>>> u.read()
b'{
"args": {},
"data": "",
"files": {},
"form": {
"name1": "value1",
"name2": "value2"
},
"headers": {
"Accept-Encoding": "identity",
"Content-Length": "25",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "httpbin.org",
"Spam": "Eggs",
"User-Agent": "none/ofyourbusiness",
"X-Amzn-Trace-Id": "Root=1-62707f0e-308a8137555e15797d950018"
},
"json": null,
"origin": "39.154.13.139",
"url": "http://httpbin.org/post"
}'
>>>如果需要交互的服務(wù),可以使用 requests 模塊, 這個(gè)不是自帶模塊,需要安裝python3 -m pip install requests
>>> import requests
>>> url = 'http://httpbin.org/post'
>>> parms = {
... 'name1' : 'value1',
... 'name2' : 'value2'
... }
>>> headers = {
... 'User-agent' : 'none/ofyourbusiness',
... 'Spam' : 'Eggs'
... }
>>> resp = requests.post(url, data=parms, headers=headers)
>>> resp.text
'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "name1": "value1", \n "name2": "value2"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "25", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "httpbin.org", \n "Spam": "Eggs", \n "User-Agent": "none/ofyourbusiness", \n "X-Amzn-Trace-Id": "Root=1-62708080-7a14319e699baa2e35a352fb"\n }, \n "json": null, \n "origin": "39.154.13.139", \n "url": "http://httpbin.org/post"\n}\n'
>>> resp.json()
{'args': {}, 'data': '', 'files': {}, 'form': {'name1': 'value1', 'name2': 'value2'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '25', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'Spam': 'Eggs', 'User-Agent': 'none/ofyourbusiness', 'X-Amzn-Trace-Id': 'Root=1-62708080-7a14319e699baa2e35a352fb'}, 'json': None, 'origin': '39.154.13.139', 'url': 'http://httpbin.org/post'}
>>>requests 模塊支持很多種數(shù)據(jù)的返會(huì)方式,可以直接返回以 Unicode 解碼的響應(yīng)文本,也可以返回JSON數(shù)據(jù)
利用 requests 庫發(fā)起一個(gè) HEAD 請(qǐng)求
>>> import requests >>> resp = requests.head( 'http://httpbin.org/post') >>> resp <Response [405]> >>> resp = requests.head( 'http://httpbin.org/') >>> resp <Response [200]> >>> resp.status_code 200 >>> resp.headers['content-length'] '9593' >>> resp.headers['content-type'] 'text/html; charset=utf-8' >>> resp.text '' >>>
如果決定堅(jiān)持使用標(biāo)準(zhǔn)的程序庫而不考慮像requests這樣的第三方庫,可以使用底層的 http.client 模塊來實(shí)現(xiàn)自己的代碼。
from http.client import HTTPConnection
from urllib import parse
c = HTTPConnection('www.python.org', 80)
c.request('HEAD', '/index.html')
resp = c.getresponse()
print('Status', resp.status)
for name, value in resp.getheaders():
print(name, value)測(cè)試 HTTP 客戶端,考慮使用httpbin服務(wù)(http://httpbin.org)。這個(gè)站點(diǎn)會(huì)接收發(fā)出的請(qǐng)求,然后以JSON 的形式將相應(yīng)信息回傳回來。
>>> import requests
>>> r = requests.get('http://httpbin.org/get?name=Dave&n=37',
... headers = { 'User-agent': 'goaway/1.0' })
>>> resp = r.json()
>>> resp['headers']
{'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'httpbin.org', 'User-Agent': 'goaway/1.0', 'X-Amzn-Trace-Id': 'Root=1-62708c06-7c7d8cc4441479c65faea5b4'}
>>>通過XML-RPC實(shí)現(xiàn)簡(jiǎn)單的遠(yuǎn)程調(diào)用
RPC,通俗的講,想找到一個(gè)方式去運(yùn)行在遠(yuǎn)程機(jī)器上面的 Python 程序中的函數(shù)或方法。
實(shí)現(xiàn)一個(gè)遠(yuǎn)程方法調(diào)用的最簡(jiǎn)單方式是使用 XML-RPC。下面實(shí)現(xiàn)了鍵 值存儲(chǔ)功能的簡(jiǎn)單RPC服務(wù)器:
"""
@File : app.py
@Author : Li Ruilong
@Version : 1.0
@Desc : None
"""
# here put the import lib
from xmlrpc.server import SimpleXMLRPCServer
class KeyValueServer:
_rpc_methods_ = ['get', 'set', 'delete', 'exists', 'keys']
def __init__(self, address):
self._data = {}
self._serv = SimpleXMLRPCServer(address, allow_none=True)
# 注冊(cè)方法
for name in self._rpc_methods_:
self._serv.register_function(getattr(self, name))
def get(self, name):
return self._data[name]
def set(self, name, value):
self._data[name] = value
def delete(self, name):
del self._data[name]
def exists(self, name):
return name in self._data
def keys(self):
return list(self._data)
def serve_forever(self):
self._serv.serve_forever()
# Example
if __name__ == '__main__':
kvserv = KeyValueServer(('', 15001))
kvserv.serve_forever()RPC客戶端測(cè)試
PS E:\docker> python
Python 3.9.0 (tags/v3.9.0:9cf6752, Oct 5 2020, 15:23:07) [MSC v.1927 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from xmlrpc.client import ServerProxy
>>> s = ServerProxy('http://localhost:15001', allow_none=True)
>>> s.set('foo','bar')
>>> s.set('spam', [1, 2, 3])
>>> s.keys()
['foo', 'spam']
>>> s.get('foo')
'bar'
>>> s.get('spam')
[1, 2, 3]
>>> s.delete('spam')
>>> s.exists('spam')
False
>>>XML-RPC 可以讓很容易的構(gòu)造一個(gè)簡(jiǎn)單的遠(yuǎn)程調(diào)用服務(wù)。所需要做的僅僅是創(chuàng)建一個(gè)服務(wù)器實(shí)例,通過它的方法register_function()來注冊(cè)函數(shù),然后使用方法serve_forever()啟動(dòng)它。在上面將這些步驟放在一起寫到一個(gè)類中
這并不是必須的。還可以像下面這樣創(chuàng)建一個(gè)服務(wù)器:
from xmlrpc.server import SimpleXMLRPCServer
from xmlrpc.server import SimpleXMLRPCServer
def add(x,y):
return x+y
serv = SimpleXMLRPCServer(('', 15000))
serv.register_function(add)
serv.serve_forever()XML-RPC 暴露出來的函數(shù)只能適用于部分?jǐn)?shù)據(jù)類型,比如字符串、整形、列表和字典,不應(yīng)該將 XML-RPC 服務(wù)以公共 API 的方式暴露出來。
XML-RPC 的一個(gè)缺點(diǎn)是它的性能。SimpleXMLRPCServer 的實(shí)現(xiàn)是單線程的,所以它不適合于大型程序
由于 XML-RPC 將所有數(shù)據(jù)都序列化為 XML 格式,所以它會(huì)比其他的方式運(yùn)行的慢一些。但是它也有優(yōu)點(diǎn),這種方式的編碼可以被絕大部分其他編程語言支持。通過使用這種方式,其他語言的客戶端程序都能訪問的服務(wù)。
通過 multiprocessing 實(shí)現(xiàn)RPC調(diào)用
在一個(gè)消息傳輸層如 sockets、multiprocessing.connections或zeroMQ的基礎(chǔ)之上實(shí)現(xiàn)一個(gè)簡(jiǎn)單的遠(yuǎn)程過程調(diào)用(RPC)
將函數(shù)請(qǐng)求、參數(shù)和返回值使用pickle編碼后,在不同的解釋器直接傳送pickle字節(jié)字符串,可以很容易的實(shí)現(xiàn)RPC。下面是一個(gè)簡(jiǎn)單的PRC處理器,可以被整合到一個(gè)服務(wù)器中去:
RPC 服務(wù)端
"""
@File : rpcserver.py
@Author : Li Ruilong
@Version : 1.0
@Desc : 遠(yuǎn)程調(diào)用服務(wù)
"""
# here put the import lib
import pickle
from multiprocessing.connection import Listener
from threading import Thread
"""
@Time : 2022/07/08 20:28:02
@Author : Li Ruilong
@Version : 1.0
@Desc : None
Args:
遠(yuǎn)程調(diào)用處理器
Returns:
void
"""
class RPCHandler:
def __init__(self):
self._functions = {}
"""
@Time : 2022/07/08 20:16:47
@Author : Li Ruilong
@Version : 1.0
@Desc : 函數(shù)注冊(cè)
Args:
func
Returns:
void
"""
def register_function(self, func):
self._functions[func.__name__] = func
"""
@Time : 2022/07/08 20:17:51
@Author : Li Ruilong
@Version : 1.0
@Desc : 調(diào)用函數(shù)
Args:
connection
Returns:
void
"""
def handle_connection(self, connection):
try:
while True:
func_name, args, kwargs = pickle.loads(connection.recv())
try:
print("調(diào)用函數(shù):",(func_name, args, kwargs))
r = self._functions[func_name](*args,**kwargs)
print("返回結(jié)果:",r)
connection.send(pickle.dumps(r))
except Exception as e:
connection.send(pickle.dumps(e))
except Exception as e:
pass
def rpc_server(handler, address, authkey):
sock = Listener(address, authkey=authkey)
while True:
client = sock.accept()
t = Thread(target=handler.handle_connection, args=(client,))
t.daemon = True
print("函數(shù)開始執(zhí)行")
t.start()
def add(x, y):
return x + y
def sub(x, y):
return x - y
if __name__ == '__main__':
print(format("開始加載RPC處理器",'》<20'))
handler = RPCHandler()
print(format("處理器加載完成,注冊(cè)函數(shù)",'》<20'))
handler.register_function(add)
handler.register_function(sub)
print(format("函數(shù)注冊(cè)成功,服務(wù)啟動(dòng)",'》<20'))
rpc_server(handler, ('localhost', 17000), authkey=b'peekaboo')RPC 客戶端
import pickle
from multiprocessing.connection import Client
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
print("開始調(diào)用函數(shù)",name)
def do_rpc(*args, **kwargs):
self._connection.send(pickle.dumps((name, args, kwargs)))
result = pickle.loads(self._connection.recv())
print("返回結(jié)果",result)
if isinstance(result, Exception):
raise result
return result
return do_rpc
c = Client(('localhost', 17000), authkey=b'peekaboo')
print(format("建立連接,創(chuàng)建RPC代理",'》<30'),c)
proxy = RPCProxy(c)
print(format("創(chuàng)建代理成功",'》<30'))
print("add(2, 3) = ",proxy.add(2, 3) )
print("sub(2, 3) = ", proxy.sub(2, 3))D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/rpcserver.py
開始加載RPC處理器》》》》》》》》》》
處理器加載完成,注冊(cè)函數(shù)》》》》》》》》
函數(shù)注冊(cè)成功,服務(wù)啟動(dòng)》》》》》》》》》
函數(shù)開始執(zhí)行
調(diào)用函數(shù): ('add', (2, 3), {})
返回結(jié)果: 5
調(diào)用函數(shù): ('sub', (2, 3), {})
返回結(jié)果: -1
==============
D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py
建立連接,創(chuàng)建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x00DFACA0>
創(chuàng)建代理成功》》》》》》》》》》》》》》》》》》》》》》》》
開始調(diào)用函數(shù) add
返回結(jié)果 5
add(2, 3) = 5
開始調(diào)用函數(shù) sub
返回結(jié)果 -1
sub(2, 3) = -1Process finished with exit code 0
RPCHandler和RPCProxy的基本思路是很比較簡(jiǎn)單的。
如果一個(gè)客戶端想要調(diào)用一個(gè)遠(yuǎn)程函數(shù),比如foo(1,2,z=3),代理類創(chuàng)建一個(gè)包含了函數(shù)名和參數(shù)的元組(foo’,(1,2),{‘z’:3})。這個(gè)元組被 pickle 序列化后通過網(wǎng)絡(luò)連接發(fā)生出去。
由于底層需要依賴 pickle,那么安全問題就需要考慮了(因?yàn)橐粋€(gè)聰明的黑客可以創(chuàng)建特定的消息,能夠讓任意函數(shù)通過 pickle反序列化后被執(zhí)行)。
因此永遠(yuǎn)不要允許來自不信任或未認(rèn)證的客戶端的RPC。特別是絕對(duì)不要允許來自Internet的任意機(jī)器的訪問,這種只能在內(nèi)部被使用,位于防火墻后面并且不要對(duì)外暴露。
作為pickle的替代,也許可以考慮使用JSON、XML或一些其他的編碼格式來序列化消息。
例如,本機(jī)實(shí)例可以很容易的改寫成JSON編碼方案。還需要將pickle.1oads()和pickle.dumps()替換成json.1oads()和json.dumps()即可:
# here put the import lib
import json
........
def handle_connection(self, connection):
try:
while True:
# 反序列化
func_name, args, kwargs = json.loads(connection.recv())
try:
print("調(diào)用函數(shù):",(func_name, args, kwargs))
r = self._functions[func_name](*args,**kwargs)
print("返回結(jié)果:",r)
# 序列化發(fā)送
connection.send(json.dumps(r))
except Exception as e:
connection.send(json.dumps(e))
except Exception as e:
pass
......import json
from multiprocessing.connection import Client
class RPCProxy:
def __init__(self, connection):
self._connection = connection
def __getattr__(self, name):
print("開始調(diào)用函數(shù)",name)
def do_rpc(*args, **kwargs):
print("JSON 序列化后的值",json.dumps((name, args, kwargs)))
self._connection.send(json.dumps((name, args, kwargs)))
result = json.loads(self._connection.recv())
print("返回結(jié)果",result)
if isinstance(result, Exception):
raise result
return result
return do_rpc
c = Client(('localhost', 17000), authkey=b'peekaboo')
print(format("建立連接,創(chuàng)建RPC代理",'》<30'),c)
proxy = RPCProxy(c)
print(format("創(chuàng)建代理成功",'》<30'))
print("add(2, 3) = ",proxy.add(2, 3) )
print("sub(2, 3) = ", proxy.sub(2, 3))可以看到序列化后的結(jié)果:
D:\python\Python310\python.exe D:/python/code/rabbit_mq_demo/RPC.py
建立連接,創(chuàng)建RPC代理》》》》》》》》》》》》》》》》》》 <multiprocessing.connection.Connection object at 0x0078AD30>
創(chuàng)建代理成功》》》》》》》》》》》》》》》》》》》》》》》》
開始調(diào)用函數(shù) add
JSON 序列化后的值 ["add", [2, 3], {}]
返回結(jié)果 5
add(2, 3) = 5
開始調(diào)用函數(shù) sub
JSON 序列化后的值 ["sub", [2, 3], {}]
返回結(jié)果 -1
sub(2, 3) = -1
實(shí)現(xiàn)RPC的一個(gè)比較復(fù)雜的問題是如何去處理異常。至少,當(dāng)方法產(chǎn)生異常時(shí)服務(wù)器不應(yīng)該奔潰。因此,返回給客戶端的異常所代表的含義就要好好設(shè)計(jì)了。
如果使用pickle,異常對(duì)象實(shí)例在客戶端能被反序列化并拋出。如果使用其他的協(xié)議,那得想想另外的方法了。不過至少,應(yīng)該在響應(yīng)中返回異常字符串。在JSON的例子中就是使用的這種方式。
到此這篇關(guān)于Python用內(nèi)置模塊來構(gòu)建REST服務(wù)與RPC服務(wù)實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Python 構(gòu)建REST服務(wù) 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)CNN的多通道輸入實(shí)例
今天小編就為大家分享一篇Python實(shí)現(xiàn)CNN的多通道輸入實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-01-01
python自動(dòng)打開瀏覽器下載zip并提取內(nèi)容寫入excel
這篇文章主要給大家介紹了關(guān)于python自動(dòng)打開瀏覽器下載zip并提取內(nèi)容寫入excel的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Python命令行參數(shù)解析包argparse的使用詳解
argparse?是?python?自帶的命令行參數(shù)解析包,可以用來方便的服務(wù)命令行參數(shù)。本文將通過示例和大家詳細(xì)講講argparse的使用,需要的可以參考一下2022-09-09
淺析Python語言自帶的數(shù)據(jù)結(jié)構(gòu)有哪些
Python已經(jīng)廣泛的應(yīng)用于數(shù)據(jù)分析、數(shù)據(jù)挖掘、機(jī)器學(xué)習(xí)等眾多科學(xué)計(jì)算領(lǐng)域,這篇文章主要介紹了Python語言自帶的數(shù)據(jù)結(jié)構(gòu)有哪些?需要的朋友可以參考下2019-08-08
解決django后臺(tái)管理界面添加中文內(nèi)容亂碼問題
今天小編就為大家分享一篇解決django后臺(tái)管理界面添加中文內(nèi)容亂碼問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-11-11
Python編程實(shí)現(xiàn)生成特定范圍內(nèi)不重復(fù)多個(gè)隨機(jī)數(shù)的2種方法
這篇文章主要介紹了Python編程實(shí)現(xiàn)生成特定范圍內(nèi)不重復(fù)多個(gè)隨機(jī)數(shù)的2種方法,涉及Python基于random生成隨機(jī)數(shù)的常見操作技巧,需要的朋友可以參考下2017-04-04
如何解決flask修改靜態(tài)資源后緩存文件不能及時(shí)更改問題
在本篇內(nèi)容里小編給大家整理的是關(guān)于如何解決flask修改靜態(tài)資源后緩存文件不能及時(shí)更改問題,需要的朋友們可以學(xué)習(xí)下。2020-08-08
Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例
今天小編就為大家分享一篇Python根據(jù)歐拉角求旋轉(zhuǎn)矩陣的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2019-01-01

