欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Python 基于隊列實現(xiàn) tcp socket 連接池的方法

 更新時間:2025年05月05日 11:43:45   作者:授客  
這篇文章主要介紹了Python 基于隊列實現(xiàn) tcp socket 連接池的方法,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友一起看看吧

連接池實現(xiàn)

socket_pool.py

# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty
_logger = logging.getLogger('mylogger')
class SocketPool:
    def __init__(self, host, port, min_connections=10, max_connections=10):
        '''
        初始化Socket連接池
        :param host: 目標主機地址
        :param port: 目標端口號
        :param min_connections: 最小連接數(shù)
        :param max_connections: 最大連接數(shù)
        '''
        self.host = host
        self.port = port
        self.min_connections = min_connections
        self.max_connections = max_connections
        self.busy_sockets_dict = {} # 存放從連接池取出的socket的id
        self._sock_lock = threading.Lock()  # 線程鎖保證計數(shù)正確
        self._pool = Queue(max_connections)  # 基于線程安全的隊列存儲連接
        self._lock = threading.Lock()        # 線程鎖保證資源安全:
        self._init_pool()                    # 預(yù)創(chuàng)建連接
        self._start_health_check()           # 啟動連接健康檢查線程
    def _init_pool(self):
        '''預(yù)創(chuàng)建連接并填充到池中'''
        for _ in range(self.min_connections):
            sock = self._create_socket()
            self._pool.put(sock)
    def _create_socket(self):
        '''創(chuàng)建新的Socket連接'''
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
            return sock
        except socket.error as e:
            raise ConnectionError(f'Failed to connect: {e}')  # 連接失敗拋出異常
    def _start_health_check(self):
        '''啟動后臺線程定期檢查連接有效性'''
        def check():
            while True:
                with self._lock:
                    for _ in range(self._pool.qsize()):
                        sock = self._pool.get()
                        self.busy_sockets_dict[sock] = 1
                        try:
                            sock.send(b'PING<END>')  # 發(fā)送心跳包驗證連接狀態(tài)
                            # 以下 11 為服務(wù)端返回數(shù)據(jù)字節(jié)長度,不能亂寫,否則會導(dǎo)致獲取非健康檢查響應(yīng)報文數(shù)據(jù)存在多余內(nèi)容,不符合格式,從而導(dǎo)致數(shù)據(jù)解析問題
                            sock.recv(11)
                            self._pool.put(sock)
                            self.busy_sockets_dict.pop(sock)
                        except (socket.error, ConnectionResetError):
                            _logger.error('socket連接健康檢查出錯:%s, 關(guān)閉失效連接并創(chuàng)建新連接替換' % traceback.format_exc())
                            sock.close()  # 關(guān)閉失效連接并創(chuàng)建新連接替換
                            self.busy_sockets_dict.pop(sock)
                            new_sock = self._create_socket()
                            self._pool.put(new_sock)
                    # 如果sock數(shù)量小于最小數(shù)量,則補充
                    for _ in range(0, self.min_connections - self._pool.qsize()):
                        new_sock = self._create_socket()
                        self._pool.put(new_sock)
                time.sleep(60)  # 每60秒檢查一次
        threading.Thread(target=check, daemon=True).start()
    def get_connection(self):
        '''
        從池中獲取一個可用連接
        :return: socket對象
        '''
        with self._sock_lock:
            if self._pool.empty():
                if len(self.busy_sockets_dict.keys()) < self.max_connections:
                    new_sock = self._create_socket()
                    self.busy_sockets_dict[new_sock] = 1
                    return new_sock
                else:
                    raise Empty('No available connections in pool')
            else:
                try:
                    sock = self._pool.get(block=False)
                    self.busy_sockets_dict[sock] = 1
                    return sock
                except Exception:
                    _logger.error('獲取socket連接出錯:%s' % traceback.format_exc())
                    raise
    def release_connection(self, sock):
        '''
        將連接歸還到池中
        :param sock: 待歸還的socket對象
        '''
        if not sock._closed:
            self._pool.put(sock)
        if sock in self.busy_sockets_dict:
            self.busy_sockets_dict.pop(sock)
    def close_all(self):
        '''關(guān)閉池中所有連接'''
        while not self._pool.empty():
            sock = self._pool.get()
            sock.close()
            self.busy_sockets_dict.pop(sock.id)
        self.busy_sockets_dict = {} # 兜底
host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
socketPool = SocketPool(host, port, min_connections, max_connections)

使用連接池

from socket_pool import socketPool
def send_socket_msg(data):
    global socketPool
    try:
        sock = None
        # 獲取連接(支持超時控制)
        sock = socketPool.get_connection()
        # 發(fā)送數(shù)據(jù)
        sock.sendall(data.encode('utf-8'))
    except Exception:
        error_msg = '發(fā)送消息出錯:%s' % traceback.format_exc()
        _logger.error(error_msg)
        if sock is not None:
            sock.close()
            socketPool.release_connection(sock)
        return send_socket_msg(data)
    response = ''
    try:
        while True:
            chunk = sock.recv(4096)
            chunk = chunk.decode('utf-8')
            response += chunk
            if response.endswith('<END>'):
                response = response.rstrip('<END>')
                return {'success':True, 'message':response}
    except Exception:
        error_msg = '獲取消息出錯:%s' % traceback.format_exc()
        _logger.error(error_msg)
        return {'success':False, 'message': error_msg}
    finally:
        # 必須歸還連接!
        socketPool.release_connection(sock)

到此這篇關(guān)于Python 基于隊列實現(xiàn) tcp socket 連接池的文章就介紹到這了,更多相關(guān)Python tcp socket 連接池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論