Параллельная обработка задач Celery с последовательной обработкой уникального ключа маршрута

Вопрос или проблема

Я работаю над службой посредника, которая получает события от нескольких систем мониторинга, коррелирует контекстную информацию и инициирует последующие сообщения/действия. То есть, получать event[topic_key] > искать в системе тикетов существующий/открытый {записывать, если существует & открывайте, иначе создавайте новую проблему} > запускать последующие задачи

Система тикетов действует как простая система контроля наводнений, чтобы предотвратить дублирующие действия/уведомления для уникального event[topic_key]

Мне нужно иметь возможность обрабатывать большое количество event[topic_key] одновременно, но нужно избежать создания дублирующих новых тикетов, когда несколько событий с одним и тем же topic_key принимаются в очень короткий промежуток времени. Мои работники celery настроены на масштабирование и должны масштабироваться, чтобы избежать задержек в обработке событий.

Есть ли у кого-либо опыт настройки брокера celery (в настоящее время rabbitmq, но может быть изменен при необходимости) обменов, очередей и/или route_key и т. д., чтобы обеспечить “уникальное параллельное” выполнение задач, где рабочие могут масштабироваться и обрабатывать задачи одновременно для уникальных topic_key, но предотвращать одновременное выполнение задач для нескольких случаев одного и того же topic_key в очереди задач? То есть, используя обмен topic.

Наши ключи тем обширны (тысячи), поэтому использование простых статичных/явных имен очередей (в одиночку) нецелесообразно.

Я могу достичь этого другими менее элегантными способами, но интересно, есть ли в celery/rabbitmq встроенные возможности, которые могут покрыть этот сценарий.

Ответ или решение

Для решения проблемы параллельной обработки задач Celery с последовательной обработкой уникального ключа маршрута (topic_key) можно использовать несколько подходов. Основная цель — обеспечить возможность параллельной обработки задач для уникальных topic_key, при этом избегая создания дубликатов задач для одинаковых ключей.

Использование Брокера RabbitMQ и Celery

  1. Тематические обмены и маршрутизация:
    Настройте RabbitMQ так, чтобы использовать тематические обмены. Это позволит вам гибко распределять задачи по разным очередям в зависимости от ключей маршрутизации (routing keys). Вы можете создать один обмен и несколько подписчиков, которые будут слушать разные ключи.

  2. Уникальные очереди для каждого ключа маршрута:
    Хотя у вас много topic_key, вы можете создать динамические очереди на основе topic_key, используя механизмы, такие как fanout или topic обмены. Например, вы можете создавать уникальные очереди для каждого ключа маршрута:

    from celery import Celery
    
    app = Celery('tasks', broker='pyamqp://guest@localhost//')
    
    @app.task
    def process_event(topic_key):
       # Логика обработки событий
       pass
  3. Избежание дублирования через блокировки:
    Чтобы избежать создания дублирующихся задач, вы можете использовать схему блокировки. Создайте Redis или другую in-memory хранилище для управления блокировками. Перед тем как ограничить выполнение задачи для определенного topic_key, проверьте, существует ли уже запись в хранилище:

    import redis
    
    r = redis.Redis()
    
    @app.task
    def process_event(topic_key):
       if r.exists(topic_key):
           return  # Задача уже обрабатывается
       r.set(topic_key, 'processing', ex=60)  # Блокируем на 60 секунд
       try:
           # Логика обработки события
           ...
       finally:
           r.delete(topic_key)  # Разблокируем после выполнения
  4. Использование важности задач:
    Если важность задач различна, вы можете использовать приоритеты задач для управления порядком их обработки. Celery поддерживает управление приоритетом задач, что позволяет вам обрабатывать более важные задачи первыми, но это может не решить проблему дублирования, поэтому это следует комбинировать с другими методами.

  5. Асинхронные обработчики с использованием сигналов:
    Celery также поддерживает сигналы, которые можно использовать для обработки ошибок или завершения задач. Это может быть полезно для отслеживания завершения задач и дальнейшей логики обработки событий, которая зависит от успешного выполнения предыдущих задач.

Заключение

Используя вышеупомянутые стратегии, вы сможете достичь одновременной обработки задач Celery для уникальных topic_key, в то время как предотвратите создание дублирующихся записей. Комбинация тематики обменов RabbitMQ с механизмами блокировки и управления состоянием, такими как Redis, даст вам нужную гибкость и производительность для обработки высоконагруженных систем с множеством событий.

Оцените материал
Добавить комментарий

Капча загружается...