Вопрос или проблема
Правильный способ работы с наборами данных, превышающими память, в Polars
Я начал изучать и применять Polars из-за (1) потенциального повышения скорости и (2) обещания возможности обработки наборов данных, превышающих объем памяти. Однако мне трудно увидеть, как второе обещание действительно выполняется в конкретных сценариях, которые требуется для моего случая использования.
Один конкретный пример, который вызывает у меня затруднения, — это как прочитать много-Gb JSONL файл из S3, применить несколько преобразований и отправить измененные записи в STDOUT
.
Пробелы в “ленивых” методах “sink”…
Как я только что упомянул на GitHub, методы sink_*()
не поддерживают запись в буфер или файл — только в именованный путь к файлу. В противном случае, простым решением было бы что-то вроде sink_ndjson(sys.stdout, ...)
Нет четкого способа “пакетировать” DataFrame
или LazyFrame
в меньшие фреймы данных.
Следующее, что я попробовал, — это получить меньшие пакеты или фреймы данных (например, по 100К строк за раз), которые я мог бы обрабатывать в памяти и записывать с помощью write_ndjson(sys.stdout, ...)
один за другим, пока не дойду до конца потока.
Ближе всего, что я смог найти, это LazyFrame.slice(offset, batch_size).collect()
— за исключением того, что на практике это, похоже, зависает/выдает сбой при первом вызове, вместо того чтобы считать только первые n записей и продолжать. Даже когда я устанавливаю низкое число записей в схеме сканирования LazyFrame. Возможно, это ошибка — но даже в этом случае метод slice()
не кажется специально разработанным для получения инкрементальных пакетов из ленивого фрейма.
Буду благодарен за любую помощь!
Ответ или решение
Для обработки больших наборов данных с использованием библиотеки Polars, особенно когда они превышают объем доступной оперативной памяти, необходимо придерживаться определенной стратегии. Ваша задача заключается в чтении многогигабайтного JSONL файла из S3, применении к данным нескольких преобразований и отправке измененных записей в стандартный вывод (STDOUT). Рассмотрим, как это можно сделать.
1. Чтение данных из S3
Для начала, вам нужно использовать scan_parquet
или аналогичный метод для чтения данных напрямую из S3. Поларс в настоящее время позволяет работать с данными, не загружая их полностью в память. Пример кода может выглядеть следующим образом:
import polars as pl
# Укажите путь к вашему файлу на S3
s3_path = "s3://bucket-name/path/to/file.jsonl"
# Чтение данных из JSONL
lazy_df = pl.scan_json(s3_path)
2. Применение преобразований
Далее вы можете применять несколько трансформаций к LazyFrame
, который был создан ранее. Не забудьте использовать ленивые операции, что позволит оптимизировать выполнение задач:
# Пример трансформаций
transformed_df = lazy_df.with_columns([
(pl.col("column_name") + 1).alias("new_column"), # Пример добавления нового столбца
pl.col("another_column").filter(pl.col("another_column") > 0) # Фильтрация
])
3. Пакетная обработка и запись в STDOUT
Для обработки данных порциями вы можете использовать метод slice()
, чтобы извлекать данные небольшими партиями. Важно убедиться, что вы собираете данные в пакетах и допускаете временные задержки в работе с памятью. Пример кода может выглядеть следующим образом:
import sys
batch_size = 100000 # Определение размера батча
offset = 0
while True:
# Получаем текущий батч данных
batch_df = transformed_df.slice(offset, batch_size).collect()
# Если размер собранного DataFrame равен 0, значит, нет больше данных
if batch_df.is_empty():
break
# Запись данных в формате ndjson в стандартный вывод
batch_df.write_ndjson(sys.stdout)
# Увеличиваем смещение на размер батча
offset += batch_size
Заключение
С помощью этой стратегии вы сможете обрабатывать большие наборы данных без необходимости загружать их полностью в память, используя возможности библиотеки Polars. Вы сможете эффективно читать данные из S3, обрабатывать их пакетами и записывать результаты в стандартный вывод. Убедитесь, что ваши преобразования являются ленивыми и оптимизированными, чтобы извлечь максимальную производительность из библиотеки Polars.