Вопрос или проблема
Я экспериментирую с PyArrow, но у меня есть некоторые трудности в понимании некоторых моментов. Что я хочу достичь, так это читать большой файл (CSV в этом примере) частями и сохранять каждую часть в виде файла Parquet.
У меня есть следующий код:
import requests
import pyarrow as pa
def create_record_batches(lines, schema):
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
for line in lines:
if line:
newline = [{col_name: val for col_name, val in zip(schema.names, line.decode().split(","))}]
writer.write(pa.RecordBatch.from_pylist(newline))
else:
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
# writer.close()
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
writer.close()
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
with requests.get(
url="https://data.cityofnewyork.us/api/views/kxp8-n2sj/rows.csv?accessType=DOWNLOAD",
stream=True
) as response:
response.raise_for_status()
lines = response.iter_lines(chunk_size=128*1024*1024, delimiter=b"\n")
schema_str = next(lines).decode()
schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])
buffer = create_record_batches(lines, schema)
counter = 0
for buf in buffer:
if counter == 3:
break
# print(buf)
print("*")
print("**********")
print("**********")
counter += 1
Я еще не дошел до стадии записи чего-либо в файл Parquet, но позвольте мне разобрать свой код и также подробно объяснить, что мне неясно.
- Я использую
requests.get(url, stream=True)
, чтобы получить Response, и используюresponse.iter_lines(chunk_size=128*1024*1024)
, чтобы иметь возможность работать с частями по 128 МБ. - Поскольку первая строка файла является заголовком, я использую следующее, чтобы создать схему Pyarrow:
schema_str = next(lines).decode()
schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])
- С оставшимися
lines
я вызываю функциюcreate_record_batches(lines, schema)
. Эта функция отвечает за итерацию по каждой строке каждой части, чтобы создать RecordBatch и записать его в объектRecordBatchStreamWriter
.- В конце части, представленной пустой строкой, такой как
b''
, мы переходим в случайelse
, и я возвращаю байты из своего sink, так что он должен вернуть все RecordBatch конкретной части. - Все еще в случае
else
, я создаю совершенно новый sink и writer, моя цель здесь – очистить как sink, так и writer. Я пытался использоватьsink.truncate()
иsink.seek(0)
, но не смог этого сделать, потому чтоsink.getvalue()
закрывает sink. - Наконец, я закрываю writer и возвращаю оставшееся содержимое в sink, но я не знаю, имеет ли это смысл, учитывая, что я использую
iter_lines()
..?
- В конце части, представленной пустой строкой, такой как
Что я не понимаю в моем текущем коде, так это то, что когда я проверяю потребление памяти (используя htop) своей программы во время отладки и имея точку останова на каждом объекте buf
, я ожидал, что память будет колебаться вокруг 128 МБ, так как это размер моих частей, но память увеличивается, так что я что-то делаю не так, но не знаю, что. Мне кажется, это происходит потому, что sink и writer не действительно “сбрасываются”..
Что касается записи в файлы Parquet, то в моем плане использовать каждый объект buf
, чтобы создать PyArrow.Table, а затем записать каждую таблицу в файл Parquet. Чтобы создать PyArrow.Table, я думаю, что я буду использовать следующее:
import pyarrow.parquet as pq
...
for i_chunk, buf in enumerate(buffer):
table = pa.ipc.open_stream(pa.py_buffer(buf)).read_all()
file_path = f"some_file_name_chunk{i_chunk}.parquet"
pq.write_table(table, file_path)
Я иду в правильном направлении с моим текущим кодом? И также с тем, что я планирую делать, чтобы записать свои части в файлы Parquet?
Большое спасибо, если вы можете помочь мне лучше освоить библиотеку PyArrow!
PS: Я решил не использовать такие библиотеки, как pandas или csv, моя цель – попробовать манипулировать только буферами и потоковыми объектами, если это возможно.
Ответ или решение
Код, который вы предоставили, в целом идет в правильном направлении, но требует некоторых улучшений и уточнений. Давайте разберем ваши опасения и предоставим более точные рекомендации для достижения вашей цели.
1. Чтение больших файлов по частям
Использование requests.get(url, stream=True)
и response.iter_lines(chunk_size=128*1024*1024)
— подходящий способ для работы с большими файлами. Тем не менее, стоит уточнить, что iter_lines
возвращает строки, а размер chunk_size
здесь не влияет на память, поскольку он определяет только размер блока данных, из которого будут извлекаться строки.
2. Создание схемы PyArrow
Вы создали схему правильно, используя первую строку (заголовок) файла. Убедитесь, что типы данных в вашем случае правильные. Если в вашем CSV есть другие типы данных (например, целые числа или даты), их следует указать в схеме.
3. Проверка и улучшение функции create_record_batches
Функция create_record_batches
требует небольших улучшений. Вместо хранения временных файлов и выполняемых операций с ними, будет лучше сразу обрабатывать и записывать каждую порцию данных в Parquet. Также необходимо убедиться, что вы корректно обрабатываете конец данных.
Вот пример, как модифицировать вашу функцию:
def create_record_batches(lines, schema):
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
for line in lines:
if line:
row = [{col_name: val for col_name, val in zip(schema.names, line.decode().split(","))}]
writer.write(pa.RecordBatch.from_pylist(row))
else:
# Save and yield the buffer if there's an empty line
writer.close()
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
# Reset the writer and sink for the next chunk
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
# Handle the last batch
writer.close()
if sink.size() > 0:
yield sink.getvalue().to_pybytes()
4. Запись в Parquet
Ваш код для сохранения данных в Parquet выглядит хорошо. Использование pa.ipc.open_stream()
для чтения потоков данных и преобразование их в таблицы PyArrow является правильным подходом. Этот код:
for i_chunk, buf in enumerate(buffer):
table = pa.ipc.open_stream(pa.py_buffer(buf)).read_all()
file_path = f"some_file_name_chunk{i_chunk}.parquet"
pq.write_table(table, file_path)
Общие принципы следующие:
- В каждом цикле вы читаете из буфера и создаете таблицу.
- Название файла с инкрементом для каждого чанка позволяет избежать перезаписи.
5. Устойчивость к потреблению памяти
Ваши опасения по поводу потребления памяти по мере выполнения программы целиком оправданы. Убедитесь, что вы закрываете все временные объекты и избегаете утечек памяти. В вашем случае, закрытие writer после записи каждой порции данных и обнуление sink должно помочь контролировать использование памяти.
Заключение
Ваш подход к работе с PyArrow и чтению CSV файлов по частям в целом правильный. Убедитесь в том, что вы правильно обрабатываете типы данных и ведете учет использования памяти. Задуманная вами схема записи данных в Parquet также выглядит очень разумно. Продолжайте экспериментировать и изучать возможности PyArrow, и вы достигнете поставленной цели. Если возникнут дополнительные вопросы, всегда рад помочь!