Настройка пула TCP-соединений с правильной конфигурацией повторного подключения для соединений, повторно подключающихся на том же исходном порту, что и прежде.

Вопрос или проблема

Итак, я пытаюсь настроить пул соединений, чтобы протестировать свое приложение с гораздо большим количеством запросов, не сталкиваясь с ограничениями файловых дескрипторов и другими лимитами на моем компьютере (M1 Macbook air).

Теперь реализованный мной пул соединений закрывает соединение, когда оно возвращается в пул после использования для запроса, потому что эта реализация пула соединений используется тестовым клиентом, который взаимодействует с сервером, а сервер по умолчанию закрывает соединение на своей стороне после каждого запроса, как и должно быть. Это обсуждаемо, но в моем приложении серверу не нужно удерживать соединение открытым дольше, чем это необходимо. Поэтому мы также закрываем соединение, когда оно возвращается, что должно освободить файловый дескриптор, а затем мы создаем новое соединение и связываем его с тем же исходным портом, что и предыдущее.

Вот код моего текущего класса пула соединений, комментарии обсудим позже.

import socket
from queue import Queue, Empty
import threading
import time
import struct
class ConnectionPool:
    def __init__(self, ip, port, pool_size=10,source_port_start=30000,source_port_end=40000):
        self.ip = ip
        self.port = port
        self.pool_size = pool_size
        self.source_port_start = source_port_start
        self.next_source_port = source_port_start
        self.source_port_end = source_port_end
        self.pool = Queue(maxsize=pool_size)
        self.lock = threading.Lock() 
        self._create_pool()
        

    def _create_pool(self):
        for _ in range(self.pool_size):
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Попытка привязаться к уникальному исходному порту в пределах диапазона
            with self.lock:
                for port in range(self.next_source_port, self.source_port_end):
                    try:
                        conn.bind(('', port))
                        # print(f"Привязано к исходному порту: {port}")
                        self.next_source_port = port + 1
                        break
                    except OSError as e:
                        print(f"Порт {port} занят, пробуем следующий порт.")
                        continue
                else:
                    print("Нет доступных портов в диапазоне.")
                    conn.close()
                    return None  # Нет доступного порта для привязки
            
            try:
                conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                # conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 10)
                # Установка параметров TCP keep-alive
                # Это может потребовать настройки, специфичной для платформы
                # Установка интервалов keep-alive

                # Время простоя перед отправкой первого keepalive-запроса (в секундах)

                # Интервал между keepalive-запросами (в секундах)
                # conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 10)

                # Максимальное количество keepalive-запросов перед объявлением соединения мертвым
                # conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 10)
                conn.connect((self.ip, self.port))
                self.pool.put((conn, port))
            except socket.error as e:
                print(f"Соединение не удалось: {e}")
                conn.close()
    
    def get_connection(self):
        try:
            return self.pool.get_nowait()
        except Empty:
            return None

    def return_connection(self, conn_tuple):
        # Принудительное закрытие сокета
        conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
        conn_tuple[0].close()
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 10) проверить, если исходный порт
        conn.bind(('', conn_tuple[1]))
        conn.connect((self.ip, self.port))
        # if not self.pool.full():
        #     self.pool.put((conn,conn_tuple[1]))
        self.pool.put((conn,conn_tuple[1]))

    def close_all(self):
        while not self.pool.empty():
            conn_tuple = self.pool.get_nowait()
            conn_tuple[0].close()

Как видите, метод возврата соединения закрывает соединение и снова открывает соединение, связывая его с тем же исходным портом, что и раньше.

Проблема в том, что когда я использую эту реализацию, она работает до 2000 запросов, сделанных одновременно, но когда я увеличиваю количество запросов до 4000 и выше, она зависает в какой-то момент, и мне приходится выходить по сигналу SIGINT.

Я больше склонен думать, что это связано с тем, как мы закрываем и открываем соединение, возможно, требуется какая-то дополнительная конфигурация с сокетами, о которой я не знаю. Каждый раз, когда я выхожу с сигналом SIGINT, журнал ошибок показывает, что что-то было не так с вызовом conn.recv, и я не думаю, что проблема в моей логике тестирования, я думаю, что что-то не так с самим соединением, которое явно было создано либо в методе создания соединения, либо в методе возврата соединения.

Теперь о том, что я пробовал до сих пор, сначала
conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
без этой опции он даже не мог обработать до 200 одновременных запросов, что, на мой взгляд, можно объяснить тем, что соединение, хотя и было вызвано закрытие, все еще может занять несколько дополнительных циклов процессора, чтобы на самом деле закрыться из-за состояния TIME_WAIT, чтобы предотвратить доставку задержанных пакетов в новое соединение, и если другое откроется в это время, эта опция позволит нам привязаться к порту, который все еще находится в состоянии TIME_WAIT.

Как видите, была сделана попытка сделать соединения долгоживущими с помощью keep alive и запросов, но это провалилось, поскольку тесты начали зависать намного раньше. Я не знаю, как это объяснить, но, думаю, так даже лучше, так как мы хотим закрыть соединение сразу после использования и переподключиться с новым соединением, а создание этих соединений долгоживущими было бы противоречием с тем, что мы пытаемся сделать, хотя я могу ошибаться.

И самое важное, и изменение, которое сделало первый тест на 2000 одновременных запросов возможным, было использование
conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
этой опции перед фактическим вызовом close. Это заставляет сокет закрываться немедленно, отбрасывая все неотправленные данные. Без этого тесты чаще зависали в середине и не могли протестировать до 100 одновременных запросов должным образом при размере пула 10.

После этих изменений тест смог пройти 2000 одновременных запросов с размером пула в 10 успешно.

Также пробовал

 if conn_tuple[0].fileno() != -1:
        try:
            # Попытка установить SO_LINGER для корректного закрытия сокета
            conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('hh', 1, 0))
        except OSError as e:
            print(f"Предупреждение: SO_LINGER не удался с ошибкой {e}")

Здесь я пытался выяснить, были ли какие-то ошибки при установке этой опции, и иногда они были, и в журнале сообщения говорилось “Предупреждение: SO_LINGER не удался с ошибкой: недопустимый аргумент”, не смог разобраться, что это было.

Но это выходит из строя при 4000 и выше, так как мы можем исправить и улучшить это, чтобы theoretически использовать пул даже до миллиона.

Ответ или решение

Чтобы настроить пул соединений с учетом повторного подключения по тому же исходному порту, необходимо учесть несколько аспектов TCP-соединений и оптимизации многопоточности. Давайте подробно рассмотрим вашу реализацию и предложим улучшения.

1. Структура класса ConnectionPool

Ваш класс ConnectionPool уже имеет хорошую основу, однако стоит обратить внимание на некоторые практические аспекты, которые могут улучшить его производительность и стабильность при большом количестве параллельных запросов.

2. Обработка передаваемых сокетов

Когда вы работаете с пулами соединений и многопоточностью, важно убедиться, что сокеты правильно закрываются и повторно инициализируются. Лучше всего это делать через контекстный менеджер для обработки исключений, что позволит избежать утечек ресурсов. В методе return_connection у вас нет явного управления исключениями.

def return_connection(self, conn_tuple):
    try:
        # Закрываем сокет, используя SO_LINGER
        conn_tuple[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack('ii', 1, 0))
        conn_tuple[0].close()

        # Создаем новый сокет
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        conn.bind(('', conn_tuple[1]))
        conn.connect((self.ip, self.port))
        self.pool.put((conn, conn_tuple[1]))

    except Exception as e:
        print(f"Ошибка при возврате соединения: {e}")

3. Оптимизация обратной связи и повторных подключений

Используйте отложенные повторные подключения с экспоненциальным увеличением времени ожидания. Это снизит риск перегрузки сервера:

def create_connection(self, port):
    retries = 5
    for i in range(retries):
        try:
            conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            conn.bind(('', port))
            conn.connect((self.ip, self.port))
            return conn
        except socket.error as e:
            print(f"Ошибка соединения, попытка {i+1}/{retries}: {e}")
            time.sleep(2 ** i)  # Экспоненциальная задержка
    return None

4. Динамическое управление пулом соединений

Для стабильной работы при большом количестве запросов стоит рассмотреть динамическое управление размером пула с учетом текущего количества активных соединений. Например:

def get_connection(self):
    while True:
        try:
            return self.pool.get_nowait()
        except Empty:
            if self.pool.qsize() < self.pool_size:
                # Увеличиваем пул динамически
                self._create_pool()  
            time.sleep(0.1)  # Пауза перед повторной попыткой

5. Минимизация использования TIME_WAIT

Использование флага SO_REUSEADDR позволяет сокету быть повторно привязанным к порту, который находится в состоянии TIME_WAIT. Однако при использовании такого подхода будьте осторожны, так как это может вызвать проблемы с неустойчивыми соединениями. Сделайте проверки состояния соединений перед их повторным использованием.

Заключение

Эти изменения помогут улучшить производительность вашего пула соединений при увеличении нагрузки. Важно регулярно тестировать программу под различными нагрузками, чтобы выявить узкие места, а также следить за состоянием сокетов и отладочными сообщениями.

Не забудьте также использовать инструменты мониторинга, такие как netstat и lsof, чтобы отслеживать состояние соединений и выявлять проблемы с ресурсами на вашем сервере. Улучшение производительности вашего кода доступно, и постоянное тестирование и оптимизация только усилят его эффективность.

Оцените материал
Добавить комментарий

Капча загружается...