Вопрос или проблема
В настоящее время я запускаю следующий скрипт на Python:
# Функция для обработки MMR поиска
def process_mmr_search(row, itemdesc):
try:
formatted_itemdesc = str(row[itemdesc])
print('отформатированное itemdesc mmr', formatted_itemdesc)
docs = indexed_taxonomy_described_cleaned.max_marginal_relevance_search(formatted_itemdesc, 20)
print('документы mmr', docs)
return [doc.page_content for doc in docs]
except Exception as e:
print(f"Ошибка в MMR поиске: {e}")
return []
# Функция для управления потоками для MMR поиска
def threaded_mmr_search(index, row, itemdesc):
mmr_matches = process_mmr_search(row, itemdesc)
return index, mmr_matches
# Запуск MMR поиска с использованием потоков
with ThreadPoolExecutor(max_workers=4) as executor: # Настройте max_workers в зависимости от доступных ресурсов
future_mmr = {executor.submit(threaded_mmr_search, index, row, 'Описание материала'): index for index, row in spend_sheet_uniques.iterrows()}
for future in as_completed(future_mmr):
index, mmr_matches = future.result()
spend_sheet_uniques.at[index, 'Лучшие_Совпадения_MMR'] = str(mmr_matches)
Цель: spend_sheet_uniques – это датафрейм, вся логика заключается в выполнении поиска схожести для каждой строки этого датафрейма, эмбеддинг – FAISS.
Проблема: После выполнения некоторых строк приложение просто зависает и не продвигается вперед, нет конкретной строки, на которой оно останавливается, это разное в разное время, редко оно обрабатывает все строки.
С вашей пулом потоков все в порядке, это похоже на проблему взаимной блокировки. Чтобы принудительно изолировать задачи, вы можете попробовать использовать ProcessPoolExecutor
вместо:
if __name__ == "__main__":
with ProcessPoolExecutor(...
Ответ или решение
Проблема зависания приложения при использовании пула потоков в Python
При выполнении многопоточной обработки данных в Python, вы столкнулись с проблемой зависания приложения. Давайте подробно рассмотрим, как это происходит, а также предложим решения для устранения этой проблемы.
Описание проблемы
Ваша программа использует ThreadPoolExecutor
для обработки данных в многопоточном режиме, где для каждой строки DataFrame spend_sheet_uniques
выполняется поиск с помощью функции process_mmr_search
. Проблема заключается в том, что иногда приложение зависает и не завершает выполнение, что может стать причиной значительных задержек и потери производительности.
Проблема, с которой вы столкнулись, вероятнее всего, связана с взаимоблокировкой (deadlock) потоков. Это может быть вызвано несколькими факторами, такими как:
-
Мьютексы и блокировки: Если внутри
indexed_taxonomy_described_cleaned.max_marginal_relevance_search()
используются блокировки (например, мьютексы), это может привести к зависанию потоков, ожидающих освобождения ресурса. -
Потоки блокируются при выполнении: Если один поток выполняет долгую операцию (например, долгий запрос к базе данных или сеть), других потоков может не хватить для завершения работы возвращающих функции.
-
Ограниченное количество системных ресурсов: Если на системе имеется ограниченное количество системных ресурсов (например, память или количество потоков), это может привести к зависанию программы.
Решение проблемы
Чтобы устранить зависание и улучшить надежность программы, рекомендуем следующее:
-
Использование
ProcessPoolExecutor
вместоThreadPoolExecutor
:Поскольку
ThreadPoolExecutor
использует потоки, которые могут конкурировать за глобальную блокировку интерпретатора Python (GIL), это может быть причиной зависания, особенно если работа включает в себя операции, потребляющие много времени на CPU. Попробуйте заменить его наProcessPoolExecutor
, который работает с процессами и не подвержен GIL.from concurrent.futures import ProcessPoolExecutor if __name__ == "__main__": with ProcessPoolExecutor(max_workers=4) as executor: future_mmr = {executor.submit(threaded_mmr_search, index, row, 'Material Description'): index for index, row in spend_sheet_uniques.iterrows()} for future in as_completed(future_mmr): index, mmr_matches = future.result() spend_sheet_uniques.at[index, 'Best_Matches_MMR'] = str(mmr_matches)
-
Отладка и логирование: Добавьте больше логирования в ваш код, чтобы лучше понимать, на каком этапе происходит зависание. Это поможет вам локализовать проблему. Используйте
logging
вместо простогоprint
для более структурированного вывода. -
Уменьшение рабочей нагрузки: Если задача выполняется долго, попробуйте уменьшить количество строк, обрабатываемых за раз, или распределите задачу на меньшие подзадачи.
-
Тестирование и профилирование: Запустите код в профилировщике (например,
cProfile
) для выявления узких мест. Проверьте, не блокируется ли выполнение на каком-либо этапе.
Заключение
Данная проблема зависания при использовании пула потоков может быть решена за счет смены логики выполнения с потоков на процессы, что в большинстве случаев является более эффективным подходом, особенно для задач, требующих параллельного выполнения CPU-вместительных операций. Следуя предложенным рекомендациям, вы сможете улучшить надежность вашего приложения и избежать зависания во время выполнения.