Python 中創(chuàng)建 PostgreSQL 數(shù)據(jù)庫連接池
習(xí)慣于使用數(shù)據(jù)庫之前都必須創(chuàng)建一個連接池,即使是單線程的應(yīng)用,只要有多個方法中需用到數(shù)據(jù)庫連接,建立一兩個連接的也會考慮先池化他們。連接池的好處多多,
- 1) 如果反復(fù)創(chuàng)建連接相當(dāng)耗時,
- 2) 對于單個連接一路用到底的應(yīng)用,有連接池時避免了數(shù)據(jù)庫連接對象傳來傳去,
- 3) 忘記關(guān)連接了,連接池幸許還能幫忙在一定時長后關(guān)掉,當(dāng)然密集取連接的應(yīng)用勢將耗盡連接,
- 4) 一個應(yīng)用打開連接的數(shù)量是可控的
接觸到 Python 后,在使用 PostgreSQL 也自然而然的考慮創(chuàng)建連接池,使用時從池中取,用完后還回去,而不是每次需要連接時創(chuàng)建一個物理的。Python 連接 PostgreSQL 是主要有兩個包, py-postgresql 和 psycopg2 , 而本文的實例將使用后者。
Psycopg 在 psycopg2.pool 模塊中提供了兩個連接池的實現(xiàn)在,它們都繼承自 psycopg2.pool.AbstractConnectionPool,
該抽象類的基本方法是
getconn(key=None):獲取連接putconn(conn, key=None, close=False):歸還連接closeall():關(guān)閉連接池中的所有連接
兩個連接池的實現(xiàn)類是
psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars): 給單線程應(yīng)用用的psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars): 多線程時更安全,其實就是在getconn()和putconn()時加了鎖來控制
所以最安全保險的做法還是使用 ThreadedConnectionPool, 在單線程應(yīng)用中, SimpleConnectionPool 也不見得比 ThreadedConnectionPool 效率高多少。
下面來看一個具體的連接池實現(xiàn),其中用到了 Context Manager, 使用時結(jié)合 with 鍵字更方便,用完后不用顯式的調(diào)用 putconn() 歸還連接
db_helper.py
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
import atexit
class DBHelper:
def __init__(self):
self._connection_pool = None
def initialize_connection_pool(self):
db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn)
@contextmanager
def get_resource(self, autocommit=True):
if self._connection_pool is None:
self.initialize_connection_pool()
conn = self._connection_pool.getconn()
conn.autocommit = autocommit
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
yield cursor, conn
finally:
cursor.close()
self._connection_pool.putconn(conn)
def shutdown_connection_pool(self):
if self._connection_pool is not None:
self._connection_pool.closeall()
db_helper = DBHelper()
@atexit.register
def shutdown_connection_pool():
db_helper.shutdown_connection_pool()
from psycopg2 import pool
from psycopg2 . extras import RealDictCursor
from contextlib import contextmanager
import atexit
class DBHelper :
def __init__ ( self ) :
self . _connection_pool = None
def initialize_connection_pool ( self ) :
db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
self . _connection_pool = pool . ThreadedConnectionPool ( 1 , 3 , db_dsn )
@ contextmanager
def get_resource ( self , autocommit = True ) :
if self . _connection_pool is None :
self . initialize_connection_pool ( )
conn = self . _connection_pool . getconn ( )
conn . autocommit = autocommit
cursor = conn . cursor ( cursor_factory = RealDictCursor )
try :
yield cursor , conn
finally :
cursor . close ( )
self . _connection_pool . putconn ( conn )
def shutdown_connection_pool ( self ) :
if self . _connection_pool is not None :
self . _connection_pool . closeall ( )
db_helper = DBHelper ( )
@ atexit . register
def shutdown_connection_pool ( ) :
db_helper . shutdown_connection_pool ( )
幾點說明:
- 只在第一次調(diào)用
get_resource()時創(chuàng)建連接池,而不是在from db_helper import db_helper引用時就創(chuàng)建連接池 Context Manager返回了兩個對象,cursor和connection, 需要用connection管理事物時用它- 默認時
cursor返回的記錄是字典,而非數(shù)組 - 默認時連接為自動提交
- 最后的
@atexit.register那個ShutdownHook可能有點多余,在進程退出時連接也被關(guān)閉,TIME_WAIT時間應(yīng)該會稍長些
使用方式:
如果不用事物
from db_helper import db_helper
with db_helper.get_resource() as (cursor, _):
cursor.execute('select * from users')
for record in cursor.fetchall():
... process record, record['name'] ...
from db_helper import db_helper
with db_helper . get_resource ( ) as ( cursor , _ ) :
cursor . execute ( 'select * from users' )
for record in cursor . fetchall ( ) :
. . . process record , record [ 'name' ] . . .
如果需要用到事物
with db_helper.get_resource(autocommit=False) as (cursor, _):
try:
cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
cursor.execute('delete from orders where user_id = %s', (1,))
conn.commit()
except:
conn.rollback()
with db_helper . get_resource ( autocommit = False ) as ( cursor , _ ) :
try :
cursor . execute ( 'update users set name = %s where id = %s' , ( 'new_name' , 1 ) )
cursor . execute ( 'delete from orders where user_id = %s' , ( 1 , ) )
conn . commit ( )
except :
conn . rollback ( )
在寫作本文時,查看 psycopg 的官網(wǎng)時,發(fā)現(xiàn) Psycopg 3.0 正式版在 2021-10-13 日發(fā)布了( Psycopg 3.0 released ), 更好的支持 async。在 Psycopg2 2.2 版本時就開始支持異步了。而且還注意到 Psycopg 的主要部分是用 C 實現(xiàn)的,才使得它效率比較高,也難怪經(jīng)常用 pip install psycopg2 安裝不成功,而要用 pip install psycopg2-binary 來安裝的原因。
在創(chuàng)建連接池時加上參數(shù) keepalivesXxx 能讓服務(wù)器及時斷掉死鏈接,否則在 Linux 下默認要 2 個小時后才斷開。死鏈接的情況發(fā)生在客戶端異常退出(如斷電)時先前建立的鏈接就變?yōu)樗梨溄恿恕?/p>
pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
PostgreSQL 服務(wù)端會對連接在空閑 tcp_keepalives_idle 秒后,主動發(fā)送tcp_keepalives_count 個 tcp_keeplive 偵測包,每個偵探包在 tcp_keepalives_interval 秒內(nèi)都沒有回應(yīng),就認為是死連接,于是切斷它。
到此這篇關(guān)于Python 中創(chuàng)建 PostgreSQL 數(shù)據(jù)庫連接池的文章就介紹到這了,更多相關(guān)PostgreSQL Python內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python中的copy()函數(shù)詳解(list,array)
這篇文章主要介紹了Python中的copy()函數(shù)詳解(list,array),具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09
對Python 兩大環(huán)境管理神器 pyenv 和 virtualenv詳解
今天小編就為大家分享一篇對Python 兩大環(huán)境管理神器 pyenv 和 virtualenv詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-12-12
pandas 實現(xiàn)將重復(fù)表格去重,并重新轉(zhuǎn)換為表格的方法
下面小編就為大家分享一篇pandas 實現(xiàn)將重復(fù)表格去重,并重新轉(zhuǎn)換為表格的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-04-04

