Правильный способ работы с наборами данных, превышающими память, в Polars

Вопрос или проблема

Правильный способ работы с наборами данных, превышающими память, в Polars

Я начал изучать и применять Polars из-за (1) потенциального повышения скорости и (2) обещания возможности обработки наборов данных, превышающих объем памяти. Однако мне трудно увидеть, как второе обещание действительно выполняется в конкретных сценариях, которые требуется для моего случая использования.

Один конкретный пример, который вызывает у меня затруднения, — это как прочитать много-Gb JSONL файл из S3, применить несколько преобразований и отправить измененные записи в STDOUT.

Пробелы в “ленивых” методах “sink”…

Как я только что упомянул на GitHub, методы sink_*() не поддерживают запись в буфер или файл — только в именованный путь к файлу. В противном случае, простым решением было бы что-то вроде sink_ndjson(sys.stdout, ...)

Нет четкого способа “пакетировать” DataFrame или LazyFrame в меньшие фреймы данных.

Следующее, что я попробовал, — это получить меньшие пакеты или фреймы данных (например, по 100К строк за раз), которые я мог бы обрабатывать в памяти и записывать с помощью write_ndjson(sys.stdout, ...) один за другим, пока не дойду до конца потока.

Ближе всего, что я смог найти, это LazyFrame.slice(offset, batch_size).collect() — за исключением того, что на практике это, похоже, зависает/выдает сбой при первом вызове, вместо того чтобы считать только первые n записей и продолжать. Даже когда я устанавливаю низкое число записей в схеме сканирования LazyFrame. Возможно, это ошибка — но даже в этом случае метод slice() не кажется специально разработанным для получения инкрементальных пакетов из ленивого фрейма.

Буду благодарен за любую помощь!

Ответ или решение

Для обработки больших наборов данных с использованием библиотеки Polars, особенно когда они превышают объем доступной оперативной памяти, необходимо придерживаться определенной стратегии. Ваша задача заключается в чтении многогигабайтного JSONL файла из S3, применении к данным нескольких преобразований и отправке измененных записей в стандартный вывод (STDOUT). Рассмотрим, как это можно сделать.

1. Чтение данных из S3

Для начала, вам нужно использовать scan_parquet или аналогичный метод для чтения данных напрямую из S3. Поларс в настоящее время позволяет работать с данными, не загружая их полностью в память. Пример кода может выглядеть следующим образом:

import polars as pl

# Укажите путь к вашему файлу на S3
s3_path = "s3://bucket-name/path/to/file.jsonl"

# Чтение данных из JSONL
lazy_df = pl.scan_json(s3_path)

2. Применение преобразований

Далее вы можете применять несколько трансформаций к LazyFrame, который был создан ранее. Не забудьте использовать ленивые операции, что позволит оптимизировать выполнение задач:

# Пример трансформаций
transformed_df = lazy_df.with_columns([
    (pl.col("column_name") + 1).alias("new_column"),  # Пример добавления нового столбца
    pl.col("another_column").filter(pl.col("another_column") > 0)  # Фильтрация
])

3. Пакетная обработка и запись в STDOUT

Для обработки данных порциями вы можете использовать метод slice(), чтобы извлекать данные небольшими партиями. Важно убедиться, что вы собираете данные в пакетах и допускаете временные задержки в работе с памятью. Пример кода может выглядеть следующим образом:

import sys

batch_size = 100000  # Определение размера батча
offset = 0

while True:
    # Получаем текущий батч данных
    batch_df = transformed_df.slice(offset, batch_size).collect()

    # Если размер собранного DataFrame равен 0, значит, нет больше данных
    if batch_df.is_empty():
        break

    # Запись данных в формате ndjson в стандартный вывод
    batch_df.write_ndjson(sys.stdout)

    # Увеличиваем смещение на размер батча
    offset += batch_size

Заключение

С помощью этой стратегии вы сможете обрабатывать большие наборы данных без необходимости загружать их полностью в память, используя возможности библиотеки Polars. Вы сможете эффективно читать данные из S3, обрабатывать их пакетами и записывать результаты в стандартный вывод. Убедитесь, что ваши преобразования являются ленивыми и оптимизированными, чтобы извлечь максимальную производительность из библиотеки Polars.

Оцените материал
Добавить комментарий

Капча загружается...