Как асинхронно передавать данные из файла через HTTP-сервер на Rust?

Вопрос или проблема

Я хочу передавать HTTP поток из Rust бэкенда, чтобы использовать его как src в HTML теге Audio на фронтенде Tauri.

Я решил использовать warp и вот что у меня получилось:

use futures::future::FutureExt;
use futures::pin_mut;
use hyper::body::{Body, Frame};
use std::collections::VecDeque;
use std::fs::File;
use std::pin::Pin;
use std::task::{Context, Poll};
use tauri::AppHandle;
use warp::{http::Response, Filter};

struct FileAt {
    file: File,
    at: usize,
}

impl FileAt {
    pub async fn read_part(&mut self) -> Result<VecDeque<u8>, String> {
        // TODO: асинхронно читать данные из *file*, начиная с *at*
        // TODO: добавить количество прочитанных байтов к *at*
        Ok(VecDeque::new())
    }
}

impl Body for FileAt {
    type Data = VecDeque<u8>;
    type Error = String;

    fn poll_frame(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        // TODO: трейлеры? (метаданные файла -> заголовки)
        if !self.is_end_stream() {
            let f = self.get_mut().read_part();
            pin_mut!(f);
            if let Poll::Ready(out) = f.poll_unpin(cx) {
                Poll::Ready(
                    out.map(|data| {
                        if data.is_empty() {
                            Some(Frame::data(data))
                        } else {
                            None
                        }
                    })
                    .transpose(),
                )
            } else {
                Poll::Pending
            }
        } else {
            Poll::Ready(None)
        }
    }
}

fn data(path: String) -> Result<FileAt, String> {
    let body = FileAt {
        file: File::open(path).or(Err("Ошибка открытия файла.".to_string()))?,
        at: 0,
    };
    Ok(body)
}

tauri::async_runtime::spawn(async {
    warp::serve(warp::path::param::<String>().map(|path| {
        Response::new(data(path).unwrap())
    }))
    .run(([127, 0, 0, 1], 12345))
    .await;
});

Но я столкнулся с привычным хаосом системы типов Rust, ограничение трейта 'warp::hyper::Body: From<FileAt>' не выполнено – это ближайшее, что я смог получить к решению с приведенным кодом. Как я могу исправить код? У меня закончились идеи. Спасибо!

Я не хочу просто использовать warp::filters::fs, потому что получение данных из файла является лишь временной мерой, позже данные будут асинхронно извлечены из другого источника, я планирую заменить FileAt на другую реализацию hyper::Body.

Возможно, вы заметите какие-то другие ошибки, с которыми я могу столкнуться позже, это тоже приветствуется, но пока мне нужна помощь именно с вышеизложенным.

Ответ или решение

Чтобы асинхронно стримить данные из файла через HTTP-сервер на Rust с использованием warp, необходимо правильно реализовать структуру, которая будет отвечать за чтение данных из файла, а также обеспечить корректную реализацию трейт-интерфейса hyper::Body. Ниже приведено исправленное решение с детальным объяснением.

Исправленный код

use futures::Stream;
use std::collections::VecDeque;
use std::fs::File;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use warp::{http::Response, Filter};

// Структура для чтения файла частями
struct FileStream {
    file: File,
    buffer: VecDeque<u8>,
    end_of_file: bool,
}

impl FileStream {
    pub fn new(path: &str) -> io::Result<FileStream> {
        let file = File::open(path)?;
        Ok(FileStream {
            file,
            buffer: VecDeque::new(),
            end_of_file: false,
        })
    }

    // Метод для чтения данных из файла
    pub fn fill_buffer(&mut self) -> io::Result<()> {
        if self.end_of_file {
            return Ok(());
        }

        let mut temp_buffer = vec![0; 1024]; // Читаем 1 КБ за раз
        let bytes_read = self.file.read(&mut temp_buffer)?;

        if bytes_read == 0 {
            self.end_of_file = true; // Достигнут конец файла
        } else {
            self.buffer.extend(temp_buffer.into_iter().take(bytes_read));
        }

        Ok(())
    }
}

// Реализация Stream для FileStream
impl Stream for FileStream {
    type Item = Result<bytes::Bytes, warp::Error>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        this.fill_buffer().map_err(|e| warp::Error::from(e))?;

        if this.buffer.is_empty() && *this.end_of_file {
            return Poll::Ready(None); // Конец потока
        } else if this.buffer.is_empty() {
            // Если буфер пуст, нужно ожидать
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }

        let byte_chunk = this.buffer.pop_front().expect("Buffer should not be empty");
        Poll::Ready(Some(Ok(bytes::Bytes::from_static(&[byte_chunk]))))
    }
}

// Warp фильтр для обработки запроса
async fn serve_file(path: String) -> Result<impl warp::Reply, warp::Rejection> {
    let file_stream = FileStream::new(&path).map_err(|e| warp::reject::custom(e))?;
    Ok(Response::new(file_stream))
}

#[tokio::main]
async fn main() {
    let routes = warp::path::param()
        .and_then(serve_file); // Определяем маршруты

    warp::serve(routes)
        .run(([127, 0, 0, 1], 12345)) // Запускаем сервер
        .await;
}

Объяснение кода

  1. FileStream: Это структура, которая содержит открытый файл и буфер для данных. Она будет использоваться для последовательного чтения данных из файла.

  2. Метод fill_buffer: Этот метод читает данные из файла в буфер. Если достигнут конец файла, устанавливается соответствующий флаг.

  3. Stream: Реализуем трейт Stream для FileStream, который будет подавать данные клиенту. При каждом запросе метод poll_next будет проверять буфер. Если буфер пуст, он вызовет cx.waker().wake_by_ref() для ожидания следующего вызова.

  4. Фильтр serve_file: Этот асинхронный фильтр будет принимать путь к файлу, открывать его и передавать поток в ответе.

  5. Запуск сервера: Используем warp::serve и задаем маршруты, связывая HTTP-запросы и функции обработки.

Заключение

Этот код позволяет асинхронно стримить данные из файла через HTTP, что эффективно как для небольших файлов, так и для больших. Приведенная структура легко заменимая, что позволяет вам позже интегрировать другие источники данных, такие как базы данных или внешние API.

Оцените материал
Добавить комментарий

Капча загружается...