Вопрос или проблема
У меня есть следующий пример кода:
import asyncio
import concurrent.futures
import functools
import time
async def run_till_first_success(tasks, timeout=None):
results = []
exceptions = []
while tasks:
try:
async with asyncio.timeout(timeout):
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
except TimeoutError:
print('время ожидания истекло')
for task in tasks:
task.cancel() # это не работает!
return
# >>> этот код на самом деле не имеет значения
for task in done:
if task.exception():
exceptions.append(task.exception())
else:
results.append(task.result())
tasks = pending
if results or len(pending) == 0:
for task in pending:
task.cancel()
break
if not results:
raise exceptions[0]
return results[0]
# <<<
class Worker:
def __init__(self, execute):
self.execute = execute
async def work(self):
return await self.execute(self._work)
def _work(self): # имитация долгого синхронного выполнения
print('_work начата')
time.sleep(5)
print('_work завершена')
class LibraryInterfaceClass:
def __init__(self):
self._executor = concurrent.futures.ThreadPoolExecutor(10)
self._workers = [Worker(self._execute) for _ in range(2)]
async def use(self, timeout=1):
tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
return await run_till_first_success(tasks, timeout)
async def _execute(self, func, *args, **kwargs):
return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('выполнение завершено')
Когда я запускаю это, я получаю следующий вывод:
_work начата
_work начата
время ожидания истекло
выполнение завершено
_work завершена
_work завершена
Моя цель - предотвратить вывод _work завершена
. Другими словами, я хочу отменить длительные операции сразу после истечения времени ожидания. Как это сделать правильно? task.cancel()
в моем случае не работает.
task.cancel()
не останавливает поток, он лишь останавливает асинхронную корутину. Чтобы остановить поток, мы можем использовать сигналы событий в классе Worker для отмены длительных операций.
Вот ваш измененный код.
import asyncio
import concurrent.futures
import functools
import time
import threading
async def run_till_first_success(tasks, timeout=None):
results = []
exceptions = []
while tasks:
try:
async with asyncio.timeout(timeout):
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
except TimeoutError:
print('время ожидания истекло')
for task in tasks:
task.cancel() # Отменить задачу asyncio
return
# >>> этот код на самом деле не имеет значения
for task in done:
if task.exception():
exceptions.append(task.exception())
else:
results.append(task.result())
tasks = pending
if results or len(pending) == 0:
for task in pending:
task.cancel()
break
if not results:
raise exceptions[0]
return results[0]
# <<<
class Worker:
def __init__(self, execute):
self.execute = execute
self.current_thread = threading.Event() # мы будем использовать это для остановки потоков
async def work(self):
return await self.execute(self._work)
def _work(self): # имитация долгого синхронного выполнения
print('_work начата')
# time.sleep(5) # мы перемещаем это в цикл для проверки статуса отмены нашего потока
for _ in range(5): # Периодически проверяем статус отмены
if self.current_thread.is_set():
print('_work отменена')
return
time.sleep(1)
print('_work завершена')
def cancel(self): # используем для остановки потока
self.current_thread.set()
class LibraryInterfaceClass:
def __init__(self):
self._executor = concurrent.futures.ThreadPoolExecutor(10)
self._workers = [Worker(self._execute) for _ in range(2)]
async def use(self, timeout=1):
tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
# отменить работников после завершения работы
try:
result = await run_till_first_success(tasks, timeout)
finally:
for worker in self._workers:
worker.cancel() # Убедиться, что все работники получили сигнал отмены
return result
async def _execute(self, func, *args, **kwargs):
return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('выполнение завершено')
Вывод:
_work начата
_work начата
время ожидания истекло
выполнение завершено
_work отменена
_work отменена
Ответ или решение
Чтобы отменить длительно выполняемую задачу в asyncio
, важно понимать, что task.cancel()
останавливает только асинхронные корутины. Если же ваша задача выполняется в потоке, например, при использовании ThreadPoolExecutor
, то task.cancel()
не будет иметь эффекта, поскольку time.sleep()
блокирует поток и не позволяет проверить состояние.
Для решения этой проблемы мы можем использовать механизмы синхронизации, например, Event
из модуля threading
. Давайте рассмотрим исправленный код, который позволяет корректно завершать выполнение длительных операций, когда возникает тайм-аут.
Исправленный код
import asyncio
import concurrent.futures
import functools
import time
import threading
async def run_till_first_success(tasks, timeout=None):
results = []
exceptions = []
while tasks:
try:
async with asyncio.timeout(timeout):
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED, timeout=timeout)
except TimeoutError:
print('Тайм-аут сработал')
for task in tasks:
task.cancel() # Отмена асинхронной задачи
return
# Обработка завершённых задач
for task in done:
if task.exception():
exceptions.append(task.exception())
else:
results.append(task.result())
tasks = pending
if results or len(pending) == 0:
for task in pending:
task.cancel()
break
if not results:
raise exceptions[0]
return results[0]
class Worker:
def __init__(self, execute):
self.execute = execute
self.current_thread = threading.Event() # Используем для завершения потоков
async def work(self):
return await self.execute(self._work)
def _work(self): # Симуляция длинной блокирующей операции
print('_work началась')
# Здесь мы будем периодически проверять, был ли вызван cancel
for _ in range(5): # Проверяйте статус отмены периодически
if self.current_thread.is_set():
print('_work отменена')
return
time.sleep(1)
print('_work завершилась')
def cancel(self): # Метод для остановки выполнения
self.current_thread.set()
class LibraryInterfaceClass:
def __init__(self):
self._executor = concurrent.futures.ThreadPoolExecutor(10)
self._workers = [Worker(self._execute) for _ in range(2)]
async def use(self, timeout=1):
tasks = {asyncio.create_task(worker.work()) for worker in self._workers}
# Отмена работ после выполнения
try:
result = await run_till_first_success(tasks, timeout)
finally:
for worker in self._workers:
worker.cancel() # Убедитесь, что все рабочие получили сигнал отмены
return result
async def _execute(self, func, *args, **kwargs):
return await asyncio.get_event_loop().run_in_executor(self._executor, functools.partial(func, *args, **kwargs))
# Запуск интерфейса
interface = LibraryInterfaceClass()
asyncio.run(interface.use())
print('Выполнение завершено')
Результат выполнения
При запуске этого кода вы получите следующий вывод:
_work началась
_work началась
Тайм-аут сработал
Выполнение завершено
_work отменена
_work отменена
Заключение
Теперь, когда происходит тайм-аут, длительные операции корректно отменяются, и вы не увидите сообщений о завершении _work
, если они были отменены. Главное изменение заключается в использовании Event
для управления выполнением блокирующих потоков, что позволяет безопасно завершить их работу.