Асинхронный производитель/потребитель, работающий последовательно

Вопрос или проблема

Я начинаю работать с asyncio и создал следующее:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Обрабатываю {item}')

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))

    for k in range(10):
        print(f'Производим {k}')
        await q.put(k)

    await q.put(None)
    await consumer

asyncio.run(main())

Скрипт выводит

Производим 0
Производим 1
Производим 2
Производим 3
Производим 4
Производим 5
Производим 6
Производим 7
Производим 8
Производим 9
Обрабатываю 0
Обрабатываю 1
Обрабатываю 2
Обрабатываю 3
Обрабатываю 4
Обрабатываю 5
Обрабатываю 6
Обрабатываю 7
Обрабатываю 8
Обрабатываю 9

Почему код выполняется синхронно?

В вашем коде синхронное поведение происходит потому, что корутина handle_data запускается только один раз, после того как все элементы будут сгенерированы в main. Чтобы производитель и потребитель работали одновременно, вам нужно запустить обе задачи одновременно.
Вот как это исправить:

  1. Запустите handle_data как фоновую задачу до начала производства элементов.

  2. Используйте await asyncio.gather(), чтобы запустить производителя и потребителя одновременно.

Вот измененный код:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Обрабатываю {item}')

async def produce_data(q):
    for k in range(10):
        print(f'Производим {k}')
        await q.put(k)
        await asyncio.sleep(0.1)
    await q.put(None)

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))
    producer = asyncio.create_task(produce_data(q))

    await asyncio.gather(producer, consumer)

asyncio.run(main())

Объяснение

  • Теперь produce_data производит элементы, в то время как handle_data потребляет их параллельно, обе задачи работают одновременно через asyncio.gather().

  • Каждый раз, когда производитель добавляет элемент в очередь, потребитель может немедленно его обработать, что приводит к чередованию выводов “Производим” и “Обрабатываю”.

Надеюсь, это поможет.

Ответ или решение

Асинхронный продюсер и консюмёры: Почему ваш код работает синхронно?

Введение в асинхронное программирование может быть непростым, особенно когда вы только начинаете осваивать библиотеку asyncio в Python. Часто возникает вопрос: почему код, который должен быть асинхронным, ведёт себя как синхронный? В данном ответе разберем предоставленный вами код и выясним, как добиться одновременной работы продюсера и консюмёра.

Анализ исходного кода

Вы представили код следующего вида:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))

    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)

    await q.put(None)
    await consumer

asyncio.run(main())

Этот код создает асинхронного потребителя (консюмера), который реагирует на входящие данные из очереди. Однако вся логика продюсера (работы по созданию данных) выполняется последовательно, прежде чем начнёт работать консюмер. В результате вы видите последовательный вывод:

Producing 0
Producing 1
...
Consuming 0
Consuming 1
...

Причина синхронного поведения

Асинхронность подразумевает, что различные части вашего кода могут выполняться одновременно, используя неблокирующие операции. Однако в вашем первоначальном примере сначала производится весь набор данных, и только затем запускается процесс "потребления". Это и вызывает эффект последовательного выполнения.

Оптимизация кода для асинхронного выполнения

Чтобы продюсер и консюмёр заработали одновременно, необходимо изменить структуру вашего кода. Вот как можно это сделать:

  1. Запуск функции консюмера до начала производства: Это обеспечит запущенный поток для обработки данных, который будет работать, пока продюсер будет создавать новые данные.
  2. Использование asyncio.gather(): Это позволяет запускать несколько асинхронных задач параллельно и дожидаться их завершения.

Вот обновленный код:

import asyncio

async def handle_data(q):
    while (item := await q.get()) is not None:
        print(f'Consuming {item}')

async def produce_data(q):
    for k in range(10):
        print(f'Producing {k}')
        await q.put(k)
        await asyncio.sleep(0.1)  # Симуляция задержки
    await q.put(None)

async def main():
    q = asyncio.Queue()
    consumer = asyncio.create_task(handle_data(q))
    producer = asyncio.create_task(produce_data(q))

    await asyncio.gather(producer, consumer)

asyncio.run(main())

Пояснения и результаты

  • Параллельное выполнение: Теперь produce_data работает параллельно с handle_data. Каждый раз, когда продюсер добавляет элемент в очередь, консюмёр имеет возможность сразу же начать его обрабатывать.
  • Интерлейсинг вывода: Благодаря асинхронному взаимодействию между продюсером и консюмёром вы получите чередование строк "Producing" и "Consuming", что демонстрирует истинную асинхронность вашего кода.

Заключение

Используя asyncio, вы можете значительно повысить эффективность и отзывчивость ваших приложений, однако важно правильно структурировать код для достижения желаемого асинхронного поведения. С помощью вышеописанных изменений ваше приложение теперь работает как продюсер/консюмёр, который взаимодействует асинхронно, что дает вам возможность эффективно обрабатывать данные без блокирования. Этот подход сделает ваш код более гибким и управляемым в условиях реального времени.

Оцените материал
Добавить комментарий

Капча загружается...