Как отменить длительную asyncio.Task?

Вопрос или проблема

У меня есть следующий пример кода:

import asyncio
import concurrent.futures
import functools
import time

async def run_till_first_success(tasks, timeout=None):
    results = []
    exceptions = []
    while tasks:
        try:
            async with asyncio.timeout(timeout):
                done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
        except TimeoutError:
            print('время ожидания истекло')
            for task in tasks:
                task.cancel()  # это не работает!
            return

        # >>> этот код на самом деле не имеет значения
        for task in done:
            if task.exception():
                exceptions.append(task.exception())
            else:
                results.append(task.result())
        tasks = pending
        if results or len(pending) == 0:
            for task in pending:
                task.cancel()
            break
    if not results:
        raise exceptions[0]
    return results[0]
# <<<

class Worker:
    def __init__(self, execute):
        self.execute = execute

    async def work(self):
        return await self.execute(self._work)

    def _work(self):  # имитация долгого синхронного выполнения
        print('_work начата')
        time.sleep(5)
        print('_work завершена')

class LibraryInterfaceClass:
    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(10)
        self._workers = [Worker(self._execute) for _ in range(2)]

    async def use(self, timeout=1):
        tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
        return await run_till_first_success(tasks, timeout)

    async def _execute(self, func, *args, **kwargs):
        return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))

interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('выполнение завершено')

Когда я запускаю это, я получаю следующий вывод:

_work начата
_work начата
время ожидания истекло
выполнение завершено
_work завершена
_work завершена

Моя цель - предотвратить вывод _work завершена. Другими словами, я хочу отменить длительные операции сразу после истечения времени ожидания. Как это сделать правильно? task.cancel() в моем случае не работает.

task.cancel() не останавливает поток, он лишь останавливает асинхронную корутину. Чтобы остановить поток, мы можем использовать сигналы событий в классе Worker для отмены длительных операций.

Вот ваш измененный код.

import asyncio
import concurrent.futures
import functools
import time
import threading

async def run_till_first_success(tasks, timeout=None):
    results = []
    exceptions = []
    while tasks:
        try:
            async with asyncio.timeout(timeout):
                done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
        except TimeoutError:
            print('время ожидания истекло')
            for task in tasks:
                task.cancel()  # Отменить задачу asyncio
            return

        # >>> этот код на самом деле не имеет значения
        for task in done:
            if task.exception():
                exceptions.append(task.exception())
            else:
                results.append(task.result())
        tasks = pending
        if results or len(pending) == 0:
            for task in pending:
                task.cancel()
            break
    if not results:
        raise exceptions[0]
    return results[0]
# <<<

class Worker:
    def __init__(self, execute):
        self.execute = execute
        self.current_thread = threading.Event()  # мы будем использовать это для остановки потоков

    async def work(self):
        return await self.execute(self._work)

    def _work(self):  # имитация долгого синхронного выполнения
        print('_work начата')
        # time.sleep(5)  # мы перемещаем это в цикл для проверки статуса отмены нашего потока
        for _ in range(5):  # Периодически проверяем статус отмены
            if self.current_thread.is_set():
                print('_work отменена')
                return
            time.sleep(1)

        print('_work завершена')

    def cancel(self):  # используем для остановки потока
        self.current_thread.set()

class LibraryInterfaceClass:
    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(10)
        self._workers = [Worker(self._execute) for _ in range(2)]

    async def use(self, timeout=1):
        tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
        # отменить работников после завершения работы
        try:
            result = await run_till_first_success(tasks, timeout)
        finally:
            for worker in self._workers:
                worker.cancel()  # Убедиться, что все работники получили сигнал отмены
        return result

    async def _execute(self, func, *args, **kwargs):
        return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))

interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('выполнение завершено')

Вывод:

_work начата
_work начата
время ожидания истекло
выполнение завершено
_work отменена
_work отменена

Ответ или решение

Чтобы отменить длительно выполняемую задачу в asyncio, важно понимать, что task.cancel() останавливает только асинхронные корутины. Если же ваша задача выполняется в потоке, например, при использовании ThreadPoolExecutor, то task.cancel() не будет иметь эффекта, поскольку time.sleep() блокирует поток и не позволяет проверить состояние.

Для решения этой проблемы мы можем использовать механизмы синхронизации, например, Event из модуля threading. Давайте рассмотрим исправленный код, который позволяет корректно завершать выполнение длительных операций, когда возникает тайм-аут.

Исправленный код

import asyncio
import concurrent.futures
import functools
import time
import threading

async def run_till_first_success(tasks, timeout=None):
    results = []
    exceptions = []
    while tasks:
        try:
            async with asyncio.timeout(timeout):
                done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
        except TimeoutError:
            print('Тайм-аут сработал')
            for task in tasks:
                task.cancel()  # Отмена асинхронной задачи
            return

        # Обработка завершённых задач
        for task in done:
            if task.exception():
                exceptions.append(task.exception())
            else:
                results.append(task.result())
        tasks = pending
        if results or len(pending) == 0:
            for task in pending:
                task.cancel()
            break
    if not results:
        raise exceptions[0]
    return results[0]

class Worker:
    def __init__(self, execute):
        self.execute = execute
        self.current_thread = threading.Event()  # Используем для завершения потоков

    async def work(self):
        return await self.execute(self._work)

    def _work(self):  # Симуляция длинной блокирующей операции
        print('_work началась')
        # Здесь мы будем периодически проверять, был ли вызван cancel
        for _ in range(5):  # Проверяйте статус отмены периодически
            if self.current_thread.is_set():
                print('_work отменена')
                return
            time.sleep(1)

        print('_work завершилась')

    def cancel(self):  # Метод для остановки выполнения
        self.current_thread.set()

class LibraryInterfaceClass:
    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(10)
        self._workers = [Worker(self._execute) for _ in range(2)]

    async def use(self, timeout=1):
        tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
        # Отмена работ после выполнения
        try:
            result = await run_till_first_success(tasks, timeout)
        finally:
            for worker in self._workers:
                worker.cancel()  # Убедитесь, что все рабочие получили сигнал отмены
        return result

    async def _execute(self, func, *args, **kwargs):
        return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))

# Запуск интерфейса
interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('Выполнение завершено')

Результат выполнения

При запуске этого кода вы получите следующий вывод:

_work началась
_work началась
Тайм-аут сработал
Выполнение завершено
_work отменена
_work отменена

Заключение

Теперь, когда происходит тайм-аут, длительные операции корректно отменяются, и вы не увидите сообщений о завершении _work, если они были отменены. Главное изменение заключается в использовании Event для управления выполнением блокирующих потоков, что позволяет безопасно завершить их работу.

Оцените материал
Добавить комментарий

Капча загружается...