- Вопрос или проблема
- Tasks.py
- Ответ или решение
- 1. Подтверждение корректности настройки Celery
- Проверьте настройки в settings.py
- 2. Проверка работоспособности Celery Workers и Beat
- 3. Проверка выполнения задач
- 4. Настройка PeriodicTask
- 5. Логирование и отладка
- 6. Проверка базы данных Django
- 7. Проверка интервалов и частоты запуска
- Заключение
Вопрос или проблема
Когда beat и worker запущены, периодическая задача создается в Django Celery Beat, но она не выполняется, когда интервал времени истекает. Эта задача не выполняется:
‘task’: ‘stripeapp.tasks.create_google_sheet_with_payments_task’,
Прокомментировано 2 часа назад
Tasks.py
from celery import shared_task
from django_celery_beat.models import PeriodicTask, IntervalSchedule
import json
from .models import TaskSchedule
from datetime import timedelta
from .stripe_to_sheet_saving import create_google_sheet_with_payments
from django.utils import timezone
@shared_task
def schedule_automated_tasks():
print("Запланирование автоматических задач.")
# Логирование всех существующих периодических задач для отладки
existing_tasks = PeriodicTask.objects.all()
for task in existing_tasks:
print(task.name, task.task, task.interval, task.expires)
# Получаем задачи из модели TaskSchedule, где статус 'running'
running_tasks = TaskSchedule.objects.filter(status="running")
for task_schedule in running_tasks:
print(f"Найдена задача расписания: {task_schedule}")
task = task_schedule.task
frequency = task.stripe_source.run_frequency if task.stripe_source else None
occurrence = task.stripe_source.occurence if task.stripe_source else None
end_date = task_schedule.end_date
# Определите интервал на основе частоты
interval = get_interval_from_frequency(frequency)
if interval is None:
print(f"Неверная частота: {frequency} для задачи {task.id}. Пропускаем.")
continue # Неверная частота, пропускаем
# Проверяем, должна ли задача быть отмечена как завершенная
running_count = TaskSchedule.objects.filter(task=task, status="running").count()
if occurrence is not None and end_date is None and running_count >= occurrence:
task_schedule.status="complete" # Обновляем статус на завершенный
task_schedule.save()
print(f"Задача {task.id} помечена как завершенная из-за предела вхождения.")
# Запланируйте задачу с помощью Celery Beat
periodic_task, created = PeriodicTask.objects.get_or_create(
name=f"Task-{task.id}",
defaults={
'task': 'stripeapp.tasks.create_google_sheet_with_payments_task',
'interval': IntervalSchedule.objects.get_or_create(every=interval, period=IntervalSchedule.SECONDS)[0],
'args': json.dumps([
task.data_mapper_user.email,
task.google_sheet_destination.sheet_name if task.google_sheet_destination else None,
task.stripe_source.payment_options,
task.stripe_source.previous_data_option,
task.id
]),
'expires': end_date, # Установить срок действия
}
)
if created:
print(f"Создана новая периодическая задача: {periodic_task.name}")
else:
print(f"Обновлена существующая периодическая задача: {periodic_task.name}")
# Обновить аргументы задачи
periodic_task.args = json.dumps([
task.data_mapper_user.email,
task.google_sheet_destination.sheet_name if task.google_sheet_destination else None,
task.stripe_source.payment_options,
task.stripe_source.previous_data_option,
task.id
])
periodic_task.save()
# Обновить next_run и previous_run
task.next_run = timezone.now() + timedelta(seconds=interval) # Убедитесь, что это с учетом часового пояса
task.previous_run = timezone.now() # Убедитесь, что это с учетом часового пояса
task.save()
print(f"Обновлено следующий запуск для задачи {task.id} на {task.next_run}.")
# Логируйте, сколько времени осталось до следующего запуска
remaining_time = task.next_run - timezone.now()
print(f"Осталось времени до следующего выполнения задачи {task.id}: {remaining_time.total_seconds()} секунд.")
def get_interval_from_frequency(frequency):
"""Возвращает интервал в секундах на основе строки частоты."""
frequency_map = {
'daily': 60, # секунд в день
'hourly': 3600, # секунд в час
'weekly': 604800, # секунд в неделю
'monthly': 2592000, # секунд в месяц
'quarterly': 7776000, # секунд в квартал
'semi_annually': 15552000, # секунд в шесть месяцев
'annually': 31536000 # секунд в год
}
return frequency_map.get(frequency)
@shared_task
def create_google_sheet_with_payments_task(email, google_sheet_name, payment_option, frequency_filter, custom_start_date=None, custom_end_date=None, task_id=None):
print(f"Запуск задачи для email: {email}, имя листа: {google_sheet_name}.")
return create_google_sheet_with_payments(
email=email,
google_sheet_name=google_sheet_name,
payment_option=payment_option,
frequency_filter=frequency_filter,
custom_start_date=custom_start_date,
custom_end_date=custom_end_date,
task_id=task_id
)
settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
'schedule_automated_tasks': {
'task': 'stripeapp.tasks.schedule_automated_tasks',
'schedule': 60.0, # выполняется каждую минуту
},
}
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'datamapper.settings')
app = Celery('datamapper')
app.config_from_object('django.conf:settings', namespace="CELERY")
app.autodiscover_tasks()
вывод
[2024-10-19 16:21:22,128: INFO/MainProcess] Задача stripeapp.tasks.schedule_automated_tasks[199e547d-a0f2-46da-a89f-16312893bac0] получена
[2024-10-19 16:21:22,130: WARNING/ForkPoolWorker-4] Запланирование автоматических задач.
[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4] Задача-1
[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4]
[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4] stripeapp.tasks.create_google_sheet_with_payments_task
[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4]
[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4] каждые 60 секунд
[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4]
[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4] 2024-12-31 00:00:00+00:00
[2024-10-19 16:21:22,166: WARNING/ForkPoolWorker-4] Найдена задача расписания: TaskSchedule – ID задачи: 1, Статус: running, Время выполнения: 2024-10-19 10:02:52.361584+00:00
[2024-10-19 16:21:22,179: WARNING/ForkPoolWorker-4] Обновлена существующая периодическая задача: Task-1
[2024-10-19 16:21:22,241: WARNING/ForkPoolWorker-4] Обновлен следующий запуск для задачи 1 на 2024-10-19 16:22:22.237695+00:00.
[2024-10-19 16:21:22,241: WARNING/ForkPoolWorker-4] Осталось времени до следующего выполнения задачи 1: 59.996074 секунд.
[2024-10-19 16:21:22,241: INFO/ForkPoolWorker-4] Задача stripeapp.tasks.schedule_automated_tasks[199e547d-a0f2-46da-a89f-16312893bac0] завершилась успешно за 0.11220735599999898s: None
Ответ или решение
Периодическая задача не выполняется с Celery Beat: полное руководство по устранению неполадок
Когда вы работаете с системой обработки задач Celery в Django, может возникнуть ситуация, когда периодические задачи не выполняются, несмотря на то, что они были успешно созданы. В данном случае, задача stripeapp.tasks.create_google_sheet_with_payments_task
регистрируется, но не запускается по истечении заданного интервала. Рассмотрим ряд шагов, которые помогут вам точно диагностировать и устранить проблему.
1. Подтверждение корректности настройки Celery
Проверьте настройки в settings.py
Убедитесь, что ваши настройки Celery правильно прописаны в settings.py
. Обратите особое внимание на параметры CELERY_BROKER_URL
и CELERY_TIMEZONE
. Например:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'UTC'
Эти настройки обеспечивают корректное взаимодействие вашего приложения с брокером сообщений.
2. Проверка работоспособности Celery Workers и Beat
Убедитесь, что вы запускаете как workers, так и beat. Выполните следующие команды в терминале:
celery -A datamapper worker --loglevel=info
celery -A datamapper beat --loglevel=info
Следите за логами, чтобы подтвердить, что задачи действительно обрабатываются и время их выполнения фиксируется.
3. Проверка выполнения задач
Убедитесь, что ваши задачи действительно выполняются и не сталкиваются с ошибками. Если Celery worker не может запустить задачу, которая создается, например, из-за ошибок в параметрах или логики выполнения, это может остановить выполнение:
@shared_task
def create_google_sheet_with_payments_task(...):
try:
...
except Exception as e:
print(f"Ошибка при выполнении задачи: {e}")
4. Настройка PeriodicTask
Логика в функции schedule_automated_tasks
оказывается ключевой для корректной работы периодических задач. Убедитесь, что:
- У вас правильно определяется интервалы.
- Параметры передаются корректно в
args
. - Параметр
expires
не препятствует выполнению задачи.
Пример:
periodic_task, created = PeriodicTask.objects.get_or_create(
name=f"Task-{task.id}",
defaults={
...
'expires': end_date,
}
)
Если end_date
уже истек или определен неправильно, задача может не запускаться.
5. Логирование и отладка
Добавьте больше логирования на различных этапах процесса запуска задачи, особенно перед созданием и обновлением PeriodicTask
. Это поможет выявить ошибочные состояния:
print(f"Создание/обновление задачи для: {task.id}")
print(f"Параметры задачи: {args}")
6. Проверка базы данных Django
Использование django_celery_beat
подразумевает правильное обнуление базы данных задач. Не забудьте проверить, существуют ли в базе данных все созданные задачи и их статусы. Периодические задачи хранятся в таблице django_celery_beat_periodictask
. Запросите данные в Django shell:
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.all()
7. Проверка интервалов и частоты запуска
Функция get_interval_from_frequency
должна возвращать корректные значения. Проверьте, что частота в статусе TaskSchedule
правильно интепретируется и сопоставляется с нужным интервалом.
Заключение
Устранение неполадок в Celery и Django может быть сложным, но следуя этим шагам, вы можете выявить и исправить проблему с периодическими задачами. Если ни одно из предложенных решений не работает, возможно, имеет смысл просмотреть дополнительные зависимости, версии библиотек или ошибки, возникшие в процессе выполнения. Не забывайте также о документации Celery и Django для более глубокого понимания работы всех компонентов.
Если у вас есть дополнительные вопросы, связанные с настройкой или проблемами, не стесняйтесь обращаться за помощью!