Обзор
Я реализовал простое соединение клиент-сервер с использованием WebSocket с помощью websocket.asyncio. Моя цель — передавать аудио от клиента к серверу и транскрибировать это аудио с помощью SpeechAsyncClient от Google Cloud. Вот код для сервера и клиента соответственно:
Сервер
import re
from google.cloud import speech_v1p1beta1 as speech
import google.api_core.retry_async as retries
import google.api_core.exceptions as core_exceptions
import asyncio
from websockets.asyncio.server import serve
streaming_config = speech.StreamingRecognitionConfig()
streaming_config.interim_results = True
streaming_config.config.encoding = speech.RecognitionConfig.AudioEncoding.LINEAR16
streaming_config.config.sample_rate_hertz = 16000
streaming_config.config.language_code = "en-US"
streaming_config.config.audio_channel_count = 1
streaming_config.config.enable_automatic_punctuation = True
streaming_config.config.profanity_filter = True
retry = retries.AsyncRetry(
initial=0.1,
maximum=60.0,
multiplier=1.3,
predicate=retries.if_exception_type(
core_exceptions.DeadlineExceeded,
core_exceptions.ServiceUnavailable,
),
deadline=5000.0,
)
class Server:
def __init__(self) -> None:
self._is_streaming = False
self._audio_queue = asyncio.Queue()
self._speech_client = speech.SpeechAsyncClient()
async def _read_audio(self):
print("Чтение аудио")
config_request = speech.StreamingRecognizeRequest()
config_request.streaming_config = streaming_config
yield config_request
while self._is_streaming:
chunk = await self._audio_queue.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = await self._audio_queue.get_nowait()
if chunk is None:
return
data.append(chunk)
except asyncio.QueueEmpty:
break
request = speech.StreamingRecognizeRequest()
request.audio_content = b"".join(data)
yield request
async def _build_requests(self):
print("Создание запросов")
audio_generator = self._read_audio()
responses = await self._speech_client.streaming_recognize(
requests=audio_generator,
retry=retry,
)
print("Готово")
await self._listen_print_loop(responses)
async def _handler(self, websocket):
print("Соединение")
asyncio.create_task(self._build_requests())
self._is_streaming = True
try:
async for message in websocket:
await self._audio_queue.put(message)
except Exception as e:
print(f"Не удалось: {e}")
async def launch_server(self):
print("Ожидание соединения...", end=" ")
server = await serve(
self._handler,
host="localhost",
port=8080,
)
await server.serve_forever()
async def _listen_print_loop(self, responses) -> str:
num_chars_printed = 0
transcript = ""
async for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
if not result.is_final:
print(transcript + overwrite_chars)
num_chars_printed = len(transcript)
else:
print(transcript + overwrite_chars)
if re.search(r"\b(exit|quit)\b", transcript, re.I):
print("Выход..")
break
num_chars_printed = 0
return transcript
if __name__ == "__main__":
server = Server()
asyncio.run(server.launch_server())
Клиент
from websockets.asyncio.client import connect
import asyncio
import numpy as np
import sounddevice as sd
import queue
# Параметры записи аудио
RATE = 16000
CHUNK = int(RATE / 10) # 100мс
class MicrophoneStream:
"""Открывает поток записи как генератор, выдающий аудио-чанки."""
def __init__(self, rate: int = RATE, chunk: int = CHUNK):
"""Аудио -- и генератор -- гарантированно находятся в основном потоке."""
self._rate = rate
self._chunk = chunk
# Создаем потокобезопасный буфер аудиоданных
self._buff = queue.Queue()
self.closed = True
def __enter__(self) -> "MicrophoneStream":
self.closed = False
# Запуск аудиопотока
self._stream = sd.InputStream(
samplerate=self._rate,
channels=1,
dtype="int16",
blocksize=self._chunk,
callback=self._fill_buffer
)
self._stream.start()
return self
def __exit__(self, type, value, traceback) -> None:
"""Закрывает поток, независимо от того, было ли соединение потеряно или нет."""
self._stream.stop()
self._stream.close()
self.closed = True
self._buff.put(None)
def _fill_buffer(
self, in_data: np.ndarray, frames: int, time, status
) -> None:
"""Непрерывно собирает данные из аудиопотока в буфер.
Аргументы:
in_data: Аудиоданные в виде массива NumPy
frames: Число захваченных кадров
time: Временная информация
status: Флаги состояния
"""
self._buff.put(in_data.tobytes())
def generator(self):
while not self.closed:
# Используйте blocking get(), чтобы гарантировать наличие хотя бы одного чанка данных и остановить итерацию, если чанк None, что указывает на конец аудиопотока.
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
async def start_client():
uri = "ws://localhost:8080"
async with connect(uri) as websocket:
with MicrophoneStream(RATE, CHUNK) as stream:
for chunk in stream.generator():
await websocket.send(chunk)
if __name__ == "__main__":
asyncio.run(start_client())
Проблема
Это непрерывно отправляет аудио с клиентской стороны, заполняя аудиобуфер, а метод _build_requests
создает асинхронный генератор, используемый методом SpeechAsyncClient.streaming_recognise
. Однако этот метод зависает и не доходит до строки print(“Готово”) и никогда не вызывает генератор _read_audio
. Есть ли проблема в том, как я использовал asyncio?
Ответ
Проанализируем вашу ситуацию и возможные причины, по которым метод SpeechAsyncClient.streaming_recognize
может зависать в вашем коде. По вашему описанию возникает несколько ключевых моментов, на которые стоит обратить внимание.
Возможные причины зависания:
-
Инициализация потока и асинхронного генератора: Убедитесь, что асинхронный генератор, предоставляемый методом
_read_audio
, правильно инициирует обработку запросов. На текущий момент в вашем коде метод_build_requests
запускается сначала вasyncio.create_task
, и это может привести к тому, что его выполнение начнёт происходить асинхронно до того, как будет введен поток аудио. -
Проблемы с очередью: Если
self._audio_queue
не будет заполняться данными, метод_read_audio
не сможет извлечь данные для передачи вSpeechAsyncClient.streaming_recognize
. Убедитесь, что аудиоданные действительно поступают и заполняют очереди. -
Закрытие потока: Убедитесь, что когда вы заканчиваете работу с аудиопотоком (например, завершение передачи аудио через WebSocket), вы корректно закрываете поток, отправляя
None
в_audio_queue
в методе_handler
. - Отсутствие аудиоданных: Если ваш клиент не отправляет аудиоданные, сервер не сможет обработать их и будет ждать поступления аудио, что может привести к зависанию.
Рекомендации по улучшению:
-
Проверка заполненности очереди: Добавьте отладочную информацию внутри метода
_read_audio
для проверки того, что данные действительно поступают в очередь. Например, вы можете устанавливать печать при получении каждого чека. Также проверьте, чтобы не было ситуации, когда данные не поступают в течение длительного времени. -
Упрощение работы с асинхронностью: Вы можете персонифицировать
_build_requests
и_handler
, чтобы лучше управлять асинхронной логикой. Переноситеasyncio.create_task
внутрь метода_handler
или запустите его до того, как начнете обрабатывать входящие сообщения. - Дополнительная обработка исключений: Оберните код, который взаимодействует с
SpeechAsyncClient
, в более тщательную обработку исключений, чтобы отловить и диагностировать наличие проблем с сетью или API на стороне Google.
Корректировка кода:
Вот несколько изменений, которые могут помочь:
async def _handler(self, websocket):
print("Connection")
self._is_streaming = True
asyncio.create_task(self._build_requests()) # Начинаем сбор запросов сразу
try:
async for message in websocket:
await self._audio_queue.put(message)
except Exception as e:
print(f"Failed: {e}")
finally:
await self._audio_queue.put(None) # Обязательно отправляем None для завершения
self._is_streaming = False
Попробуйте следовать этим рекомендациям и вносить соответствующие изменения в ваш исходный код. После этого проверьте, будет ли успешно работать метод streaming_recognize
, и устраните потенциальные блокировки.