Docker: сервер неожиданно закрыл соединение

Вопрос или проблема

У меня есть celery worker в docker, когда выполняется задача, которая занимает 10 минут, он не сможет отправить результаты, так как потеряет соединение с сервером rabbitmq. Если задача выполняется без docker, все будет работать нормально. Rabbitmq также находится в docker, и они оба на отдельных серверах. Логи rabbitmq говорят пропущенные сердцебиения от клиента, тайм-аут: 60с. Если задача просто time.sleep, она будет работать нормально, но если это что-то вроде следующего:

from celery_project import app
import time
import logging

@app.task(queue="test_load_balancer_q", acks_late=True)
def test_task():
    start_time = time.time()
    duration = 600  # 10 минут в секундах

    # Симулируем задачу, выполняя непрерывную работу, пока не пройдет 10 минут
    while (time.time() - start_time) < duration:
        # Выполняем какую-то нетривиальную работу (например, вычисления)
        _ = sum(i**2 for i in range(10000))

    logging.info("Задача завершена через 10 минут")

это потерпит неудачу, есть идеи?

Ответ или решение

Проблема, с которой вы столкнулись, связана с временем выполнения задачи в вашей среде Celery, запущенной в Docker. Сообщение об ошибке "missed heartbeats from client, timeout: 60s" от RabbitMQ указывает на то, что у RabbitMQ не поступает сигнал о том, что Celery worker (рабочий процесс) активен. Это происходит из-за того, что ваша задача выполняется продолжительное время (10 минут) без каких-либо операций ввода-вывода, что приводит к игнорированию сердечных биений (heartbeats).

Что такое сердечные биения (Heartbeats)?

Сердечные биения — это механизм, используемый RabbitMQ для контроля состояния соединений с клиентами. Он предполагает, что клиент отправляет сигналы (heartbeats) определённого интервала времени, чтобы подтвердить, что он всё ещё активен и работает. Если RabbitMQ не получает такие сигналы в течение установленного времени (в данном случае 60 секунд), он считает соединение закрытым или неактивным.

Почему ваша задача теряет соединение?

Ваш код задачи выполняет вычисления без какого-либо задерживания или операции ввода-вывода в цикле, поэтому RabbitMQ не получает сигналы о том, что рабочий процесс по-прежнему активен.

Когда вы используете time.sleep(), ваш код "приостанавливает" выполнение, и RabbitMQ, скорее всего, продолжает получать соответствующие сердечные биения, поэтому соединение не теряется.

Как устранить проблему?

Вот несколько способов решения вашей проблемы:

1. Увеличение интервала сердечных биений

Вы можете увеличить значение heartbeat для RabbitMQ через конфигурацию Celery. Это позволит увеличить время до таймаута. Однако будьте осторожны с этой настройкой, так как слишком большое значение может скрыть реальные проблемы с подключением.

from celery import Celery

app = Celery('your_app',
             broker='your_broker_url',
             broker_heartbeat=120)  # Увеличьте до 120 секунд или больше

2. Обновление сердечных биений в процессе работы

Вы можете использовать метод update_task() для отправки сердечных биений в процессе выполнения задачи. Например, добавьте self.update_state() в цикле выполнения длительной задачи, чтобы RabbitMQ знал, что ваш рабочий процесс всё ещё активен. Пример кода:

from celery_project import app
import time
import logging

@app.task(queue="test_load_balancer_q", acks_late=True)
def test_task():
    start_time = time.time()
    duration = 600  # 10 минут в секундах

    while (time.time() - start_time) < duration:
        _ = sum(i**2 for i in range(10000))
        # Обновляем состояние, чтобы отправить сердечное биение
        test_task.update_state(state='PROGRESS')

    logging.info("Задача завершена через 10 минут")

3. Настройка RabbitMQ

Также стоит проверить настройки RabbitMQ, чтобы убедиться, что таймаут соединения и другие параметры установлены правильно. Например, можно настроить параметры heartbeat в конфигурационном файле RabbitMQ.

4. Использование дополнительных Celery опций

Celery предоставляет возможность устанавливать параметры по умолчанию в конфигурации. Вы можете настроить параметры task_soft_time_limit и task_time_limit, чтобы задать временные пределы для выполнения задач.

5. Перенос длительных операций вне задачи

Если возможно, рассмотрите возможность переноса длительных операций в асинхронные задачи или делите ваши операции на более мелкие задачи, чтобы избежать проблемы с потерей соединения.

Заключение

Проблема, с которой вы столкнулись, связана с тем, что Celery worker не отправляет сердечные биения в течение длительного времени. Рассмотренные выше решения помогут вам устранить проблему с потерей соединения с RabbitMQ. Убедитесь, что периодически отправляете сигналы о своей активности, чтобы поддерживать соединение в рабочем состоянии.

Оцените материал
Добавить комментарий

Капча загружается...