Периодическая задача не выполняется с Celery Beat

Вопрос или проблема

Когда beat и worker запущены, периодическая задача создается в Django Celery Beat, но она не выполняется, когда интервал времени истекает. Эта задача не выполняется:

‘task’: ‘stripeapp.tasks.create_google_sheet_with_payments_task’,

Прокомментировано 2 часа назад

Tasks.py

from celery import shared_task
from django_celery_beat.models import PeriodicTask, IntervalSchedule
import json
from .models import TaskSchedule
from datetime import timedelta
from .stripe_to_sheet_saving import create_google_sheet_with_payments
from django.utils import timezone

@shared_task
def schedule_automated_tasks():
    print("Запланирование автоматических задач.")
    
    # Логирование всех существующих периодических задач для отладки
    existing_tasks = PeriodicTask.objects.all()
    for task in existing_tasks:
        print(task.name, task.task, task.interval, task.expires)

    # Получаем задачи из модели TaskSchedule, где статус 'running'
    running_tasks = TaskSchedule.objects.filter(status="running")

    for task_schedule in running_tasks:
        print(f"Найдена задача расписания: {task_schedule}")

        task = task_schedule.task
        frequency = task.stripe_source.run_frequency if task.stripe_source else None
        occurrence = task.stripe_source.occurence if task.stripe_source else None
        end_date = task_schedule.end_date

        # Определите интервал на основе частоты
        interval = get_interval_from_frequency(frequency)

        if interval is None:
            print(f"Неверная частота: {frequency} для задачи {task.id}. Пропускаем.")
            continue  # Неверная частота, пропускаем

        # Проверяем, должна ли задача быть отмечена как завершенная
        running_count = TaskSchedule.objects.filter(task=task, status="running").count()
        if occurrence is not None and end_date is None and running_count >= occurrence:
            task_schedule.status="complete"  # Обновляем статус на завершенный
            task_schedule.save()
            print(f"Задача {task.id} помечена как завершенная из-за предела вхождения.")

        # Запланируйте задачу с помощью Celery Beat
        periodic_task, created = PeriodicTask.objects.get_or_create(
            name=f"Task-{task.id}",
            defaults={
                'task': 'stripeapp.tasks.create_google_sheet_with_payments_task',
                'interval': IntervalSchedule.objects.get_or_create(every=interval, period=IntervalSchedule.SECONDS)[0],
                'args': json.dumps([
                    task.data_mapper_user.email, 
                    task.google_sheet_destination.sheet_name if task.google_sheet_destination else None,
                    task.stripe_source.payment_options, 
                    task.stripe_source.previous_data_option, 
                    task.id
                ]),
                'expires': end_date,  # Установить срок действия 
            }
        )
        
        if created:
            print(f"Создана новая периодическая задача: {periodic_task.name}")
        else:
            print(f"Обновлена существующая периодическая задача: {periodic_task.name}")
            # Обновить аргументы задачи
            periodic_task.args = json.dumps([
                task.data_mapper_user.email, 
                task.google_sheet_destination.sheet_name if task.google_sheet_destination else None,
                task.stripe_source.payment_options, 
                task.stripe_source.previous_data_option,  
                task.id
            ])
            periodic_task.save()

        # Обновить next_run и previous_run
        task.next_run = timezone.now() + timedelta(seconds=interval)  # Убедитесь, что это с учетом часового пояса
        task.previous_run = timezone.now()  # Убедитесь, что это с учетом часового пояса
        task.save()
        print(f"Обновлено следующий запуск для задачи {task.id} на {task.next_run}.")

        # Логируйте, сколько времени осталось до следующего запуска
        remaining_time = task.next_run - timezone.now()
        print(f"Осталось времени до следующего выполнения задачи {task.id}: {remaining_time.total_seconds()} секунд.")


def get_interval_from_frequency(frequency):
    """Возвращает интервал в секундах на основе строки частоты."""
    frequency_map = {
        'daily': 60,           # секунд в день
        'hourly': 3600,          # секунд в час
        'weekly': 604800,        # секунд в неделю
        'monthly': 2592000,      # секунд в месяц
        'quarterly': 7776000,    # секунд в квартал
        'semi_annually': 15552000,  # секунд в шесть месяцев
        'annually': 31536000     # секунд в год
    }
    return frequency_map.get(frequency)


@shared_task
def create_google_sheet_with_payments_task(email, google_sheet_name, payment_option, frequency_filter, custom_start_date=None, custom_end_date=None, task_id=None):
    print(f"Запуск задачи для email: {email}, имя листа: {google_sheet_name}.")
    return create_google_sheet_with_payments(
        email=email,
        google_sheet_name=google_sheet_name,
        payment_option=payment_option,
        frequency_filter=frequency_filter,
        custom_start_date=custom_start_date,
        custom_end_date=custom_end_date,
        task_id=task_id
    )

settings.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'  
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'schedule_automated_tasks': {
        'task': 'stripeapp.tasks.schedule_automated_tasks',
        'schedule': 60.0,  # выполняется каждую минуту
    },
}

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'datamapper.settings')

app = Celery('datamapper')

app.config_from_object('django.conf:settings', namespace="CELERY")

app.autodiscover_tasks()

вывод
[2024-10-19 16:21:22,128: INFO/MainProcess] Задача stripeapp.tasks.schedule_automated_tasks[199e547d-a0f2-46da-a89f-16312893bac0] получена

[2024-10-19 16:21:22,130: WARNING/ForkPoolWorker-4] Запланирование автоматических задач.

[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4] Задача-1

[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4]

[2024-10-19 16:21:22,158: WARNING/ForkPoolWorker-4] stripeapp.tasks.create_google_sheet_with_payments_task

[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4]

[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4] каждые 60 секунд

[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4]

[2024-10-19 16:21:22,159: WARNING/ForkPoolWorker-4] 2024-12-31 00:00:00+00:00

[2024-10-19 16:21:22,166: WARNING/ForkPoolWorker-4] Найдена задача расписания: TaskSchedule – ID задачи: 1, Статус: running, Время выполнения: 2024-10-19 10:02:52.361584+00:00

[2024-10-19 16:21:22,179: WARNING/ForkPoolWorker-4] Обновлена существующая периодическая задача: Task-1

[2024-10-19 16:21:22,241: WARNING/ForkPoolWorker-4] Обновлен следующий запуск для задачи 1 на 2024-10-19 16:22:22.237695+00:00.

[2024-10-19 16:21:22,241: WARNING/ForkPoolWorker-4] Осталось времени до следующего выполнения задачи 1: 59.996074 секунд.

[2024-10-19 16:21:22,241: INFO/ForkPoolWorker-4] Задача stripeapp.tasks.schedule_automated_tasks[199e547d-a0f2-46da-a89f-16312893bac0] завершилась успешно за 0.11220735599999898s: None

Ответ или решение

Периодическая задача не выполняется с Celery Beat: полное руководство по устранению неполадок

Когда вы работаете с системой обработки задач Celery в Django, может возникнуть ситуация, когда периодические задачи не выполняются, несмотря на то, что они были успешно созданы. В данном случае, задача stripeapp.tasks.create_google_sheet_with_payments_task регистрируется, но не запускается по истечении заданного интервала. Рассмотрим ряд шагов, которые помогут вам точно диагностировать и устранить проблему.

1. Подтверждение корректности настройки Celery

Проверьте настройки в settings.py

Убедитесь, что ваши настройки Celery правильно прописаны в settings.py. Обратите особое внимание на параметры CELERY_BROKER_URL и CELERY_TIMEZONE. Например:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_TIMEZONE = 'UTC'

Эти настройки обеспечивают корректное взаимодействие вашего приложения с брокером сообщений.

2. Проверка работоспособности Celery Workers и Beat

Убедитесь, что вы запускаете как workers, так и beat. Выполните следующие команды в терминале:

celery -A datamapper worker --loglevel=info
celery -A datamapper beat --loglevel=info

Следите за логами, чтобы подтвердить, что задачи действительно обрабатываются и время их выполнения фиксируется.

3. Проверка выполнения задач

Убедитесь, что ваши задачи действительно выполняются и не сталкиваются с ошибками. Если Celery worker не может запустить задачу, которая создается, например, из-за ошибок в параметрах или логики выполнения, это может остановить выполнение:

@shared_task
def create_google_sheet_with_payments_task(...):
    try:
        ...
    except Exception as e:
        print(f"Ошибка при выполнении задачи: {e}")

4. Настройка PeriodicTask

Логика в функции schedule_automated_tasks оказывается ключевой для корректной работы периодических задач. Убедитесь, что:

  • У вас правильно определяется интервалы.
  • Параметры передаются корректно в args.
  • Параметр expires не препятствует выполнению задачи.

Пример:

periodic_task, created = PeriodicTask.objects.get_or_create(
    name=f"Task-{task.id}",
    defaults={
        ...
        'expires': end_date,
    }
)

Если end_date уже истек или определен неправильно, задача может не запускаться.

5. Логирование и отладка

Добавьте больше логирования на различных этапах процесса запуска задачи, особенно перед созданием и обновлением PeriodicTask. Это поможет выявить ошибочные состояния:

print(f"Создание/обновление задачи для: {task.id}")
print(f"Параметры задачи: {args}")

6. Проверка базы данных Django

Использование django_celery_beat подразумевает правильное обнуление базы данных задач. Не забудьте проверить, существуют ли в базе данных все созданные задачи и их статусы. Периодические задачи хранятся в таблице django_celery_beat_periodictask. Запросите данные в Django shell:

from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.all()

7. Проверка интервалов и частоты запуска

Функция get_interval_from_frequency должна возвращать корректные значения. Проверьте, что частота в статусе TaskSchedule правильно интепретируется и сопоставляется с нужным интервалом.

Заключение

Устранение неполадок в Celery и Django может быть сложным, но следуя этим шагам, вы можете выявить и исправить проблему с периодическими задачами. Если ни одно из предложенных решений не работает, возможно, имеет смысл просмотреть дополнительные зависимости, версии библиотек или ошибки, возникшие в процессе выполнения. Не забывайте также о документации Celery и Django для более глубокого понимания работы всех компонентов.

Если у вас есть дополнительные вопросы, связанные с настройкой или проблемами, не стесняйтесь обращаться за помощью!

Оцените материал
Добавить комментарий

Капча загружается...