Как заставить очередь задач на Python ждать сигнала о пробуждении

Вопрос или проблема

Я создал пул процессов Python с помощью функции multiprocessing.pool, каждый из которых выполняет подобную работу. Это нужно для того, чтобы выполнять больше работы одновременно.

Я хотел бы, чтобы все задачи ждали одного события/сигнала/семафора или другого объекта, который заставляет задачи спать до получения сигнала. Они будут выполнять свою работу, а затем снова засыпать в ожидании триггера этого события.

Другие задачи в программе будут инициировать событие, когда у них есть работа, которую нужно выполнить.

Я не уверен, что могу использовать для публикации событий другими задачами, когда им нужно выполнять работу. Я не знаю, как заставить эти задачи ждать этого сигнала.

Используя Pool(), задачи представляют собой разные процессы, но все они должны просыпаться от общего сигнала какого-то рода.

Ответ или решение

Для решения вашей задачи с использованием пула процессов в Python и ожиданием события, вы можете воспользоваться классом Event из модуля multiprocessing. Это обеспечит механизм синхронизации, который позволит процессам ожидать сигнала для выполнения работы.

Ниже приведен полный пример, который демонстрирует, как организовать подобное:

import multiprocessing
import time
import random

def worker(event, worker_id):
    while True:
        print(f"Рабочий {worker_id} ожидает сигнала...")
        event.wait()  # Ожидание сигнала
        print(f"Рабочий {worker_id} получил сигнал и начинает работу...")

        # Выполнение работы
        time.sleep(random.uniform(1, 3))  # Эмулируем работу
        print(f"Рабочий {worker_id} завершил работу.")

        event.clear()  # Сброс события, чтобы вернуться в режим ожидания

def main():
    num_workers = 4
    event = multiprocessing.Event()  # Создание события
    pool = multiprocessing.Pool(processes=num_workers, initializer=init_worker, initargs=(event,))

    # Запуск рабочих процессов
    for i in range(num_workers):
        pool.apply_async(worker, (event, i,))

    # Основной поток будет отправлять сигналы на выполнение
    try:
        while True:
            time.sleep(random.uniform(2, 5))  # Эмулируем период времени между заданиями
            print("Основной поток отправляет сигнал на выполнение работы.")
            event.set()  # Установка события для сигнализации рабочим
    except KeyboardInterrupt:
        print("Завершение работы...")
    finally:
        pool.terminate()
        pool.join()

if __name__ == "__main__":
    main()

Объяснение кода:

  1. Импорт необходимых модулей: Вы импортируете модули multiprocessing, time и random.

  2. Функция worker: Эта функция принимает событие и идентификатор рабочего процесса. Она входит в бесконечный цикл, в котором сначала ожидает установления события (signal). Как только событие установлено (event.set()), рабочий процесс выполняет свою работу (симулируется с помощью time.sleep()) и затем сбрасывает событие с помощью event.clear() для того, чтобы снова ожидать сигнал.

  3. Функция main: Здесь вы создаете пул процессов и запускаете несколько работников. Также в этой функции основной поток будет периодически устанавливать событие для сигнализации рабочим процессам о том, что они могут начинать работу.

  4. Обработка завершения программы: Используется обработка исключений для корректного завершения работы пулов процессов при прерывании программы (например, с помощью Ctrl+C).

Такой подход обеспечит синхронность управления между основным потоком и рабочими процессами и позволит эффективнее распределять рабочую нагрузку.

Оцените материал
Добавить комментарий

Капча загружается...