Асинхронная обработка задач на основе PostgreSQL

Вопрос или проблема

Я ищу систему обработки асинхронных заданий, подобную celery или python-rq, но основанную на PostgreSQL.

Необходимые функции:

  • Асинхронная обработка заданий
  • Использует PostgreSQL
  • API для Python
  • Сохраняет сообщения в независимом от языка формате (json, yaml, протоколы буферов и т. д.) НЕ python pickle.
  • С открытым исходным кодом

Почему PostgreSQL:

  • Мы уже используем PostgreSQL
  • Задания не должны теряться. Это не является особенностью Redis и RabbitMQ по умолчанию.
  • У нас всего несколько заданий в час. Простота важнее производительности.
  • Celery слишком велик

Связанная статья о SELECT FOR UPDATE .. SKIP LOCKED, которая была введена в PostgreSQL 9.5: https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/

Это не решает все желаемые функции, но большинство из них:

https://www.npmjs.com/package/pg-boss

Это основано на node.js, а не на Python.

Оно использует PostgreSQL с новой функцией SKIP LOCKED.

Ответ или решение

Асинхронная обработка задач на основе PostgreSQL

В современном мире высоких технологий управление фоновыми задачами — это важная составляющая эффективной работы приложений. Если вы ищете систему асинхронной обработки задач, которая надежно использует PostgreSQL, и вы уже имеете опыт работы с этой СУБД, ознакомьтесь с нашим предложением.

Зачем PostgreSQL?

  1. Существующая инфраструктура: Использование PostgreSQL позволит вам интегрировать обработку задач в уже имеющуюся инфраструктуру, минимизируя время и затраты на обучение команды.

  2. Надежность данных: В отличие от Redis и RabbitMQ, PostgreSQL предоставляет надежные механизмы защиты от потери данных. Информация о задачах сохраняется в транзакциях, что основания обеспечивает отсутствие их потери.

  3. Низкая частота задач: Если у вас в час обрабатывается всего несколько задач, PostgreSQL с его стабильностью и простотой будет наиболее подходящим решением.

  4. Минимизация сложности: Сравнительно с такими крупными решениями, как Celery, PostgreSQL является более легковесным вариантом, что значительно упрощает использование и управление системой.

Основные функции системы обработки задач

При разработке системы обработки задач следует учесть несколько ключевых требований:

  • Асинхронная обработка задач: Система должна позволять инициировать и выполнять задачи в фоновом режиме, не блокируя основной поток приложения.

  • Использование PostgreSQL: Система будет полностью интегрирована с PostgreSQL, что позволит нам использовать его мощные возможности.

  • API для Python: Необходимо предоставить простое и понятное API для Python, чтобы обеспечить легкую интеграцию и использование системы разработчиками.

  • Формат хранения сообщений: Хранение данных должно осуществляться в независимом от языка формате, например, JSON или Protocol Buffers, исключая использование Python pickle.

  • Открытое программное обеспечение: Решение должно быть открытым, позволяя кастомизацию и модернизацию в будущем.

Пример реализации

В качестве примера, можно рассмотреть использование модели, основанной на комбинации таблицы для задач и отдельных процессов для их обработки.

  1. Структура таблицы:

    CREATE TABLE job_queue (
       id SERIAL PRIMARY KEY,
       job_data JSONB NOT NULL,
       status VARCHAR(20) DEFAULT 'pending',
       created_at TIMESTAMP DEFAULT NOW(),
       updated_at TIMESTAMP DEFAULT NOW()
    );
  2. Добавление задачи:
    Для добавления задач в очередь можно использовать простую операцию вставки:

    INSERT INTO job_queue (job_data) VALUES ('{"task": "example_task", "parameters": {"key": "value"}}');
  3. Выборка и блокировка задачи:
    Для выбора готовой задачи можно использовать конструкцию SELECT FOR UPDATE SKIP LOCKED, обеспечивающую блокировку выбранной строки, что способствует исключению ее повторной обработки:

    WITH selected AS (
       SELECT * FROM job_queue 
       WHERE status = 'pending' 
       ORDER BY created_at 
       LIMIT 1 
       FOR UPDATE SKIP LOCKED
    )
    UPDATE job_queue SET status = 'processing' WHERE id IN (SELECT id FROM selected);
  4. Обработка задачи:
    Задачи могут быть обработаны отдельными процессами с помощью Python, которые будут обращаться к API вашей системы для добавления и изменения статуса задач.

Заключение

Создание системы асинхронной обработки задач на основе PostgreSQL — это осмысленный шаг для разработки, учитывающий имеющуюся инфраструктуру и потребности вашего бизнеса. Такой подход обеспечит надежность, простоту и гибкость в управлении задачами.

Если у вас будут вопросы или возникнет необходимость в помощи, не стесняйтесь обращаться. Мы готовы помочь вам с дальнейшим развитием вашей системы!

Оцените материал
Добавить комментарий

Капча загружается...