Вопрос или проблема
Я бы хотел использовать параллельную обработку, чтобы ускорить программу. Мой входной файл содержит 30 миллионов строк. Я хотел бы запустить 10 процессов параллельно. Но мой файл очень большой (содержит 30 миллионов строк). Я беспокоюсь, что объем используемой памяти станет слишком большим. Кто-нибудь знает, что мне делать? Ниже приведен код, который выполняется в одном процессе. Спасибо заранее.
batch_size = 1000
header_written = False
output_file = "final_patent_sample.csv"
for i in tqdm(range(0, df2_csv.shape[0], batch_size)):
batch = df2_csv.iloc[i:i + batch_size].to_dict(orient="records")
results = match_and_merge(batch)
batch_df = pd.DataFrame(results, columns=matched_df.columns)
batch_df.to_csv(output_file, mode="a", index=False, header=not header_written)
header_written = True
Ответ или решение
Мультипроцессорная обработка с использованием пакетирования данных
Вопрос, который вы подняли, касается оптимизации обработки большого объема данных с использованием мультипроцессинга. При наличии 30 миллионов строк в вашем файле важно найти баланс между эффективностью обработки и управлением объемом используемой оперативной памяти. Ниже я предлагаю решения, которые помогут вам внедрить мультипроцессинг с пакетной обработкой без риска переполнения памяти.
1. Понимание задачи
Вы хотите разбить процесс обработки на 10 параллельных потоков, чтобы ускорить обработку данных. Ваша исходная схема обработки данных включает чтение данных порциями по 1000 строк, обработку этих строк функцией match_and_merge
, а затем запись результатов в выходной файл. Это базовый процесс, который требует оптимизации для работы с высоким объемом данных.
2. Оптимальная реализация мультипроцессинга
Для последующей реализации я рекомендую использовать библиотеку multiprocessing
в Python. Этот подход позволит распараллелить обработку данных, при этом контролируя нагрузку на память с помощью пакетирования.
import pandas as pd
import numpy as np
from multiprocessing import Pool
import os
def process_batch(batch):
results = match_and_merge(batch)
return pd.DataFrame(results)
def write_to_file(batch_df, output_file):
batch_df.to_csv(output_file, mode="a", index=False, header=os.path.exists(output_file) is False)
def main():
batch_size = 1000
output_file = "final_patent_sample.csv"
# Читаем данные из вашего CSV файла
df = pd.read_csv('your_input_file.csv')
# Запускаем процессы
with Pool(processes=10) as pool: # Укажите количество процессов
for i in tqdm(range(0, df.shape[0], batch_size)):
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
# Отправляем пакет на обработку
result = pool.apply_async(process_batch, (batch,))
batch_df = result.get() # Получаем DataFrame из результата
write_to_file(batch_df, output_file)
if __name__ == "__main__":
main()
3. Объяснение кода
-
Пакетирование: Обработка данных происходит по частям (пакетам) по 1000 строк, что уменьшает нагрузку на память.
-
Мультипроцессинг: Вызов
Pool
создает 10 процессов, которые параллельно обрабатывают данные. Методapply_async
позволяет асинхронно отправлять данные на обработку, что повышает производительность. -
Запись в файл: Результаты записываются в выходной файл после обработки каждого пакета. Мы используем параметр
header=os.path.exists(output_file) is False
, чтобы избежать дублирования заголовка.
4. Избежание переполнения памяти
-
Контроль объема памяти: Каждый поток обрабатывает только один пакет данных, что минимизирует пиковую загрузку памяти. Это важный аспект, особенно если у вас ограниченные ресурсы.
-
Эффективное использование CPU: При правильном распределении данных и использовании процессов вы можете значительно увеличить throughput вашего приложения.
5. Заключение
Используя предложенное решение с пакетированием и мультипроцессингом, вы сможете эффективно обработать 30 миллионов строк данных, сохранив контроль над использованием памяти. Подход, описанный выше, минимизирует риски переполнения памяти и оптимизирует время выполнения приложения.
Помните, что каждая функция и шаг в процессе обработки данных должны быть тщательно протестированы, чтобы убедиться в их правильности и эффективности в условиях многопоточности. Удачи в вашей работе!