Python 基于隊列實現(xiàn) tcp socket 連接池的方法
連接池實現(xiàn)
socket_pool.py
# -*- coding:utf-8 -*- import socket import time import threading import os import logging import traceback from queue import Queue, Empty _logger = logging.getLogger('mylogger') class SocketPool: def __init__(self, host, port, min_connections=10, max_connections=10): ''' 初始化Socket連接池 :param host: 目標主機地址 :param port: 目標端口號 :param min_connections: 最小連接數(shù) :param max_connections: 最大連接數(shù) ''' self.host = host self.port = port self.min_connections = min_connections self.max_connections = max_connections self.busy_sockets_dict = {} # 存放從連接池取出的socket的id self._sock_lock = threading.Lock() # 線程鎖保證計數(shù)正確 self._pool = Queue(max_connections) # 基于線程安全的隊列存儲連接 self._lock = threading.Lock() # 線程鎖保證資源安全: self._init_pool() # 預(yù)創(chuàng)建連接 self._start_health_check() # 啟動連接健康檢查線程 def _init_pool(self): '''預(yù)創(chuàng)建連接并填充到池中''' for _ in range(self.min_connections): sock = self._create_socket() self._pool.put(sock) def _create_socket(self): '''創(chuàng)建新的Socket連接''' sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: sock.connect((self.host, self.port)) return sock except socket.error as e: raise ConnectionError(f'Failed to connect: {e}') # 連接失敗拋出異常 def _start_health_check(self): '''啟動后臺線程定期檢查連接有效性''' def check(): while True: with self._lock: for _ in range(self._pool.qsize()): sock = self._pool.get() self.busy_sockets_dict[sock] = 1 try: sock.send(b'PING<END>') # 發(fā)送心跳包驗證連接狀態(tài) # 以下 11 為服務(wù)端返回數(shù)據(jù)字節(jié)長度,不能亂寫,否則會導(dǎo)致獲取非健康檢查響應(yīng)報文數(shù)據(jù)存在多余內(nèi)容,不符合格式,從而導(dǎo)致數(shù)據(jù)解析問題 sock.recv(11) self._pool.put(sock) self.busy_sockets_dict.pop(sock) except (socket.error, ConnectionResetError): _logger.error('socket連接健康檢查出錯:%s, 關(guān)閉失效連接并創(chuàng)建新連接替換' % traceback.format_exc()) sock.close() # 關(guān)閉失效連接并創(chuàng)建新連接替換 self.busy_sockets_dict.pop(sock) new_sock = self._create_socket() self._pool.put(new_sock) # 如果sock數(shù)量小于最小數(shù)量,則補充 for _ in range(0, self.min_connections - self._pool.qsize()): new_sock = self._create_socket() self._pool.put(new_sock) time.sleep(60) # 每60秒檢查一次 threading.Thread(target=check, daemon=True).start() def get_connection(self): ''' 從池中獲取一個可用連接 :return: socket對象 ''' with self._sock_lock: if self._pool.empty(): if len(self.busy_sockets_dict.keys()) < self.max_connections: new_sock = self._create_socket() self.busy_sockets_dict[new_sock] = 1 return new_sock else: raise Empty('No available connections in pool') else: try: sock = self._pool.get(block=False) self.busy_sockets_dict[sock] = 1 return sock except Exception: _logger.error('獲取socket連接出錯:%s' % traceback.format_exc()) raise def release_connection(self, sock): ''' 將連接歸還到池中 :param sock: 待歸還的socket對象 ''' if not sock._closed: self._pool.put(sock) if sock in self.busy_sockets_dict: self.busy_sockets_dict.pop(sock) def close_all(self): '''關(guān)閉池中所有連接''' while not self._pool.empty(): sock = self._pool.get() sock.close() self.busy_sockets_dict.pop(sock.id) self.busy_sockets_dict = {} # 兜底 host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1') port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000')) min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10')) max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100')) socketPool = SocketPool(host, port, min_connections, max_connections)
使用連接池
from socket_pool import socketPool def send_socket_msg(data): global socketPool try: sock = None # 獲取連接(支持超時控制) sock = socketPool.get_connection() # 發(fā)送數(shù)據(jù) sock.sendall(data.encode('utf-8')) except Exception: error_msg = '發(fā)送消息出錯:%s' % traceback.format_exc() _logger.error(error_msg) if sock is not None: sock.close() socketPool.release_connection(sock) return send_socket_msg(data) response = '' try: while True: chunk = sock.recv(4096) chunk = chunk.decode('utf-8') response += chunk if response.endswith('<END>'): response = response.rstrip('<END>') return {'success':True, 'message':response} except Exception: error_msg = '獲取消息出錯:%s' % traceback.format_exc() _logger.error(error_msg) return {'success':False, 'message': error_msg} finally: # 必須歸還連接! socketPool.release_connection(sock)
到此這篇關(guān)于Python 基于隊列實現(xiàn) tcp socket 連接池的文章就介紹到這了,更多相關(guān)Python tcp socket 連接池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Python socket如何實現(xiàn)服務(wù)端和客戶端數(shù)據(jù)傳輸(TCP)
- python用socket實現(xiàn)協(xié)議TCP長連接框架
- Python基于socket實現(xiàn)TCP/IP客戶和服務(wù)器通信
- Python使用socket模塊實現(xiàn)簡單tcp通信
- python 使用raw socket進行TCP SYN掃描實例
- python基于socket實現(xiàn)的UDP及TCP通訊功能示例
- python使用socket實現(xiàn)的傳輸demo示例【基于TCP協(xié)議】
- Python中的TCP socket寫法示例
- python使用socket創(chuàng)建tcp服務(wù)器和客戶端
- Python封裝數(shù)據(jù)庫連接池詳解
- Python3 多線程(連接池)操作MySQL插入數(shù)據(jù)
- python連接池實現(xiàn)示例程序
相關(guān)文章
Python統(tǒng)計列表中的重復(fù)項出現(xiàn)的次數(shù)的方法
這篇文章主要介紹了Python統(tǒng)計列表中的重復(fù)項出現(xiàn)的次數(shù)的方法,需要的朋友可以參考下2014-08-08如何將numpy二維數(shù)組中的np.nan值替換為指定的值
這篇文章主要介紹了將numpy二維數(shù)組中的np.nan值替換為指定的值操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-05-05Python3 使用selenium插件爬取蘇寧商家聯(lián)系電話
這篇文章主要介紹了Python3 selenium爬取蘇寧商家聯(lián)系電話,此處使用了selenium插件 使用的是火狐瀏覽器 信息存儲到csv表格里面,需要的朋友可以參考下2019-12-12python中l(wèi)xml.etree 和 ElementTree 的區(qū)別解析
lxml.etree 提供了更多的功能,例如 XPath、XSLT、Relax NG、 和 XML 模式支持,etree 對 Python unicode 字符串的想法與 ElementTree 不同,本文給大家介紹python中l(wèi)xml.etree 和 ElementTree 的區(qū)別,感興趣的朋友一起看看吧2024-01-01基于Python PaddleSpeech實現(xiàn)語音文字處理
PaddleSpeech基于飛槳PaddlePaddle的語音方向的開源模型庫,用于語音和音頻中的各種關(guān)鍵任務(wù)的開發(fā),包含大量基于深度學(xué)習(xí)前沿和有影響力的模型。本文將介紹如何通過PaddleSpeech實現(xiàn)語音文字處理,感興趣的可以學(xué)習(xí)一下2022-01-01Python爬取股票信息,并可視化數(shù)據(jù)的示例
這篇文章主要介紹了Python爬取股票信息,并可視化數(shù)據(jù)的示例,幫助大家更好的理解和使用python爬蟲,感興趣的朋友可以了解下2020-09-09使用Python實現(xiàn)在Excel工作表中創(chuàng)建、修改及刪除表格區(qū)域
在數(shù)據(jù)分析和自動化處理的工作中,Excel作為一種強大的工具被廣泛應(yīng)用,而通過Python來操作Excel工作表中的表格,可以極大提高工作效率,下面我們就來學(xué)學(xué)如何使用Python在Excel工作表中創(chuàng)建,修改及刪除表格吧2024-12-12