Python與Redis的連接教程
今天在寫zabbix storm job監(jiān)控腳本的時(shí)候用到了python的redis模塊,之前也有用過(guò),但是沒有過(guò)多的了解,今天看了下相關(guān)的api和源碼,看到有ConnectionPool的實(shí)現(xiàn),這里簡(jiǎn)單說(shuō)下。
在ConnectionPool之前,如果需要連接redis,我都是用StrictRedis這個(gè)類,在源碼中可以看到這個(gè)類的具體解釋:
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool這個(gè)類之后,可以使用如下方法
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
這里Redis是StrictRedis的子類
簡(jiǎn)單分析如下:
在StrictRedis類的__init__方法中,可以初始化connection_pool這個(gè)參數(shù),其對(duì)應(yīng)的是一個(gè)ConnectionPool的對(duì)象:
class StrictRedis(object): ........ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): if not connection_pool: .......... connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool
在StrictRedis的實(shí)例執(zhí)行具體的命令時(shí)會(huì)調(diào)用execute_command方法,這里可以看到具體實(shí)現(xiàn)是從連接池中獲取一個(gè)具體的連接,然后執(zhí)行命令,完成后釋放連接:
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) #調(diào)用ConnectionPool.get_connection方法獲取一個(gè)連接 try: connection.send_command(*args) #命令執(zhí)行,這里為Connection.send_command return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection) #調(diào)用ConnectionPool.release釋放連接
在來(lái)看看ConnectionPool類:
class ConnectionPool(object): ........... def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): #類初始化時(shí)調(diào)用構(gòu)造函數(shù) max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: #判斷輸入的max_connections是否合法 raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class #設(shè)置對(duì)應(yīng)的參數(shù) self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() #初始化ConnectionPool 時(shí)的reset操作 def reset(self): self.pid = os.getpid() self._created_connections = 0 #已經(jīng)創(chuàng)建的連接的計(jì)數(shù)器 self._available_connections = [] #聲明一個(gè)空的數(shù)組,用來(lái)存放可用的連接 self._in_use_connections = set() #聲明一個(gè)空的集合,用來(lái)存放已經(jīng)在用的連接 self._check_lock = threading.Lock() ....... def get_connection(self, command_name, *keys, **options): #在連接池中獲取連接的方法 "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() #獲取并刪除代表連接的元素,在第一次獲取connectiong時(shí),因?yàn)開available_connections是一個(gè)空的數(shù)組, 會(huì)直接調(diào)用make_connection方法 except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) #向代表正在使用的連接的集合中添加元素 return connection def make_connection(self): #在_available_connections數(shù)組為空時(shí)獲取連接調(diào)用的方法 "Create a new connection" if self._created_connections >= self.max_connections: #判斷創(chuàng)建的連接是否已經(jīng)達(dá)到最大限制,max_connections可以通過(guò)參數(shù)初始化 raise ConnectionError("Too many connections") self._created_connections += 1 #把代表已經(jīng)創(chuàng)建的連接的數(shù)值+1 return self.connection_class(**self.connection_kwargs) #返回有效的連接,默認(rèn)為Connection(**self.connection_kwargs) def release(self, connection): #釋放連接,鏈接并沒有斷開,只是存在鏈接池中 "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) #從集合中刪除元素 self._available_connections.append(connection) #并添加到_available_connections 的數(shù)組中 def disconnect(self): #斷開所有連接池中的鏈接 "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
execute_command最終調(diào)用的是Connection.send_command方法,關(guān)閉鏈接為 Connection.disconnect方法,而Connection類的實(shí)現(xiàn):
class Connection(object): "Manages TCP communication to and from a Redis server" def __del__(self): #對(duì)象刪除時(shí)的操作,調(diào)用disconnect釋放連接 try: self.disconnect() except Exception: pass
核心的鏈接建立方法是通過(guò)socket模塊實(shí)現(xiàn):
def _connect(self): err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: #構(gòu)造函數(shù)中默認(rèn) socket_keepalive=False,因此這里默認(rèn)為短連接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) #構(gòu)造函數(shù)中默認(rèn)socket_connect_timeout=None,即連接為blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) #構(gòu)造函數(shù)中默認(rèn)socket_timeout=None return sock except socket.error as _: err = _ if sock is not None: sock.close() .....
關(guān)閉鏈接的方法:
def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self._sock.close() except socket.error: pass self._sock = None
可以小結(jié)如下
1)默認(rèn)情況下每創(chuàng)建一個(gè)Redis實(shí)例都會(huì)構(gòu)造出一個(gè)ConnectionPool實(shí)例,每一次訪問(wèn)redis都會(huì)從這個(gè)連接池得到一個(gè)連接,操作完成后會(huì)把該連接放回連接池(連接并沒有釋放),可以構(gòu)造一個(gè)統(tǒng)一的ConnectionPool,在創(chuàng)建Redis實(shí)例時(shí),可以將該ConnectionPool傳入,那么后續(xù)的操作會(huì)從給定的ConnectionPool獲得連接,不會(huì)再重復(fù)創(chuàng)建ConnectionPool。
2)默認(rèn)情況下沒有設(shè)置keepalive和timeout,建立的連接是blocking模式的短連接。
3)不考慮底層tcp的情況下,連接池中的連接會(huì)在ConnectionPool.disconnect中統(tǒng)一銷毀。
- Redis總結(jié)筆記(二):C#連接Redis簡(jiǎn)單例子
- RedisDesktopManager無(wú)法遠(yuǎn)程連接Redis的完美解決方法
- springboot2整合redis使用lettuce連接池的方法(解決lettuce連接池?zé)o效問(wèn)題)
- redis客戶端連接錯(cuò)誤 NOAUTH Authentication required
- Redis連接超時(shí)異常的處理方法
- 詳解springboot配置多個(gè)redis連接
- 詳解Redis開啟遠(yuǎn)程登錄連接
- Springboot2.X集成redis集群(Lettuce)連接的方法
- redis連接報(bào)錯(cuò)error:NOAUTH Authentication required
- redis連接被拒絕的解決方案
- redis-copy使用6379端口無(wú)法連接到Redis服務(wù)器的問(wèn)題
相關(guān)文章
Python程序員鮮為人知但你應(yīng)該知道的17個(gè)問(wèn)題
這篇文章主要介紹了Python程序員代碼編寫時(shí)應(yīng)該避免的17個(gè)“坑”,也可以說(shuō)成Python程序員代碼編寫時(shí)應(yīng)該避免的17個(gè)問(wèn)題,需要的朋友可以參考下2014-06-06Python 正則表達(dá)式(轉(zhuǎn)義問(wèn)題)
這篇文章主要介紹了Python 正則表達(dá)式(轉(zhuǎn)義問(wèn)題),需要的朋友可以參考下2014-12-12轉(zhuǎn)換科學(xué)計(jì)數(shù)法的數(shù)值字符串為decimal類型的方法
今天小編就為大家分享一篇轉(zhuǎn)換科學(xué)計(jì)數(shù)法的數(shù)值字符串為decimal類型的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-07-07Python常用列表數(shù)據(jù)結(jié)構(gòu)小結(jié)
這篇文章主要介紹了Python常用列表數(shù)據(jù)結(jié)構(gòu)小結(jié),很有參考借鑒價(jià)值,需要的朋友可以參考下2014-08-08python繪制已知點(diǎn)的坐標(biāo)的直線實(shí)例
今天小編就為大家分享一篇python繪制已知點(diǎn)的坐標(biāo)的直線實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-07-07Python實(shí)現(xiàn)感知機(jī)(PLA)算法
這篇文章主要為大家詳細(xì)介紹了Python實(shí)現(xiàn)感知機(jī)(PLA)算法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-12-12Python中將圖像轉(zhuǎn)換為PDF的方法實(shí)現(xiàn)
本文主要介紹了Python中將圖像轉(zhuǎn)換為PDF的方法實(shí)現(xiàn),主要使用img2pdf和PyPDF2軟件包,具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08