Dramatiq с asyncio, похоже, использует только одного работника

Вопрос или проблема

Я пытаюсь запустить несколько задач параллельно. Когда я определяю актор как синхронную функцию, он использует разных рабочих. Но когда функция асинхронная, он использует только один поток (рабочий).

В общем, я понимаю разницу между асинхронным параллелизмом и многопоточностью/многопроцессорностью, но есть ли способ использовать всех запущенных рабочих с асинхронными акторами?

Например, если вы ограничены библиотекой, которую используете внутри актора. Мне не кажется, что делать актор синхронным и запускать новый цикл событий внутри него — элегантное решение.

Код, который я использую

redis_broker = RedisBroker(
    host=settings.redis_url,
    port=settings.redis_port,
    db=settings.redis_db,
    password=settings.redis_password,
    middleware=[CurrentMessage()]  # с AsyncIO() в асинхронном случае
)
dramatiq.set_broker(redis_broker)

Синхронный актор

@dramatiq.actor
def jobs(param: int):
    logger.info(f"Начато {param}")
    time.sleep(param)
    logger.info(f"Завершено {param}")

Асинхронный актор

@dramatiq.actor
async def jobs(param: int):
    logger.info(f"Начато {param}")
    await asyncio.sleep(param)
    logger.info(f"Завершено {param}")

И я запускаю это как

g = group([
    jobs.send(1),
    jobs.send(2),
    jobs.send(3),
]).run()

Синхронные результаты

[2024-10-11 02:38:42,251] [PID 420038] [Thread-3] [dramatiq] [INFO] Начато 3
[2024-10-11 02:38:42,251] [PID 420038] [Thread-5] [dramatiq] [INFO] Начато 2
[2024-10-11 02:38:42,251] [PID 420038] [Thread-6] [dramatiq] [INFO] Начато 3
[2024-10-11 02:38:42,252] [PID 420038] [Thread-7] [dramatiq] [INFO] Начато 1
[2024-10-11 02:38:42,252] [PID 420038] [Thread-8] [dramatiq] [INFO] Начато 2
[2024-10-11 02:38:42,252] [PID 420038] [Thread-4] [dramatiq] [INFO] Начато 1
[2024-10-11 02:38:43,253] [PID 420038] [Thread-7] [dramatiq] [INFO] Завершено 1
[2024-10-11 02:38:43,254] [PID 420038] [Thread-4] [dramatiq] [INFO] Завершено 1
[2024-10-11 02:38:44,254] [PID 420038] [Thread-5] [dramatiq] [INFO] Завершено 2
[2024-10-11 02:38:44,254] [PID 420038] [Thread-8] [dramatiq] [INFO] Завершено 2
[2024-10-11 02:38:45,255] [PID 420038] [Thread-6] [dramatiq] [INFO] Завершено 3
[2024-10-11 02:38:45,255] [PID 420038] [Thread-3] [dramatiq] [INFO] Завершено 3

Асинхронные результаты

[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 3
[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 2
[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 3
[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 1
[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 2
[2024-10-11 02:50:03,245] [PID 422276] [Thread-1] [dramatiq] [INFO] Начато 1
[2024-10-11 02:50:04,246] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 1
[2024-10-11 02:50:04,246] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 1
[2024-10-11 02:50:05,247] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 2
[2024-10-11 02:50:05,247] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 2
[2024-10-11 02:50:06,246] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 3
[2024-10-11 02:50:06,246] [PID 422276] [Thread-1] [dramatiq] [INFO] Завершено 3

Ответ или решение

Поведение, которое вы наблюдаете, связано с тем, как драматик (Dramatiq) обрабатывает асинхронные функции в сравнении с синхронными. Давайте разберем проблему и предложим возможные решения.

Проблема

Когда вы определяете акторы как асинхронные функции, Dramatiq использует только один рабочий поток (поток), что может быть нежелательным поведением, если вы хотите использовать преимущества параллелизма при выполнении нескольких задач.

Асинхронные функции в Python управляются событийным циклом, и Dramatiq в этом контексте по умолчанию использует один рабочий поток для выполнения асинхронных задач, что объясняет, почему у вас в логах отображаются только один и тот же поток.

Решение

Чтобы достичь желаемого поведения, есть несколько подходов:

  1. Используйте синхронные акторы:
    Если это приемлемо в вашем случае, вы можете использовать синхронные функции, что позволит Dramatiq использовать параллелизм на уровне потоков.

  2. Конфигурация Dramatiq:
    Убедитесь, что вы правильно настроили Dramatiq, чтобы он использовал несколько рабочих процессов (workers). Вы можете запустить несколько рабочих процессов, указав количество процессов в команде запуска:

    dramatiq my_module --workers 4
  3. Разделение задач:
    Если ваш асинхронный актор зависит от блока кода, который использует синхронные вызовы, возможно, стоит переосмыслить архитектуру разграничения задач в вашем приложении. Вы можете разбить задачи на более мелкие выполнения и отправлять их в отдельные висячие асинхронные задания.

  4. Использование других библиотек asyncio:
    Если ваш код требует высокой степени параллелизма и требует возможности выполнять множество задач, вы можете рассмотреть использование asyncio.gather() для выполнения нескольких асинхронных заданий одновременно. Однако, это будет в контексте одной и той же асинхронной функции, и не гарантирует многоянный параллелизм на уровне процессов.

  5. Middleware для поддержки асинхронных задач:
    Если вы используете какой-либо middleware, удостоверитесь, что он поддерживает асинхронный режим работы. Например, middleware должен быть совместим с асинхронной обработкой внутри Dramatiq.

Пример

Если вы хотите продолжать использовать асинхронные функции, вы можете попробовать следующее:

import dramatiq

@dramatiq.actor
async def jobs(param: int):
    logger.info(f"Started {param}")
    await asyncio.sleep(param)
    logger.info(f"Ended {param}")

# Запуск нескольких задач одновременно
g = group([
    jobs.send(1),
    jobs.send(2),
    jobs.send(3),
])

g.run()

Или вы можете использовать asyncio.gather для запуска несколько задач:

async def run_jobs():
    await asyncio.gather(
        jobs.send(1),
        jobs.send(2),
        jobs.send(3),
    )

asyncio.run(run_jobs())

Заключение

Использование асинхронных акторов в Dramatiq не всегда ведет к многоянному (многопоточному) выполнению. Убедитесь, что ваша настройка позволяет использовать нескольких рабочих процессов в Dramatiq, и при необходимости переосмысьте архитектуру ваших асинхронных задач. Все перечисленные решения имеют свои плюсы и минусы, поэтому выберите подходящий для вашей задачи.

Оцените материал
Добавить комментарий

Капча загружается...