Вопрос или проблема
Я хочу передавать HTTP поток из Rust бэкенда, чтобы использовать его как src в HTML теге Audio на фронтенде Tauri.
Я решил использовать warp и вот что у меня получилось:
use futures::future::FutureExt;
use futures::pin_mut;
use hyper::body::{Body, Frame};
use std::collections::VecDeque;
use std::fs::File;
use std::pin::Pin;
use std::task::{Context, Poll};
use tauri::AppHandle;
use warp::{http::Response, Filter};
struct FileAt {
file: File,
at: usize,
}
impl FileAt {
pub async fn read_part(&mut self) -> Result<VecDeque<u8>, String> {
// TODO: асинхронно читать данные из *file*, начиная с *at*
// TODO: добавить количество прочитанных байтов к *at*
Ok(VecDeque::new())
}
}
impl Body for FileAt {
type Data = VecDeque<u8>;
type Error = String;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
// TODO: трейлеры? (метаданные файла -> заголовки)
if !self.is_end_stream() {
let f = self.get_mut().read_part();
pin_mut!(f);
if let Poll::Ready(out) = f.poll_unpin(cx) {
Poll::Ready(
out.map(|data| {
if data.is_empty() {
Some(Frame::data(data))
} else {
None
}
})
.transpose(),
)
} else {
Poll::Pending
}
} else {
Poll::Ready(None)
}
}
}
fn data(path: String) -> Result<FileAt, String> {
let body = FileAt {
file: File::open(path).or(Err("Ошибка открытия файла.".to_string()))?,
at: 0,
};
Ok(body)
}
tauri::async_runtime::spawn(async {
warp::serve(warp::path::param::<String>().map(|path| {
Response::new(data(path).unwrap())
}))
.run(([127, 0, 0, 1], 12345))
.await;
});
Но я столкнулся с привычным хаосом системы типов Rust, ограничение трейта 'warp::hyper::Body: From<FileAt>' не выполнено
– это ближайшее, что я смог получить к решению с приведенным кодом. Как я могу исправить код? У меня закончились идеи. Спасибо!
Я не хочу просто использовать warp::filters::fs
, потому что получение данных из файла является лишь временной мерой, позже данные будут асинхронно извлечены из другого источника, я планирую заменить FileAt
на другую реализацию hyper::Body
.
Возможно, вы заметите какие-то другие ошибки, с которыми я могу столкнуться позже, это тоже приветствуется, но пока мне нужна помощь именно с вышеизложенным.
Ответ или решение
Чтобы асинхронно стримить данные из файла через HTTP-сервер на Rust с использованием warp
, необходимо правильно реализовать структуру, которая будет отвечать за чтение данных из файла, а также обеспечить корректную реализацию трейт-интерфейса hyper::Body
. Ниже приведено исправленное решение с детальным объяснением.
Исправленный код
use futures::Stream;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use warp::{http::Response, Filter};
// Структура для чтения файла частями
struct FileStream {
file: File,
buffer: VecDeque<u8>,
end_of_file: bool,
}
impl FileStream {
pub fn new(path: &str) -> io::Result<FileStream> {
let file = File::open(path)?;
Ok(FileStream {
file,
buffer: VecDeque::new(),
end_of_file: false,
})
}
// Метод для чтения данных из файла
pub fn fill_buffer(&mut self) -> io::Result<()> {
if self.end_of_file {
return Ok(());
}
let mut temp_buffer = vec![0; 1024]; // Читаем 1 КБ за раз
let bytes_read = self.file.read(&mut temp_buffer)?;
if bytes_read == 0 {
self.end_of_file = true; // Достигнут конец файла
} else {
self.buffer.extend(temp_buffer.into_iter().take(bytes_read));
}
Ok(())
}
}
// Реализация Stream для FileStream
impl Stream for FileStream {
type Item = Result<bytes::Bytes, warp::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
this.fill_buffer().map_err(|e| warp::Error::from(e))?;
if this.buffer.is_empty() && *this.end_of_file {
return Poll::Ready(None); // Конец потока
} else if this.buffer.is_empty() {
// Если буфер пуст, нужно ожидать
cx.waker().wake_by_ref();
return Poll::Pending;
}
let byte_chunk = this.buffer.pop_front().expect("Buffer should not be empty");
Poll::Ready(Some(Ok(bytes::Bytes::from_static(&[byte_chunk]))))
}
}
// Warp фильтр для обработки запроса
async fn serve_file(path: String) -> Result<impl warp::Reply, warp::Rejection> {
let file_stream = FileStream::new(&path).map_err(|e| warp::reject::custom(e))?;
Ok(Response::new(file_stream))
}
#[tokio::main]
async fn main() {
let routes = warp::path::param()
.and_then(serve_file); // Определяем маршруты
warp::serve(routes)
.run(([127, 0, 0, 1], 12345)) // Запускаем сервер
.await;
}
Объяснение кода
-
FileStream
: Это структура, которая содержит открытый файл и буфер для данных. Она будет использоваться для последовательного чтения данных из файла. -
Метод
fill_buffer
: Этот метод читает данные из файла в буфер. Если достигнут конец файла, устанавливается соответствующий флаг. -
Stream
: Реализуем трейтStream
дляFileStream
, который будет подавать данные клиенту. При каждом запросе методpoll_next
будет проверять буфер. Если буфер пуст, он вызоветcx.waker().wake_by_ref()
для ожидания следующего вызова. -
Фильтр
serve_file
: Этот асинхронный фильтр будет принимать путь к файлу, открывать его и передавать поток в ответе. -
Запуск сервера: Используем
warp::serve
и задаем маршруты, связывая HTTP-запросы и функции обработки.
Заключение
Этот код позволяет асинхронно стримить данные из файла через HTTP, что эффективно как для небольших файлов, так и для больших. Приведенная структура легко заменимая, что позволяет вам позже интегрировать другие источники данных, такие как базы данных или внешние API.