Пользовательский обработчик для FastAPI для обработки закрытия соединений

Вопрос или проблема

У меня есть странная проблема с FastAPI. Предыстория такова, что у меня есть очередь, которая состоит из задач, добавляемых, когда я делаю запрос к конечной точке FastAPI. Я использую это для тестирования, поэтому намеренно держу запрос открытым, пока задача не будет выполнена очередью. Поскольку может быть много задач, накапливающихся, я хочу завершить всё это аккуратно при закрытии приложения. У меня есть функция, которая немедленно обрабатывает все открытые запросы, которые всё ещё ждут своего ответа. Однако я не могу вызвать это с помощью события завершения FastAPI в его жизненном цикле. Это происходит только тогда, когда все соединения закрыты, и это именно то, что я пытаюсь сделать здесь. Поэтому я попытался зарегистрировать свой собственный механизм завершения, который немедленно обрабатывает все открытые запросы для их закрытия, добавив это в жизненный цикл:

@asynccontextmanager
async def lifespan(instance: Starlette):
    signal.signal(signal.SIGTERM, lambda signum, frame: handle_shutdown())
    signal.signal(signal.SIGINT, lambda signum, frame: handle_shutdown())
    logger.info("Событие запуска выполнено, обработчики сигналов зарегистрированы.")
    yield 

Это работает хорошо, но теперь FastAPI не выполняет свой собственный механизм завершения. Есть идеи, как я мог бы это исправить? Буду признателен за любые советы или ссылки.

Ответ или решение

Вопрос, который вы подняли, является важным аспектом работы с FastAPI, особенно когда речь идет о управлении состоянием приложений и адекватном завершении соединений. Основная проблема заключается в том, что при регистрации пользовательских обработчиков завершения на событие сигналов (таких как SIGINT и SIGTERM) вы перехватываете процесс завершения, что мешает FastAPI выполнить свои собственные механизмы завершения. Рассмотрим, как можно организовать корректную обработку завершения соединений и согласованное завершение работы приложения на основе вашей ситуации.

Решение проблемы с закрытием соединений в FastAPI

Для того чтобы graceful shutdown работал правильно, необходимо учесть несколько аспектов:

  1. Регистрация обработчиков сигналов: Вы уже используете signal для регистрации сигналов, но при этом необходимо убедиться, что оригинальные обработчики Shutdown FastAPI выполняются корректно.

  2. Использование контекстного менеджера lifespan: Вы можете использовать контекстный менеджер lifespan, чтобы управлять сигналами, не мешая стандартной логике.

  3. Отложенное завершение: Создадим функцию, которая будет завершать открытые соединения и только после этого даст возможность FastAPI выполнить своё закрытие.

Пример реализации

Вот пример того, как можно организовать корректное завершение при использовании FastAPI:

from fastapi import FastAPI
from fastapi import Request
from contextlib import asynccontextmanager
import signal
import logging
import asyncio

app = FastAPI()
logger = logging.getLogger("uvicorn")

active_requests = set()

async def handle_shutdown():
    logger.info("Initiating shutdown...")

    # Завершение всех активных запросов.
    await asyncio.gather(*(req.cancel() for req in active_requests))
    logger.info("All active requests have been canceled.")

@asynccontextmanager
async def lifespan(app: FastAPI):
    def signal_handler(signum, frame):
        asyncio.create_task(handle_shutdown())

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    logger.info("Startup event executed, signal handlers registered.")
    yield
    logger.info("Shutdown event completed.")

@app.middleware("http")
async def add_active_request(request: Request, call_next):
    active_requests.add(asyncio.current_task())
    try:
        response = await call_next(request)
    finally:
        active_requests.remove(asyncio.current_task())
    return response

@app.get("/")
async def read_root():
    await asyncio.sleep(10)  # Симуляция долгого запроса
    return {"Hello": "World"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, lifespan=lifespan)

Объяснение кода:

  1. Middleware: Каждый запрос регистрируется в active_requests при его начале и удаляется из него при завершении. Это позволяет отслеживать все активные запросы и корректно их завершить.

  2. Обработчик завершения: Функция handle_shutdown отвечает за завершение всех активных запросов. Она вызывается при получении сигналов SIGINT и SIGTERM.

  3. Lifespan контекстный менеджер: В нем регистрируются сигнал-обработчики и обеспечивается правильный порядок завершения.

Заключение

Этот подход позволяет вам контролировать процесс завершения соединений, не мешая внутреннему механизму FastAPI. Применяя данную стратегию, вы сможете обеспечить корректное завершение всех активных запросов, поддерживая стабильность вашего приложения даже в условиях нагрузки и обработки большого числа запросов.

Если у вас есть дополнительные вопросы или необходима помощь в интеграции этого решения, не стесняйтесь обращаться.

Оцените материал
Добавить комментарий

Капча загружается...