Почему метод распознавания SpeechAsyncClient зависает при потоковой передаче

Вопросы и ответы

Обзор

Я реализовал простое соединение клиент-сервер с использованием WebSocket с помощью websocket.asyncio. Моя цель — передавать аудио от клиента к серверу и транскрибировать это аудио с помощью SpeechAsyncClient от Google Cloud. Вот код для сервера и клиента соответственно:

Сервер

import re

from google.cloud import speech_v1p1beta1 as speech
import google.api_core.retry_async as retries
import google.api_core.exceptions as core_exceptions

import asyncio
from websockets.asyncio.server import serve

streaming_config = speech.StreamingRecognitionConfig()
streaming_config.interim_results = True
streaming_config.config.encoding = speech.RecognitionConfig.AudioEncoding.LINEAR16
streaming_config.config.sample_rate_hertz = 16000
streaming_config.config.language_code = "en-US"
streaming_config.config.audio_channel_count = 1
streaming_config.config.enable_automatic_punctuation = True
streaming_config.config.profanity_filter = True

retry = retries.AsyncRetry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    predicate=retries.if_exception_type(
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
    ),
    deadline=5000.0,
)

class Server:
    def __init__(self) -> None:
        self._is_streaming = False
        self._audio_queue = asyncio.Queue()
        self._speech_client = speech.SpeechAsyncClient()

    async def _read_audio(self):
        print("Чтение аудио")

        config_request = speech.StreamingRecognizeRequest()
        config_request.streaming_config = streaming_config
        yield config_request

        while self._is_streaming:
            chunk = await self._audio_queue.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = await self._audio_queue.get_nowait()
                    if chunk is None:
                        return
                    data.append(chunk)
                except asyncio.QueueEmpty:
                    break

            request = speech.StreamingRecognizeRequest()
            request.audio_content = b"".join(data)
            yield request

    async def _build_requests(self):
        print("Создание запросов")
        audio_generator = self._read_audio()
        responses = await self._speech_client.streaming_recognize(
            requests=audio_generator,
            retry=retry,
        )
        print("Готово")
        await self._listen_print_loop(responses)

    async def _handler(self, websocket):
        print("Соединение")
        asyncio.create_task(self._build_requests())
        self._is_streaming = True
        try:
            async for message in websocket:
                await self._audio_queue.put(message)
        except Exception as e:
            print(f"Не удалось: {e}")

    async def launch_server(self):
        print("Ожидание соединения...", end=" ")

        server = await serve(
            self._handler,
            host="localhost",
            port=8080,
        )

        await server.serve_forever()

    async def _listen_print_loop(self, responses) -> str:
        num_chars_printed = 0
        transcript = ""
        async for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript
            overwrite_chars = " " * (num_chars_printed - len(transcript))

            if not result.is_final:
                print(transcript + overwrite_chars)
                num_chars_printed = len(transcript)

            else:
                print(transcript + overwrite_chars)
                if re.search(r"\b(exit|quit)\b", transcript, re.I):
                    print("Выход..")
                    break

                num_chars_printed = 0

        return transcript

if __name__ == "__main__":
    server = Server()
    asyncio.run(server.launch_server())

Клиент

from websockets.asyncio.client import connect
import asyncio
import numpy as np
import sounddevice as sd
import queue

# Параметры записи аудио
RATE = 16000
CHUNK = int(RATE / 10)  # 100мс

class MicrophoneStream:
    """Открывает поток записи как генератор, выдающий аудио-чанки."""

    def __init__(self, rate: int = RATE, chunk: int = CHUNK):
        """Аудио -- и генератор -- гарантированно находятся в основном потоке."""
        self._rate = rate
        self._chunk = chunk

        # Создаем потокобезопасный буфер аудиоданных
        self._buff = queue.Queue()
        self.closed = True

    def __enter__(self) -> "MicrophoneStream":
        self.closed = False

        # Запуск аудиопотока
        self._stream = sd.InputStream(
            samplerate=self._rate,
            channels=1,
            dtype="int16",
            blocksize=self._chunk,
            callback=self._fill_buffer
        )
        self._stream.start()

        return self

    def __exit__(self, type, value, traceback) -> None:
        """Закрывает поток, независимо от того, было ли соединение потеряно или нет."""
        self._stream.stop()
        self._stream.close()
        self.closed = True
        self._buff.put(None)

    def _fill_buffer(
            self, in_data: np.ndarray, frames: int, time, status
    ) -> None:
        """Непрерывно собирает данные из аудиопотока в буфер.

        Аргументы:
            in_data: Аудиоданные в виде массива NumPy
            frames: Число захваченных кадров
            time: Временная информация
            status: Флаги состояния
        """
        self._buff.put(in_data.tobytes())

    def generator(self):
        while not self.closed:
            # Используйте blocking get(), чтобы гарантировать наличие хотя бы одного чанка данных и остановить итерацию, если чанк None, что указывает на конец аудиопотока.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)

async def start_client():
    uri = "ws://localhost:8080"
    async with connect(uri) as websocket:
        with MicrophoneStream(RATE, CHUNK) as stream:
            for chunk in stream.generator():
                await websocket.send(chunk)

if __name__ == "__main__":
    asyncio.run(start_client())

Проблема

Это непрерывно отправляет аудио с клиентской стороны, заполняя аудиобуфер, а метод _build_requests создает асинхронный генератор, используемый методом SpeechAsyncClient.streaming_recognise. Однако этот метод зависает и не доходит до строки print(“Готово”) и никогда не вызывает генератор _read_audio. Есть ли проблема в том, как я использовал asyncio?

Ответ

Проанализируем вашу ситуацию и возможные причины, по которым метод SpeechAsyncClient.streaming_recognize может зависать в вашем коде. По вашему описанию возникает несколько ключевых моментов, на которые стоит обратить внимание.

Возможные причины зависания:

  1. Инициализация потока и асинхронного генератора: Убедитесь, что асинхронный генератор, предоставляемый методом _read_audio, правильно инициирует обработку запросов. На текущий момент в вашем коде метод _build_requests запускается сначала в asyncio.create_task, и это может привести к тому, что его выполнение начнёт происходить асинхронно до того, как будет введен поток аудио.

  2. Проблемы с очередью: Если self._audio_queue не будет заполняться данными, метод _read_audio не сможет извлечь данные для передачи в SpeechAsyncClient.streaming_recognize. Убедитесь, что аудиоданные действительно поступают и заполняют очереди.

  3. Закрытие потока: Убедитесь, что когда вы заканчиваете работу с аудиопотоком (например, завершение передачи аудио через WebSocket), вы корректно закрываете поток, отправляя None в _audio_queue в методе _handler.

  4. Отсутствие аудиоданных: Если ваш клиент не отправляет аудиоданные, сервер не сможет обработать их и будет ждать поступления аудио, что может привести к зависанию.

Рекомендации по улучшению:

  1. Проверка заполненности очереди: Добавьте отладочную информацию внутри метода _read_audio для проверки того, что данные действительно поступают в очередь. Например, вы можете устанавливать печать при получении каждого чека. Также проверьте, чтобы не было ситуации, когда данные не поступают в течение длительного времени.

  2. Упрощение работы с асинхронностью: Вы можете персонифицировать _build_requests и _handler, чтобы лучше управлять асинхронной логикой. Переносите asyncio.create_task внутрь метода _handler или запустите его до того, как начнете обрабатывать входящие сообщения.

  3. Дополнительная обработка исключений: Оберните код, который взаимодействует с SpeechAsyncClient, в более тщательную обработку исключений, чтобы отловить и диагностировать наличие проблем с сетью или API на стороне Google.

Корректировка кода:

Вот несколько изменений, которые могут помочь:

async def _handler(self, websocket):
    print("Connection")
    self._is_streaming = True
    asyncio.create_task(self._build_requests())  # Начинаем сбор запросов сразу
    try:
        async for message in websocket:
            await self._audio_queue.put(message)
    except Exception as e:
        print(f"Failed: {e}")
    finally:
        await self._audio_queue.put(None)  # Обязательно отправляем None для завершения
        self._is_streaming = False

Попробуйте следовать этим рекомендациям и вносить соответствующие изменения в ваш исходный код. После этого проверьте, будет ли успешно работать метод streaming_recognize, и устраните потенциальные блокировки.

Оцените материал
Добавить комментарий

Капча загружается...