В моем проекте на Django возникла проблема с Celery и Redis в качестве брокера сообщений.
Текущие версии:
celery==5.3.6
redis==5.0.8
Проблема описывается следующим логом:
2024-09-15 14:31:05 -------------- celery@5d4164b0472c v5.3.6 (emerald-rush)
2024-09-15 14:31:05 --- ***** -----
2024-09-15 14:31:05 -- ******* ---- Linux-6.10.0-linuxkit-aarch64-with-glibc2.31 2024-09-15 12:31:05
2024-09-15 14:31:05 - *** --- * ---
2024-09-15 14:31:05 - ** ---------- [config]
2024-09-15 14:31:05 - ** ---------- .> app: core:0xffff8a65b550
2024-09-15 14:31:05 - ** ---------- .> transport: redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:05 - ** ---------- .> results: redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000/
2024-09-15 14:31:05 - *** --- * --- .> concurrency: 11 (solo)
2024-09-15 14:31:05 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
2024-09-15 14:31:05 --- ***** -----
2024-09-15 14:31:05 -------------- [очереди]
2024-09-15 14:31:05 .> celery exchange=celery(direct) key=celery
2024-09-15 14:31:05 .> celery:1 exchange=celery:1(direct) key=celery:1
2024-09-15 14:31:05 .> celery:2 exchange=celery:2(direct) key=celery:2
2024-09-15 14:31:05 .> celery:3 exchange=celery:3(direct) key=celery:3
2024-09-15 14:31:05
2024-09-15 14:31:05 [задачи]
2024-09-15 14:31:05 . apps.data_integration.tasks.celery_dataddo_process
2024-09-15 14:31:05 . apps.incrementality_experiment.tasks.geolift_market_match_task
2024-09-15 14:31:05
2024-09-15 14:31:05 [2024-09-15 12:31:05,926: INFO/MainProcess] Подключено к redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:06 [2024-09-15 12:31:06,739: INFO/MainProcess] celery@5d4164b0472c готов.
2024-09-15 14:38:16 [2024-09-15 12:38:16,078: INFO/MainProcess] синхронизация с celery@03ffef68e38c
2024-09-15 14:38:16 [2024-09-15 12:38:16,621: ERROR/MainProcess] Ошибка команды управления: ValueError('недостаточно значений для распаковки (ожидалось 3, получено 1)')
2024-09-15 14:38:16 Traceback (последний вызов был последний):
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/celery/worker/pidbox.py", строка 44, в on_message
2024-09-15 14:38:16 self.node.handle_message(body, message)
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", строка 143, в handle_message
2024-09-15 14:38:16 return self.dispatch(**body)
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", строка 110, в dispatch
2024-09-15 14:38:16 self.reply({self.hostname: reply},
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", строка 147, в reply
2024-09-15 14:38:16 self.mailbox._publish_reply(data, exchange, routing_key, ticket,
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", строка 277, в _publish_reply
2024-09-15 14:38:16 producer.publish(
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", строка 186, в publish
2024-09-15 14:38:16 return _publish(
2024-09-15 14:38:16 ^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/connection.py", строка 556, в _ensured
2024-09-15 14:38:16 return fun(*args, **kwargs)
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", строка 208, в _publish
2024-09-15 14:38:16 return channel.basic_publish(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", строка 610, в basic_publish
2024-09-15 14:38:16 return self.typeof(exchange).deliver(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", строка 74, в deliver
2024-09-15 14:38:16 for queue in _lookup(exchange, routing_key):
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", строка 721, в _lookup
2024-09-15 14:38:16 R = self.typeof(exchange).lookup(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", строка 66, в lookup
2024-09-15 14:38:16 return {
2024-09-15 14:38:16 ^
2024-09-15 14:38:16 Файл "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", строка 67, в <setcomp>
2024-09-15 14:38:16 queue for rkey, _, queue in table
2024-09-15 14:38:16 ^^^^^^^^^^^^^^
2024-09-15 14:38:16 ValueError: недостаточно значений для распаковки (ожидалось 3, получено 1)
Это происходит как в Docker, так и в системе AWS, когда я развертываю. Сразу после развертывания или компоновки контейнера, если я даю системе задачи для выполнения, она обрабатывает их идеально, но если я оставляю ее неиспользованной на несколько минут, она ломается, как указано в логе.
В настоящее время я использую следующую конфигурацию в docker-compose:
celery:
<<: *app
command: celery -A core worker --without-mingle --without-gossip --without-heartbeat --pool=solo -l info -Q celery,celery:1,celery:2,celery:3
ports:
- 5554:5554
depends_on:
- db
- redis
и эту конфигурацию в celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Установите значение по умолчанию для настройки Django для программы 'celery'.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
# Инициализация приложения Celery.
app = Celery('core')
# Загрузка настроек, связанных с задачами, из настроек проекта Django с использованием пространства имен 'CELERY'.
app.config_from_object('django.conf:settings', namespace="CELERY")
# Обеспечить повторные попытки подключения брокера при запуске для Celery 6.0 и выше.
app.conf.broker_connection_retry_on_startup = True
# Опции транспортировки брокера для приоритета сообщений и стратегии маршрутизации задач.
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'sep': ':',
'queue_order_strategy': 'priority',
}
# Ограничить предварительную выборку задач до одной задачи за раз для балансировки нагрузки на рабочего.
app.conf.worker_prefetch_multiplier = 1
app.conf.task_soft_time_limit = 1200 # 20 минут
app.conf.task_time_limit = 1500 # 25 минут, жесткий лимит
app.conf.broker_heartbeat = 30 # Увеличить интервал проверки сердцебиения Redis
app.conf.broker_connection_timeout = 10 # Время в секундах для ожидания соединения
app.conf.broker_connection_retry_on_startup = True # Обеспечить повторные попытки
app.conf.task_acks_late = True # Подтверждать задачу только после завершения
app.conf.task_reject_on_worker_lost = True # Отругается задача, если работник потерян
app.conf.task_default_retry_delay = 300 # Повторная попытка через 5 минут
app.conf.task_max_retries = 5 # Макс. повторов для провалившихся задач
# Автоматически обнаруживать задачи из установленных приложений (tasks.py).
app.autodiscover_tasks()
Ответ
Проблема, с которой вы столкнулись, связана с тем, что Celery-воркер перестает обрабатывать задачи после некоторого времени бездействия, что отображается в логах с ошибкой ValueError('not enough values to unpack (expected 3, got 1)')
. Это может быть вызвано несколькими факторами, включая неправильную конфигурацию, ошибки в связи с Redis или внутренние проблемы в самом Celery. Вот несколько шагов для решения этой проблемы.
1. Обновление версий библиотек
Убедитесь, что вы используете последние стабильные версии Celery и Redis. На момент написания этого ответа, существуют более свежие версии, которые могут содержать важные исправления для известных ошибок. Проверьте официальный репозиторий Celery и редис на pypi.org.
2. Настройка Heartbeat
Вы уже настроили app.conf.broker_heartbeat = 30
, что является хорошим шагом, но стоит также убедиться, что ваш Redis брокер правильно настроен для обработки heartbeat. Убедитесь, что ваша версия Redis поддерживает heartbeat.
3. Использование --without-gossip
и --without-mingle
Ваша команда запуска Celery содержит параметры --without-mingle
и --without-gossip
, что отключает функции синхронизации между воркерами. Это может быть уместно в некоторых сценариях, но может привести к потере некоторых функций. Если ваши задачи не требуют межсимвольной синхронизации, это может быть приемлемым.
4. Проверьте настройки пула
Вы используете --pool=solo
, что значительно ограничивает параллельную обработку задач. Для улучшения производительности и стабильности, рассмотрите возможность использования пула prefork
, который лучше управляет задачами. Измените команду запуска на --pool=prefork
.
5. Обработка ошибок и логирование задач
Попробуйте улучшить логирование ваших задач, чтобы иметь возможность более точно определять, где происходят сбои. Включите task events
в вашей конфигурации Celery, добавив -E
в команду запуска:
command: celery -A core worker -E --without-mingle --without-gossip --pool=prefork -l info -Q celery,celery:1,celery:2,celery:3
Это позволит вам отслеживать события задач и, возможно, увидеть дополнительные ошибки.
6. Увеличение времени ожидания соединения
Вы установили broker_connection_timeout = 10
, но возможно, для вашего окружения требуется больше времени на установление соединения. Попробуйте увеличить это значение до 30 секунд или даже больше.
7. Настройка временных лимитов задач
Ваши настройки task_soft_time_limit
и task_time_limit
подходят, но стоит убедиться, что задачи действительно завершаются в установленное время. Если ваши задачи могут выполняться дольше, чем лимиты, подумайте о настройке их в соответствии с реальными требованиями.
Пример обновленной команды запуска
Исходя из вышеперечисленного, пример команды запуска может выглядеть следующим образом:
celery:
<<: *app
command: celery -A core worker -E --without-mingle --without-gossip --pool=prefork -l info -Q celery,celery:1,celery:2,celery:3
ports:
- 5554:5554
depends_on:
- db
- redis
Заключение
Если после выполнения всех этих шагов проблема все еще существует, возможно, стоит рассмотреть более глубокую диагностику системы. Логи Redis и вывод команды docker logs
для ваших контейнеров могут предоставить дополнительную информацию о том, что может идти не так. Также, если есть возможность, попробуйте развернуть Celery в другом окружении (например, локально), чтобы проверить, повторяется ли проблема.