旁拮猾 发表于 2025-5-31 23:44:25

Python 基于队列实现 tcp socket 连接池

连接池实现

socket_pool.py
# -*- coding:utf-8 -*-
import socket
import time
import threading
import os
import logging
import traceback
from queue import Queue, Empty

_logger = logging.getLogger('mylogger')

class SocketPool:
    def __init__(self, host, port, min_connections=10, max_connections=10):
      '''
      初始化Socket连接池
      :param host: 目标主机地址
      :param port: 目标端口号
      :param min_connections: 最小连接数
      :param max_connections: 最大连接数
      '''
      self.host = host
      self.port = port
      self.min_connections = min_connections
      self.max_connections = max_connections
      self.busy_sockets_dict = {} # 存放从连接池取出的socket的id
      self._sock_lock = threading.Lock()# 线程锁保证计数正确
      self._pool = Queue(max_connections)# 基于线程安全的队列存储连接
      self._lock = threading.Lock()      # 线程锁保证资源安全:
      self._init_pool()                  # 预创建连接
      self._start_health_check()         # 启动连接健康检查线程

    def _init_pool(self):
      '''预创建连接并填充到池中'''
      
      for _ in range(self.min_connections):
            sock = self._create_socket()
            self._pool.put(sock)

    def _create_socket(self):
      '''创建新的Socket连接'''
      
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      try:
            sock.connect((self.host, self.port))
            return sock
      except socket.error as e:
            raise ConnectionError(f'Failed to connect: {e}')# 连接失败抛出异常

    def _start_health_check(self):
      '''启动后台线程定期检查连接有效性'''
      
      def check():
            while True:
                with self._lock:
                  for _ in range(self._pool.qsize()):
                        sock = self._pool.get()
                        self.busy_sockets_dict = 1
                        try:
                            sock.send(b'PING<END>')# 发送心跳包验证连接状态
                            # 以下 11 为服务端返回数据字节长度,不能乱写,否则会导致获取非健康检查响应报文数据存在多余内容,不符合格式,从而导致数据解析问题
                            sock.recv(11)
                            self._pool.put(sock)
                            self.busy_sockets_dict.pop(sock)
                        except (socket.error, ConnectionResetError):
                            _logger.error('socket连接健康检查出错:%s, 关闭失效连接并创建新连接替换' % traceback.format_exc())
                            sock.close()# 关闭失效连接并创建新连接替换
                            self.busy_sockets_dict.pop(sock)

                            new_sock = self._create_socket()
                            self._pool.put(new_sock)
                  
                  # 如果sock数量小于最小数量,则补充
                  for _ in range(0, self.min_connections - self._pool.qsize()):
                        new_sock = self._create_socket()
                        self._pool.put(new_sock)
                time.sleep(60)# 每60秒检查一次
      threading.Thread(target=check, daemon=True).start()

    def get_connection(self):
      '''
      从池中获取一个可用连接
      :return: socket对象
      '''
      
      with self._sock_lock:
            if self._pool.empty():
                if len(self.busy_sockets_dict.keys()) < self.max_connections:
                  new_sock = self._create_socket()
                  self.busy_sockets_dict = 1
                  return new_sock
                else:
                  raise Empty('No available connections in pool')
            else:
                try:
                  sock = self._pool.get(block=False)
                  self.busy_sockets_dict = 1
                  return sock
                except Exception:
                  _logger.error('获取socket连接出错:%s' % traceback.format_exc())
                  raise
               

    def release_connection(self, sock):
      '''
      将连接归还到池中
      :param sock: 待归还的socket对象
      '''
      if not sock._closed:
            self._pool.put(sock)
      if sock in self.busy_sockets_dict:
            self.busy_sockets_dict.pop(sock)


    def close_all(self):
      '''关闭池中所有连接'''
      
      while not self._pool.empty():
            sock = self._pool.get()
            sock.close()
            self.busy_sockets_dict.pop(sock.id)
      self.busy_sockets_dict = {} # 兜底

host = os.environ.get('MODBUS_TCP_SERVER_HOST', '127.0.0.1')
port = int(os.environ.get('MODBUS_TCP_SERVER_PORT', '9000'))
min_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '10'))
max_connections = int(os.environ.get('DJANGO_SOCKET_POOL_MAX_CONNECTIONS', '100'))
socketPool = SocketPool(host, port, min_connections, max_connections)使用连接池

from socket_pool import socketPool

def send_socket_msg(data):
    global socketPool
   
    try:
      sock = None
      # 获取连接(支持超时控制)
      sock = socketPool.get_connection()
      # 发送数据
      sock.sendall(data.encode('utf-8'))
    except Exception:
      error_msg = '发送消息出错:%s' % traceback.format_exc()
      _logger.error(error_msg)
      
      if sock is not None:
            sock.close()
            socketPool.release_connection(sock)
      return send_socket_msg(data)
   
    response = ''
    try:
      while True:
            chunk = sock.recv(4096)
            chunk = chunk.decode('utf-8')
            response += chunk
            if response.endswith('<END>'):
                response = response.rstrip('<END>')
                return {'success':True, 'message':response}
    except Exception:
      error_msg = '获取消息出错:%s' % traceback.format_exc()
      _logger.error(error_msg)
      return {'success':False, 'message': error_msg}
    finally:
      # 必须归还连接!
      socketPool.release_connection(sock)
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Python 基于队列实现 tcp socket 连接池