使用Python編寫自己的微型Redis
building-a-simple-redis-server-with-python
前幾天我想到,寫一個(gè)簡(jiǎn)單的東西會(huì)很整潔 雷迪斯-像數(shù)據(jù)庫(kù)服務(wù)器。雖然我有很多 WSGI應(yīng)用程序的經(jīng)驗(yàn),數(shù)據(jù)庫(kù)服務(wù)器展示了一種新穎 挑戰(zhàn),并被證明是學(xué)習(xí)如何工作的不錯(cuò)的實(shí)際方法 Python中的套接字。在這篇文章中,我將分享我在此過(guò)程中學(xué)到的知識(shí)。
我項(xiàng)目的目的是 編寫一個(gè)簡(jiǎn)單的服務(wù)器 我可以用 我的任務(wù)隊(duì)列項(xiàng)目稱為 Huey。 Huey使用Redis作為默認(rèn)存儲(chǔ)引擎來(lái)跟蹤被引用的工作, 完成的工作和其他結(jié)果。就本職位而言, 我進(jìn)一步縮小了原始項(xiàng)目的范圍,以免造成混亂 使用代碼的水域,您可以很容易地自己寫,但是如果您 很好奇,你可以看看 最終結(jié)果 這里 (文件)。
我們將要構(gòu)建的服務(wù)器將能夠響應(yīng)以下命令:
- GET
<key>
- SET
<key>
<value>
- DELETE
<key>
- FLUSH
- MGET
<key1>
...<keyn>
- MSET
<key1>
<value1>
...<keyn>
<valuen>
我們還將支持以下數(shù)據(jù)類型:
- Strings and Binary Data
- Numbers
- NULL
- Arrays (which may be nested)
- Dictionaries (which may be nested)
- Error messages
為了異步處理多個(gè)客戶端,我們將使用 gevent, 但是您也可以使用標(biāo)準(zhǔn)庫(kù)的 SocketServer 模塊與 要么 ForkingMixin 或 ThreadingMixin。
骨架
讓我們?yōu)榉?wù)器設(shè)置一個(gè)框架。我們需要服務(wù)器本身,以及 新客戶端連接時(shí)要執(zhí)行的回調(diào)。另外,我們需要 某種邏輯來(lái)處理客戶端請(qǐng)求并發(fā)送響應(yīng)。
這是一個(gè)開(kāi)始:
from gevent import socket from gevent.pool import Pool from gevent.server import StreamServer from collections import namedtuple from io import BytesIO from socket import error as socket_error # We'll use exceptions to notify the connection-handling loop of problems. class CommandError(Exception): pass class Disconnect(Exception): pass Error = namedtuple('Error', ('message',)) class ProtocolHandler(object): def handle_request(self, socket_file): # Parse a request from the client into it's component parts. pass def write_response(self, socket_file, data): # Serialize the response data and send it to the client. pass class Server(object): def __init__(self, host='127.0.0.1', port=31337, max_clients=64): self._pool = Pool(max_clients) self._server = StreamServer( (host, port), self.connection_handler, spawn=self._pool) self._protocol = ProtocolHandler() self._kv = {} def connection_handler(self, conn, address): # Convert "conn" (a socket object) into a file-like object. socket_file = conn.makefile('rwb') # Process client requests until client disconnects. while True: try: data = self._protocol.handle_request(socket_file) except Disconnect: break try: resp = self.get_response(data) except CommandError as exc: resp = Error(exc.args[0]) self._protocol.write_response(socket_file, resp) def get_response(self, data): # Here we'll actually unpack the data sent by the client, execute the # command they specified, and pass back the return value. pass def run(self): self._server.serve_forever()
希望以上代碼相當(dāng)清楚。我們已經(jīng)分開(kāi)了擔(dān)憂,以便 協(xié)議處理屬于自己的類,有兩種公共方法: handle_request
和 write_response
。服務(wù)器本身使用協(xié)議 處理程序解壓縮客戶端請(qǐng)求并將服務(wù)器響應(yīng)序列化回 客戶。The get_response()
該方法將用于執(zhí)行命令 由客戶發(fā)起。
仔細(xì)查看代碼 connection_handler()
方法,你可以 看到我們?cè)谔捉幼謱?duì)象周圍獲得了類似文件的包裝紙。這個(gè)包裝器 讓我們抽象一些 怪癖 通常會(huì)遇到使用原始插座的情況。函數(shù)輸入 無(wú)窮循環(huán),讀取客戶端的請(qǐng)求,發(fā)送響應(yīng),最后 客戶端斷開(kāi)連接時(shí)退出循環(huán)(由 read()
返回 一個(gè)空字符串)。
我們使用鍵入的異常來(lái)處理客戶端斷開(kāi)連接并通知用戶 錯(cuò)誤處理命令。例如,如果用戶做錯(cuò)了 對(duì)服務(wù)器的格式化請(qǐng)求,我們將提出一個(gè) CommandError
, 哪個(gè)是 序列化為錯(cuò)誤響應(yīng)并發(fā)送給客戶端。
在繼續(xù)之前,讓我們討論客戶端和服務(wù)器將如何通信。
線程
我面臨的第一個(gè)挑戰(zhàn)是如何處理通過(guò) 線。我在網(wǎng)上找到的大多數(shù)示例都是毫無(wú)意義的回聲服務(wù)器,它們進(jìn)行了轉(zhuǎn)換 套接字到類似文件的對(duì)象,并且剛剛調(diào)用 readline()
。如果我想 用新線存儲(chǔ)一些腌制的數(shù)據(jù)或字符串,我需要一些 一種序列化格式。
在浪費(fèi)時(shí)間嘗試發(fā)明合適的東西之后,我決定閱讀 有關(guān)文檔 Redis協(xié)議, 其中 事實(shí)證明實(shí)施起來(lái)非常簡(jiǎn)單,并且具有 支持幾種不同的數(shù)據(jù)類型。
Redis協(xié)議使用請(qǐng)求/響應(yīng)通信模式與 客戶。來(lái)自服務(wù)器的響應(yīng)將使用第一個(gè)字節(jié)來(lái)指示 數(shù)據(jù)類型,然后是數(shù)據(jù),以回車/線路進(jìn)給終止。
讓我們填寫協(xié)議處理程序的類,使其實(shí)現(xiàn)Redis 協(xié)議。
class ProtocolHandler(object): def __init__(self): self.handlers = { '+': self.handle_simple_string, '-': self.handle_error, ':': self.handle_integer, '$': self.handle_string, '*': self.handle_array, '%': self.handle_dict} def handle_request(self, socket_file): first_byte = socket_file.read(1) if not first_byte: raise Disconnect() try: # Delegate to the appropriate handler based on the first byte. return self.handlers[first_byte](socket_file) except KeyError: raise CommandError('bad request') def handle_simple_string(self, socket_file): return socket_file.readline().rstrip('\r\n') def handle_error(self, socket_file): return Error(socket_file.readline().rstrip('\r\n')) def handle_integer(self, socket_file): return int(socket_file.readline().rstrip('\r\n')) def handle_string(self, socket_file): # First read the length ($<length>\r\n). length = int(socket_file.readline().rstrip('\r\n')) if length == -1: return None # Special-case for NULLs. length += 2 # Include the trailing \r\n in count. return socket_file.read(length)[:-2] def handle_array(self, socket_file): num_elements = int(socket_file.readline().rstrip('\r\n')) return [self.handle_request(socket_file) for _ in range(num_elements)] def handle_dict(self, socket_file): num_items = int(socket_file.readline().rstrip('\r\n')) elements = [self.handle_request(socket_file) for _ in range(num_items * 2)] return dict(zip(elements[::2], elements[1::2]))
對(duì)于協(xié)議的序列化方面,我們將執(zhí)行與上述相反的操作: 將Python對(duì)象轉(zhuǎn)換為序列化的對(duì)象!
class ProtocolHandler(object): # ... above methods omitted ... def write_response(self, socket_file, data): buf = BytesIO() self._write(buf, data) buf.seek(0) socket_file.write(buf.getvalue()) socket_file.flush() def _write(self, buf, data): if isinstance(data, str): data = data.encode('utf-8') if isinstance(data, bytes): buf.write('$%s\r\n%s\r\n' % (len(data), data)) elif isinstance(data, int): buf.write(':%s\r\n' % data) elif isinstance(data, Error): buf.write('-%s\r\n' % error.message) elif isinstance(data, (list, tuple)): buf.write('*%s\r\n' % len(data)) for item in data: self._write(buf, item) elif isinstance(data, dict): buf.write('%%%s\r\n' % len(data)) for key in data: self._write(buf, key) self._write(buf, data[key]) elif data is None: buf.write('$-1\r\n') else: raise CommandError('unrecognized type: %s' % type(data))
將協(xié)議處理保持在其自己的類中的另一個(gè)好處是 我們可以重復(fù)使用 handle_request
和 write_response
建立方法 客戶端庫(kù)。
執(zhí)行命令
類Server
我們模擬的課程現(xiàn)在需要 get_response()
方法 已實(shí)施。命令將假定由客戶端以簡(jiǎn)單方式發(fā)送 字符串或命令參數(shù)數(shù)組,因此 data
傳遞給 get_response()
將是字節(jié)或列表。為了簡(jiǎn)化處理,如果 data
這是一個(gè)簡(jiǎn)單的字符串,我們將通過(guò)拆分將其轉(zhuǎn)換為列表 空格。
第一個(gè)參數(shù)將是命令名稱,并帶有任何其他參數(shù) 屬于指定命令。就像我們對(duì)第一個(gè)的映射一樣 字節(jié)給處理者 ProtocolHandler
, 讓我們創(chuàng)建一個(gè)的映射 命令回調(diào) Server
:
class Server(object): def __init__(self, host='127.0.0.1', port=31337, max_clients=64): self._pool = Pool(max_clients) self._server = StreamServer( (host, port), self.connection_handler, spawn=self._pool) self._protocol = ProtocolHandler() self._kv = {} self._commands = self.get_commands() def get_commands(self): return { 'GET': self.get, 'SET': self.set, 'DELETE': self.delete, 'FLUSH': self.flush, 'MGET': self.mget, 'MSET': self.mset} def get_response(self, data): if not isinstance(data, list): try: data = data.split() except: raise CommandError('Request must be list or simple string.') if not data: raise CommandError('Missing command') command = data[0].upper() if command not in self._commands: raise CommandError('Unrecognized command: %s' % command) return self._commands[command](*data[1:])
我們的服務(wù)器快完成了! 我們只需要執(zhí)行六個(gè)命令 在 get_commands()
方法:
class Server(object): def get(self, key): return self._kv.get(key) def set(self, key, value): self._kv[key] = value return 1 def delete(self, key): if key in self._kv: del self._kv[key] return 1 return 0 def flush(self): kvlen = len(self._kv) self._kv.clear() return kvlen def mget(self, *keys): return [self._kv.get(key) for key in keys] def mset(self, *items): data = zip(items[::2], items[1::2]) for key, value in data: self._kv[key] = value return len(data)
而已! 我們的服務(wù)器現(xiàn)在可以開(kāi)始處理請(qǐng)求了。在下一個(gè) 本節(jié),我們將實(shí)現(xiàn)一個(gè)客戶端與服務(wù)器進(jìn)行交互。
客戶端
要與服務(wù)器交互,讓我們重新使用 ProtocolHandler
類到 實(shí)現(xiàn)一個(gè)簡(jiǎn)單的客戶端。客戶端將連接到服務(wù)器并發(fā)送 命令編碼為列表。我們將同時(shí)使用 write_response()
和 handle_request()
編碼請(qǐng)求和處理服務(wù)器響應(yīng)的邏輯 分別。
class Client(object): def __init__(self, host='127.0.0.1', port=31337): self._protocol = ProtocolHandler() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((host, port)) self._fh = self._socket.makefile('rwb') def execute(self, *args): self._protocol.write_response(self._fh, args) resp = self._protocol.handle_request(self._fh) if isinstance(resp, Error): raise CommandError(resp.message) return resp
與 execute()
方法上,我們可以傳遞任意參數(shù)列表,這些參數(shù)將被編碼為數(shù)組并發(fā)送到服務(wù)器。來(lái)自服務(wù)器的響應(yīng)被解析并作為Python對(duì)象返回。為了方便起見(jiàn),我們可以為各個(gè)命令編寫客戶端方法:
class Client(object): # ... def get(self, key): return self.execute('GET', key) def set(self, key, value): return self.execute('SET', key, value) def delete(self, key): return self.execute('DELETE', key) def flush(self): return self.execute('FLUSH') def mget(self, *keys): return self.execute('MGET', *keys) def mset(self, *items): return self.execute('MSET', *items)
為了測(cè)試我們的客戶端,讓我們配置Python腳本以啟動(dòng)服務(wù)器 直接從命令行執(zhí)行時(shí):
測(cè)試服務(wù)器
要測(cè)試服務(wù)器,只需從命令行執(zhí)行服務(wù)器的Python模塊即可。在另一個(gè)終端中,打開(kāi)Python解釋器并導(dǎo)入 Client
來(lái)自服務(wù)器模塊的類。安裝客戶端將打開(kāi)連接,您可以開(kāi)始運(yùn)行命令!
>>> from server_ex import Client >>> client = Client() >>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3') 3 >>> client.get('k2') ['v2-0', 1, 'v2-2'] >>> client.mget('k3', 'k1') ['v3', 'v1'] >>> client.delete('k1') 1 >>> client.get('k1') >>> client.delete('k1') 0 >>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}}) 1 >>> client.get('kx') {'vx': {'vy': 0, 'vz': [1, 2, 3]}} >>> client.flush() 2
完整代碼
from gevent import socket from gevent.pool import Pool from gevent.server import StreamServer from collections import namedtuple from io import BytesIO from socket import error as socket_error import logging logger = logging.getLogger(__name__) class CommandError(Exception): pass class Disconnect(Exception): pass Error = namedtuple('Error', ('message',)) class ProtocolHandler(object): def __init__(self): self.handlers = { '+': self.handle_simple_string, '-': self.handle_error, ':': self.handle_integer, '$': self.handle_string, '*': self.handle_array, '%': self.handle_dict} def handle_request(self, socket_file): first_byte = socket_file.read(1) if not first_byte: raise Disconnect() try: # Delegate to the appropriate handler based on the first byte. return self.handlers[first_byte](socket_file) except KeyError: raise CommandError('bad request') def handle_simple_string(self, socket_file): return socket_file.readline().rstrip('\r\n') def handle_error(self, socket_file): return Error(socket_file.readline().rstrip('\r\n')) def handle_integer(self, socket_file): return int(socket_file.readline().rstrip('\r\n')) def handle_string(self, socket_file): # First read the length ($<length>\r\n). length = int(socket_file.readline().rstrip('\r\n')) if length == -1: return None # Special-case for NULLs. length += 2 # Include the trailing \r\n in count. return socket_file.read(length)[:-2] def handle_array(self, socket_file): num_elements = int(socket_file.readline().rstrip('\r\n')) return [self.handle_request(socket_file) for _ in range(num_elements)] def handle_dict(self, socket_file): num_items = int(socket_file.readline().rstrip('\r\n')) elements = [self.handle_request(socket_file) for _ in range(num_items * 2)] return dict(zip(elements[::2], elements[1::2])) def write_response(self, socket_file, data): buf = BytesIO() self._write(buf, data) buf.seek(0) socket_file.write(buf.getvalue()) socket_file.flush() def _write(self, buf, data): if isinstance(data, str): data = data.encode('utf-8') if isinstance(data, bytes): buf.write('$%s\r\n%s\r\n' % (len(data), data)) elif isinstance(data, int): buf.write(':%s\r\n' % data) elif isinstance(data, Error): buf.write('-%s\r\n' % error.message) elif isinstance(data, (list, tuple)): buf.write('*%s\r\n' % len(data)) for item in data: self._write(buf, item) elif isinstance(data, dict): buf.write('%%%s\r\n' % len(data)) for key in data: self._write(buf, key) self._write(buf, data[key]) elif data is None: buf.write('$-1\r\n') else: raise CommandError('unrecognized type: %s' % type(data)) class Server(object): def __init__(self, host='127.0.0.1', port=31337, max_clients=64): self._pool = Pool(max_clients) self._server = StreamServer( (host, port), self.connection_handler, spawn=self._pool) self._protocol = ProtocolHandler() self._kv = {} self._commands = self.get_commands() def get_commands(self): return { 'GET': self.get, 'SET': self.set, 'DELETE': self.delete, 'FLUSH': self.flush, 'MGET': self.mget, 'MSET': self.mset} def connection_handler(self, conn, address): logger.info('Connection received: %s:%s' % address) # Convert "conn" (a socket object) into a file-like object. socket_file = conn.makefile('rwb') # Process client requests until client disconnects. while True: try: data = self._protocol.handle_request(socket_file) except Disconnect: logger.info('Client went away: %s:%s' % address) break try: resp = self.get_response(data) except CommandError as exc: logger.exception('Command error') resp = Error(exc.args[0]) self._protocol.write_response(socket_file, resp) def run(self): self._server.serve_forever() def get_response(self, data): if not isinstance(data, list): try: data = data.split() except: raise CommandError('Request must be list or simple string.') if not data: raise CommandError('Missing command') command = data[0].upper() if command not in self._commands: raise CommandError('Unrecognized command: %s' % command) else: logger.debug('Received %s', command) return self._commands[command](*data[1:]) def get(self, key): return self._kv.get(key) def set(self, key, value): self._kv[key] = value return 1 def delete(self, key): if key in self._kv: del self._kv[key] return 1 return 0 def flush(self): kvlen = len(self._kv) self._kv.clear() return kvlen def mget(self, *keys): return [self._kv.get(key) for key in keys] def mset(self, *items): data = zip(items[::2], items[1::2]) for key, value in data: self._kv[key] = value return len(data) class Client(object): def __init__(self, host='127.0.0.1', port=31337): self._protocol = ProtocolHandler() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((host, port)) self._fh = self._socket.makefile('rwb') def execute(self, *args): self._protocol.write_response(self._fh, args) resp = self._protocol.handle_request(self._fh) if isinstance(resp, Error): raise CommandError(resp.message) return resp def get(self, key): return self.execute('GET', key) def set(self, key, value): return self.execute('SET', key, value) def delete(self, key): return self.execute('DELETE', key) def flush(self): return self.execute('FLUSH') def mget(self, *keys): return self.execute('MGET', *keys) def mset(self, *items): return self.execute('MSET', *items) if __name__ == '__main__': from gevent import monkey; monkey.patch_all() logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.DEBUG) Server().run()
以上就是使用Python編寫自己的微型Redis的詳細(xì)內(nèi)容,更多關(guān)于Python編寫微型Redis的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Tensorflow簡(jiǎn)單驗(yàn)證碼識(shí)別應(yīng)用
這篇文章主要為大家詳細(xì)介紹了Tensorflow簡(jiǎn)單驗(yàn)證碼識(shí)別應(yīng)用的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05Django如何實(shí)現(xiàn)RBAC權(quán)限管理
這篇文章主要介紹了Django如何實(shí)現(xiàn)RBAC權(quán)限管理問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12詳解OpenCV自適應(yīng)直方圖均衡化的應(yīng)用
在本文中,將介紹如何應(yīng)用對(duì)比度受限的自適應(yīng)直方圖均衡化 ( Contrast Limited Adaptive Histogram Equalization, CLAHE ) 來(lái)均衡圖像,需要的可以參考一下2022-02-02Python使用asyncio標(biāo)準(zhǔn)庫(kù)對(duì)異步IO的支持
Python中,所有程序的執(zhí)行都是單線程的,但可同時(shí)執(zhí)行多個(gè)任務(wù),不同的任務(wù)被時(shí)間循環(huán)(Event Loop)控制及調(diào)度,Asyncio是Python并發(fā)編程的一種實(shí)現(xiàn)方式;是Python 3.4版本引入的標(biāo)準(zhǔn)庫(kù),直接內(nèi)置了對(duì)異步IO的支持2023-11-11Python3利用Dlib19.7實(shí)現(xiàn)攝像頭人臉識(shí)別的方法
這篇文章主要介紹了Python 3 利用 Dlib 19.7 實(shí)現(xiàn)攝像頭人臉識(shí)別 ,利用python開(kāi)發(fā),借助Dlib庫(kù)捕獲攝像頭中的人臉,提取人臉特征,通過(guò)計(jì)算歐氏距離來(lái)和預(yù)存的人臉特征進(jìn)行對(duì)比,達(dá)到人臉識(shí)別的目的,感興趣的小伙伴們可以參考一下2018-05-05Python匿名函數(shù)/排序函數(shù)/過(guò)濾函數(shù)/映射函數(shù)/遞歸/二分法
這篇文章主要介紹了Python匿名函數(shù)/排序函數(shù)/過(guò)濾函數(shù)/映射函數(shù)/遞歸/二分法 ,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-06-06Django之Mode的外鍵自關(guān)聯(lián)和引用未定義的Model方法
今天小編就為大家分享一篇Django之Mode的外鍵自關(guān)聯(lián)和引用未定義的Model方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-12-12Python的Flask框架中實(shí)現(xiàn)分頁(yè)功能的教程
這篇文章主要介紹了Python的Flask框架中實(shí)現(xiàn)分頁(yè)功能的教程,文中的示例基于一個(gè)博客來(lái)實(shí)現(xiàn),需要的朋友可以參考下2015-04-04