Пул потоков Python зависает приложение

Вопрос или проблема

В настоящее время я запускаю следующий скрипт на Python:

# Функция для обработки MMR поиска
def process_mmr_search(row, itemdesc):
    try:
        formatted_itemdesc = str(row[itemdesc])
        print('отформатированное itemdesc mmr', formatted_itemdesc)
        docs = indexed_taxonomy_described_cleaned.max_marginal_relevance_search(formatted_itemdesc, 20)
        print('документы mmr', docs)
        return [doc.page_content for doc in docs]
    except Exception as e:
        print(f"Ошибка в MMR поиске: {e}")
        return []

# Функция для управления потоками для MMR поиска
def threaded_mmr_search(index, row, itemdesc):
    mmr_matches = process_mmr_search(row, itemdesc)
    return index, mmr_matches


# Запуск MMR поиска с использованием потоков
with ThreadPoolExecutor(max_workers=4) as executor:  # Настройте max_workers в зависимости от доступных ресурсов
    future_mmr = {executor.submit(threaded_mmr_search, index, row, 'Описание материала'): index for index, row in spend_sheet_uniques.iterrows()}
    
    for future in as_completed(future_mmr):
        index, mmr_matches = future.result()
        spend_sheet_uniques.at[index, 'Лучшие_Совпадения_MMR'] = str(mmr_matches)

Цель: spend_sheet_uniques – это датафрейм, вся логика заключается в выполнении поиска схожести для каждой строки этого датафрейма, эмбеддинг – FAISS.

Проблема: После выполнения некоторых строк приложение просто зависает и не продвигается вперед, нет конкретной строки, на которой оно останавливается, это разное в разное время, редко оно обрабатывает все строки.

С вашей пулом потоков все в порядке, это похоже на проблему взаимной блокировки. Чтобы принудительно изолировать задачи, вы можете попробовать использовать ProcessPoolExecutor вместо:

if __name__ == "__main__":
    with ProcessPoolExecutor(...

Ответ или решение

Проблема зависания приложения при использовании пула потоков в Python

При выполнении многопоточной обработки данных в Python, вы столкнулись с проблемой зависания приложения. Давайте подробно рассмотрим, как это происходит, а также предложим решения для устранения этой проблемы.

Описание проблемы

Ваша программа использует ThreadPoolExecutor для обработки данных в многопоточном режиме, где для каждой строки DataFrame spend_sheet_uniques выполняется поиск с помощью функции process_mmr_search. Проблема заключается в том, что иногда приложение зависает и не завершает выполнение, что может стать причиной значительных задержек и потери производительности.

Проблема, с которой вы столкнулись, вероятнее всего, связана с взаимоблокировкой (deadlock) потоков. Это может быть вызвано несколькими факторами, такими как:

  1. Мьютексы и блокировки: Если внутри indexed_taxonomy_described_cleaned.max_marginal_relevance_search() используются блокировки (например, мьютексы), это может привести к зависанию потоков, ожидающих освобождения ресурса.

  2. Потоки блокируются при выполнении: Если один поток выполняет долгую операцию (например, долгий запрос к базе данных или сеть), других потоков может не хватить для завершения работы возвращающих функции.

  3. Ограниченное количество системных ресурсов: Если на системе имеется ограниченное количество системных ресурсов (например, память или количество потоков), это может привести к зависанию программы.

Решение проблемы

Чтобы устранить зависание и улучшить надежность программы, рекомендуем следующее:

  1. Использование ProcessPoolExecutor вместо ThreadPoolExecutor:

    Поскольку ThreadPoolExecutor использует потоки, которые могут конкурировать за глобальную блокировку интерпретатора Python (GIL), это может быть причиной зависания, особенно если работа включает в себя операции, потребляющие много времени на CPU. Попробуйте заменить его на ProcessPoolExecutor, который работает с процессами и не подвержен GIL.

    from concurrent.futures import ProcessPoolExecutor
    
    if __name__ == "__main__":
       with ProcessPoolExecutor(max_workers=4) as executor:
           future_mmr = {executor.submit(threaded_mmr_search, index, row, 'Material Description'): index for index, row in spend_sheet_uniques.iterrows()}
    
           for future in as_completed(future_mmr):
               index, mmr_matches = future.result()
               spend_sheet_uniques.at[index, 'Best_Matches_MMR'] = str(mmr_matches)
  2. Отладка и логирование: Добавьте больше логирования в ваш код, чтобы лучше понимать, на каком этапе происходит зависание. Это поможет вам локализовать проблему. Используйте logging вместо простого print для более структурированного вывода.

  3. Уменьшение рабочей нагрузки: Если задача выполняется долго, попробуйте уменьшить количество строк, обрабатываемых за раз, или распределите задачу на меньшие подзадачи.

  4. Тестирование и профилирование: Запустите код в профилировщике (например, cProfile) для выявления узких мест. Проверьте, не блокируется ли выполнение на каком-либо этапе.

Заключение

Данная проблема зависания при использовании пула потоков может быть решена за счет смены логики выполнения с потоков на процессы, что в большинстве случаев является более эффективным подходом, особенно для задач, требующих параллельного выполнения CPU-вместительных операций. Следуя предложенным рекомендациям, вы сможете улучшить надежность вашего приложения и избежать зависания во время выполнения.

Оцените материал
Добавить комментарий

Капча загружается...