Вопрос или проблема
Я ищу систему обработки асинхронных заданий, подобную celery или python-rq, но основанную на PostgreSQL.
Необходимые функции:
- Асинхронная обработка заданий
- Использует PostgreSQL
- API для Python
- Сохраняет сообщения в независимом от языка формате (json, yaml, протоколы буферов и т. д.) НЕ python pickle.
- С открытым исходным кодом
Почему PostgreSQL:
- Мы уже используем PostgreSQL
- Задания не должны теряться. Это не является особенностью Redis и RabbitMQ по умолчанию.
- У нас всего несколько заданий в час. Простота важнее производительности.
- Celery слишком велик
Связанная статья о SELECT FOR UPDATE .. SKIP LOCKED
, которая была введена в PostgreSQL 9.5: https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
Это не решает все желаемые функции, но большинство из них:
https://www.npmjs.com/package/pg-boss
Это основано на node.js, а не на Python.
Оно использует PostgreSQL с новой функцией SKIP LOCKED.
Ответ или решение
Асинхронная обработка задач на основе PostgreSQL
В современном мире высоких технологий управление фоновыми задачами — это важная составляющая эффективной работы приложений. Если вы ищете систему асинхронной обработки задач, которая надежно использует PostgreSQL, и вы уже имеете опыт работы с этой СУБД, ознакомьтесь с нашим предложением.
Зачем PostgreSQL?
-
Существующая инфраструктура: Использование PostgreSQL позволит вам интегрировать обработку задач в уже имеющуюся инфраструктуру, минимизируя время и затраты на обучение команды.
-
Надежность данных: В отличие от Redis и RabbitMQ, PostgreSQL предоставляет надежные механизмы защиты от потери данных. Информация о задачах сохраняется в транзакциях, что основания обеспечивает отсутствие их потери.
-
Низкая частота задач: Если у вас в час обрабатывается всего несколько задач, PostgreSQL с его стабильностью и простотой будет наиболее подходящим решением.
-
Минимизация сложности: Сравнительно с такими крупными решениями, как Celery, PostgreSQL является более легковесным вариантом, что значительно упрощает использование и управление системой.
Основные функции системы обработки задач
При разработке системы обработки задач следует учесть несколько ключевых требований:
-
Асинхронная обработка задач: Система должна позволять инициировать и выполнять задачи в фоновом режиме, не блокируя основной поток приложения.
-
Использование PostgreSQL: Система будет полностью интегрирована с PostgreSQL, что позволит нам использовать его мощные возможности.
-
API для Python: Необходимо предоставить простое и понятное API для Python, чтобы обеспечить легкую интеграцию и использование системы разработчиками.
-
Формат хранения сообщений: Хранение данных должно осуществляться в независимом от языка формате, например, JSON или Protocol Buffers, исключая использование Python pickle.
-
Открытое программное обеспечение: Решение должно быть открытым, позволяя кастомизацию и модернизацию в будущем.
Пример реализации
В качестве примера, можно рассмотреть использование модели, основанной на комбинации таблицы для задач и отдельных процессов для их обработки.
-
Структура таблицы:
CREATE TABLE job_queue ( id SERIAL PRIMARY KEY, job_data JSONB NOT NULL, status VARCHAR(20) DEFAULT 'pending', created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() );
-
Добавление задачи:
Для добавления задач в очередь можно использовать простую операцию вставки:INSERT INTO job_queue (job_data) VALUES ('{"task": "example_task", "parameters": {"key": "value"}}');
-
Выборка и блокировка задачи:
Для выбора готовой задачи можно использовать конструкциюSELECT FOR UPDATE SKIP LOCKED
, обеспечивающую блокировку выбранной строки, что способствует исключению ее повторной обработки:WITH selected AS ( SELECT * FROM job_queue WHERE status = 'pending' ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED ) UPDATE job_queue SET status = 'processing' WHERE id IN (SELECT id FROM selected);
-
Обработка задачи:
Задачи могут быть обработаны отдельными процессами с помощью Python, которые будут обращаться к API вашей системы для добавления и изменения статуса задач.
Заключение
Создание системы асинхронной обработки задач на основе PostgreSQL — это осмысленный шаг для разработки, учитывающий имеющуюся инфраструктуру и потребности вашего бизнеса. Такой подход обеспечит надежность, простоту и гибкость в управлении задачами.
Если у вас будут вопросы или возникнет необходимость в помощи, не стесняйтесь обращаться. Мы готовы помочь вам с дальнейшим развитием вашей системы!