欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用Python編寫自己的微型Redis

 更新時間:2024年03月12日 14:15:59   作者:Python魔法師  
這篇文章主要為大家詳細介紹了使用Python編寫自己的微型Redis的相關知識,文中的示例代碼講解詳細,具有一定的學習價值,感興趣的小伙伴可以了解一下

building-a-simple-redis-server-with-python

前幾天我想到,寫一個簡單的東西會很整潔 雷迪斯-像數(shù)據(jù)庫服務器。雖然我有很多 WSGI應用程序的經(jīng)驗,數(shù)據(jù)庫服務器展示了一種新穎 挑戰(zhàn),并被證明是學習如何工作的不錯的實際方法 Python中的套接字。在這篇文章中,我將分享我在此過程中學到的知識。

我項目的目的是 編寫一個簡單的服務器 我可以用 我的任務隊列項目稱為 Huey。 Huey使用Redis作為默認存儲引擎來跟蹤被引用的工作, 完成的工作和其他結果。就本職位而言, 我進一步縮小了原始項目的范圍,以免造成混亂 使用代碼的水域,您可以很容易地自己寫,但是如果您 很好奇,你可以看看 最終結果 這里 (文件)。

我們將要構建的服務器將能夠響應以下命令:

  • GET <key>
  • SET <key> <value>
  • DELETE <key>
  • FLUSH
  • MGET <key1> ... <keyn>
  • MSET <key1> <value1> ... <keyn> <valuen>

我們還將支持以下數(shù)據(jù)類型:

  • Strings and Binary Data
  • Numbers
  • NULL
  • Arrays (which may be nested)
  • Dictionaries (which may be nested)
  • Error messages

為了異步處理多個客戶端,我們將使用 gevent, 但是您也可以使用標準庫的 SocketServer 模塊與 要么 ForkingMixin 或 ThreadingMixin

骨架

讓我們?yōu)榉掌髟O置一個框架。我們需要服務器本身,以及 新客戶端連接時要執(zhí)行的回調(diào)。另外,我們需要 某種邏輯來處理客戶端請求并發(fā)送響應。

這是一個開始:

from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error


# We'll use exceptions to notify the connection-handling loop of problems.
class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))


class ProtocolHandler(object):
    def handle_request(self, socket_file):
        # Parse a request from the client into it's component parts.
        pass

    def write_response(self, socket_file, data):
        # Serialize the response data and send it to the client.
        pass


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

    def connection_handler(self, conn, address):
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def get_response(self, data):
        # Here we'll actually unpack the data sent by the client, execute the
        # command they specified, and pass back the return value.
        pass

    def run(self):
        self._server.serve_forever()

希望以上代碼相當清楚。我們已經(jīng)分開了擔憂,以便 協(xié)議處理屬于自己的類,有兩種公共方法: handle_request 和 write_response。服務器本身使用協(xié)議 處理程序解壓縮客戶端請求并將服務器響應序列化回 客戶。The get_response() 該方法將用于執(zhí)行命令 由客戶發(fā)起。

仔細查看代碼 connection_handler() 方法,你可以 看到我們在套接字對象周圍獲得了類似文件的包裝紙。這個包裝器 讓我們抽象一些 怪癖 通常會遇到使用原始插座的情況。函數(shù)輸入 無窮循環(huán),讀取客戶端的請求,發(fā)送響應,最后 客戶端斷開連接時退出循環(huán)(由 read() 返回 一個空字符串)。

我們使用鍵入的異常來處理客戶端斷開連接并通知用戶 錯誤處理命令。例如,如果用戶做錯了 對服務器的格式化請求,我們將提出一個 CommandError, 哪個是 序列化為錯誤響應并發(fā)送給客戶端。

在繼續(xù)之前,讓我們討論客戶端和服務器將如何通信。

線程

我面臨的第一個挑戰(zhàn)是如何處理通過 線。我在網(wǎng)上找到的大多數(shù)示例都是毫無意義的回聲服務器,它們進行了轉(zhuǎn)換 套接字到類似文件的對象,并且剛剛調(diào)用 readline()。如果我想 用新線存儲一些腌制的數(shù)據(jù)或字符串,我需要一些 一種序列化格式。

在浪費時間嘗試發(fā)明合適的東西之后,我決定閱讀 有關文檔 Redis協(xié)議, 其中 事實證明實施起來非常簡單,并且具有 支持幾種不同的數(shù)據(jù)類型。

Redis協(xié)議使用請求/響應通信模式與 客戶。來自服務器的響應將使用第一個字節(jié)來指示 數(shù)據(jù)類型,然后是數(shù)據(jù),以回車/線路進給終止。

讓我們填寫協(xié)議處理程序的類,使其實現(xiàn)Redis 協(xié)議。

class ProtocolHandler(object):
    def __init__(self):
        self.handlers = {
            '+': self.handle_simple_string,
            '-': self.handle_error,
            ':': self.handle_integer,
            '$': self.handle_string,
            '*': self.handle_array,
            '%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        try:
            # Delegate to the appropriate handler based on the first byte.
            return self.handlers[first_byte](socket_file)
        except KeyError:
            raise CommandError('bad request')

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip('\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip('\r\n'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip('\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip('\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip('\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]

    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip('\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))

對于協(xié)議的序列化方面,我們將執(zhí)行與上述相反的操作: 將Python對象轉(zhuǎn)換為序列化的對象!

class ProtocolHandler(object):
    # ... above methods omitted ...
    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        buf.seek(0)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write('$%s\r\n%s\r\n' % (len(data), data))
        elif isinstance(data, int):
            buf.write(':%s\r\n' % data)
        elif isinstance(data, Error):
            buf.write('-%s\r\n' % error.message)
        elif isinstance(data, (list, tuple)):
            buf.write('*%s\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write('%%%s\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write('$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))

將協(xié)議處理保持在其自己的類中的另一個好處是 我們可以重復使用 handle_request 和 write_response 建立方法 客戶端庫。

執(zhí)行命令

Server 我們模擬的課程現(xiàn)在需要 get_response() 方法 已實施。命令將假定由客戶端以簡單方式發(fā)送 字符串或命令參數(shù)數(shù)組,因此 data 傳遞給 get_response() 將是字節(jié)或列表。為了簡化處理,如果 data 這是一個簡單的字符串,我們將通過拆分將其轉(zhuǎn)換為列表 空格。

第一個參數(shù)將是命令名稱,并帶有任何其他參數(shù) 屬于指定命令。就像我們對第一個的映射一樣 字節(jié)給處理者 ProtocolHandler, 讓我們創(chuàng)建一個的映射 命令回調(diào) Server:

class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0].upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)

        return self._commands[command](*data[1:])

我們的服務器快完成了! 我們只需要執(zhí)行六個命令 在 get_commands() 方法:

class Server(object):
    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        data = zip(items[::2], items[1::2])
        for key, value in data:
            self._kv[key] = value
        return len(data)

而已! 我們的服務器現(xiàn)在可以開始處理請求了。在下一個 本節(jié),我們將實現(xiàn)一個客戶端與服務器進行交互。

客戶端

要與服務器交互,讓我們重新使用 ProtocolHandler 類到 實現(xiàn)一個簡單的客戶端??蛻舳藢⑦B接到服務器并發(fā)送 命令編碼為列表。我們將同時使用 write_response() 和 handle_request() 編碼請求和處理服務器響應的邏輯 分別。

class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp

與 execute() 方法上,我們可以傳遞任意參數(shù)列表,這些參數(shù)將被編碼為數(shù)組并發(fā)送到服務器。來自服務器的響應被解析并作為Python對象返回。為了方便起見,我們可以為各個命令編寫客戶端方法:

class Client(object):
    # ...
    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)

為了測試我們的客戶端,讓我們配置Python腳本以啟動服務器 直接從命令行執(zhí)行時:

測試服務器

要測試服務器,只需從命令行執(zhí)行服務器的Python模塊即可。在另一個終端中,打開Python解釋器并導入 Client 來自服務器模塊的類。安裝客戶端將打開連接,您可以開始運行命令!

>>> from server_ex import Client
>>> client = Client()
>>> client.mset('k1', 'v1', 'k2', ['v2-0', 1, 'v2-2'], 'k3', 'v3')
3
>>> client.get('k2')
['v2-0', 1, 'v2-2']
>>> client.mget('k3', 'k1')
['v3', 'v1']
>>> client.delete('k1')
1
>>> client.get('k1')
>>> client.delete('k1')
0
>>> client.set('kx', {'vx': {'vy': 0, 'vz': [1, 2, 3]}})
1
>>> client.get('kx')
{'vx': {'vy': 0, 'vz': [1, 2, 3]}}
>>> client.flush()
2

完整代碼

from gevent import socket
from gevent.pool import Pool
from gevent.server import StreamServer

from collections import namedtuple
from io import BytesIO
from socket import error as socket_error
import logging


logger = logging.getLogger(__name__)


class CommandError(Exception): pass
class Disconnect(Exception): pass

Error = namedtuple('Error', ('message',))


class ProtocolHandler(object):
    def __init__(self):
        self.handlers = {
            '+': self.handle_simple_string,
            '-': self.handle_error,
            ':': self.handle_integer,
            '$': self.handle_string,
            '*': self.handle_array,
            '%': self.handle_dict}

    def handle_request(self, socket_file):
        first_byte = socket_file.read(1)
        if not first_byte:
            raise Disconnect()

        try:
            # Delegate to the appropriate handler based on the first byte.
            return self.handlers[first_byte](socket_file)
        except KeyError:
            raise CommandError('bad request')

    def handle_simple_string(self, socket_file):
        return socket_file.readline().rstrip('\r\n')

    def handle_error(self, socket_file):
        return Error(socket_file.readline().rstrip('\r\n'))

    def handle_integer(self, socket_file):
        return int(socket_file.readline().rstrip('\r\n'))

    def handle_string(self, socket_file):
        # First read the length ($<length>\r\n).
        length = int(socket_file.readline().rstrip('\r\n'))
        if length == -1:
            return None  # Special-case for NULLs.
        length += 2  # Include the trailing \r\n in count.
        return socket_file.read(length)[:-2]

    def handle_array(self, socket_file):
        num_elements = int(socket_file.readline().rstrip('\r\n'))
        return [self.handle_request(socket_file) for _ in range(num_elements)]
    
    def handle_dict(self, socket_file):
        num_items = int(socket_file.readline().rstrip('\r\n'))
        elements = [self.handle_request(socket_file)
                    for _ in range(num_items * 2)]
        return dict(zip(elements[::2], elements[1::2]))

    def write_response(self, socket_file, data):
        buf = BytesIO()
        self._write(buf, data)
        buf.seek(0)
        socket_file.write(buf.getvalue())
        socket_file.flush()

    def _write(self, buf, data):
        if isinstance(data, str):
            data = data.encode('utf-8')

        if isinstance(data, bytes):
            buf.write('$%s\r\n%s\r\n' % (len(data), data))
        elif isinstance(data, int):
            buf.write(':%s\r\n' % data)
        elif isinstance(data, Error):
            buf.write('-%s\r\n' % error.message)
        elif isinstance(data, (list, tuple)):
            buf.write('*%s\r\n' % len(data))
            for item in data:
                self._write(buf, item)
        elif isinstance(data, dict):
            buf.write('%%%s\r\n' % len(data))
            for key in data:
                self._write(buf, key)
                self._write(buf, data[key])
        elif data is None:
            buf.write('$-1\r\n')
        else:
            raise CommandError('unrecognized type: %s' % type(data))


class Server(object):
    def __init__(self, host='127.0.0.1', port=31337, max_clients=64):
        self._pool = Pool(max_clients)
        self._server = StreamServer(
            (host, port),
            self.connection_handler,
            spawn=self._pool)

        self._protocol = ProtocolHandler()
        self._kv = {}

        self._commands = self.get_commands()

    def get_commands(self):
        return {
            'GET': self.get,
            'SET': self.set,
            'DELETE': self.delete,
            'FLUSH': self.flush,
            'MGET': self.mget,
            'MSET': self.mset}

    def connection_handler(self, conn, address):
        logger.info('Connection received: %s:%s' % address)
        # Convert "conn" (a socket object) into a file-like object.
        socket_file = conn.makefile('rwb')

        # Process client requests until client disconnects.
        while True:
            try:
                data = self._protocol.handle_request(socket_file)
            except Disconnect:
                logger.info('Client went away: %s:%s' % address)
                break

            try:
                resp = self.get_response(data)
            except CommandError as exc:
                logger.exception('Command error')
                resp = Error(exc.args[0])

            self._protocol.write_response(socket_file, resp)

    def run(self):
        self._server.serve_forever()

    def get_response(self, data):
        if not isinstance(data, list):
            try:
                data = data.split()
            except:
                raise CommandError('Request must be list or simple string.')

        if not data:
            raise CommandError('Missing command')

        command = data[0].upper()
        if command not in self._commands:
            raise CommandError('Unrecognized command: %s' % command)
        else:
            logger.debug('Received %s', command)

        return self._commands[command](*data[1:])

    def get(self, key):
        return self._kv.get(key)

    def set(self, key, value):
        self._kv[key] = value
        return 1

    def delete(self, key):
        if key in self._kv:
            del self._kv[key]
            return 1
        return 0

    def flush(self):
        kvlen = len(self._kv)
        self._kv.clear()
        return kvlen

    def mget(self, *keys):
        return [self._kv.get(key) for key in keys]

    def mset(self, *items):
        data = zip(items[::2], items[1::2])
        for key, value in data:
            self._kv[key] = value
        return len(data)


class Client(object):
    def __init__(self, host='127.0.0.1', port=31337):
        self._protocol = ProtocolHandler()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._socket.connect((host, port))
        self._fh = self._socket.makefile('rwb')

    def execute(self, *args):
        self._protocol.write_response(self._fh, args)
        resp = self._protocol.handle_request(self._fh)
        if isinstance(resp, Error):
            raise CommandError(resp.message)
        return resp

    def get(self, key):
        return self.execute('GET', key)

    def set(self, key, value):
        return self.execute('SET', key, value)

    def delete(self, key):
        return self.execute('DELETE', key)

    def flush(self):
        return self.execute('FLUSH')

    def mget(self, *keys):
        return self.execute('MGET', *keys)

    def mset(self, *items):
        return self.execute('MSET', *items)


if __name__ == '__main__':
    from gevent import monkey; monkey.patch_all()
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.DEBUG)
    Server().run()

以上就是使用Python編寫自己的微型Redis的詳細內(nèi)容,更多關于Python編寫微型Redis的資料請關注腳本之家其它相關文章!

相關文章

  • Tensorflow簡單驗證碼識別應用

    Tensorflow簡單驗證碼識別應用

    這篇文章主要為大家詳細介紹了Tensorflow簡單驗證碼識別應用的相關資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-05-05
  • Django如何實現(xiàn)RBAC權限管理

    Django如何實現(xiàn)RBAC權限管理

    這篇文章主要介紹了Django如何實現(xiàn)RBAC權限管理問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-12-12
  • 使用Python為中秋節(jié)繪制一塊美味的月餅

    使用Python為中秋節(jié)繪制一塊美味的月餅

    這篇文章主要介紹了使用Python為中秋節(jié)繪制一塊美味的月餅,,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-09-09
  • 如何使用python?docx模塊操作word文檔

    如何使用python?docx模塊操作word文檔

    這篇文章主要介紹了如何使用python?docx模塊操作word文檔,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下
    2022-09-09
  • 詳解OpenCV自適應直方圖均衡化的應用

    詳解OpenCV自適應直方圖均衡化的應用

    在本文中,將介紹如何應用對比度受限的自適應直方圖均衡化 ( Contrast Limited Adaptive Histogram Equalization, CLAHE ) 來均衡圖像,需要的可以參考一下
    2022-02-02
  • Python使用asyncio標準庫對異步IO的支持

    Python使用asyncio標準庫對異步IO的支持

    Python中,所有程序的執(zhí)行都是單線程的,但可同時執(zhí)行多個任務,不同的任務被時間循環(huán)(Event Loop)控制及調(diào)度,Asyncio是Python并發(fā)編程的一種實現(xiàn)方式;是Python 3.4版本引入的標準庫,直接內(nèi)置了對異步IO的支持
    2023-11-11
  • Python3利用Dlib19.7實現(xiàn)攝像頭人臉識別的方法

    Python3利用Dlib19.7實現(xiàn)攝像頭人臉識別的方法

    這篇文章主要介紹了Python 3 利用 Dlib 19.7 實現(xiàn)攝像頭人臉識別 ,利用python開發(fā),借助Dlib庫捕獲攝像頭中的人臉,提取人臉特征,通過計算歐氏距離來和預存的人臉特征進行對比,達到人臉識別的目的,感興趣的小伙伴們可以參考一下
    2018-05-05
  • Python匿名函數(shù)/排序函數(shù)/過濾函數(shù)/映射函數(shù)/遞歸/二分法

    Python匿名函數(shù)/排序函數(shù)/過濾函數(shù)/映射函數(shù)/遞歸/二分法

    這篇文章主要介紹了Python匿名函數(shù)/排序函數(shù)/過濾函數(shù)/映射函數(shù)/遞歸/二分法 ,本文通過實例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-06-06
  • Django之Mode的外鍵自關聯(lián)和引用未定義的Model方法

    Django之Mode的外鍵自關聯(lián)和引用未定義的Model方法

    今天小編就為大家分享一篇Django之Mode的外鍵自關聯(lián)和引用未定義的Model方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-12-12
  • Python的Flask框架中實現(xiàn)分頁功能的教程

    Python的Flask框架中實現(xiàn)分頁功能的教程

    這篇文章主要介紹了Python的Flask框架中實現(xiàn)分頁功能的教程,文中的示例基于一個博客來實現(xiàn),需要的朋友可以參考下
    2015-04-04

最新評論