Проблемы с многопоточностью в Java

Вопрос или проблема

У меня есть длительная задача, выполняющая вычисленные симуляции, которую я решил запустить параллельно для повышения эффективности и объединения данных результатов. Недавно я добавил сложность в эту длительную задачу, и с тех пор внутри этойExpensiveTask возникают условия исключения, когда она запускается в нескольких потоках:

  • NullPointerException
  • ConcurrentModificationException (выбрасывается на объектах, доступных только из одного потока!)
    при доступе к внутренним структурам данных.

Вот она:

        //int NUMBER_OF_THREADS = 1;//работает идеально
        int NUMBER_OF_THREADS = 2;//2 и более вызывает внутренние исключения ExpensiveTask для некоторых потоков.
        ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
        CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
        // Общий контейнер для результатов
        ConcurrentMap<> globalSimDataMap = new ConcurrentHashMap<>();

        for (int i = 0; i < NUMBER_OF_THREADS; i++) {

            executor.submit(() -> {

                try {

                    System.out.println("Задача " + Thread.currentThread().getName());
                    ExpensiveTask expensiveTaskInstance=null;

                    try{
                        expensiveTaskInstance = new ExpensiveTask();
                    } catch(Exception e){
                        //Здесь программа выдает ошибку при реализации многопоточности
                        e.printStackTrace();
                        Thread.currentThread().interrupt();
                    }

                    // Получить результаты симуляции из этого потока
                    expensiveTaskInstance.getSimulationRecord();

                    // Аггрегировать результаты в глобальную карту
                    /*
                    * @TODO
                    */

                } finally {
                    latch.countDown();

                }
            });
        }

        try{
            
            latch.await();  // Ждать завершения всех потоков
            executor.shutdown();  // Остановить исполнитель
        } catch(Exception e){
            e.printStackTrace();
        }

Подчеркиваю, что ExpensiveTask корректно тестируется, и такие вещи никогда не происходят при запуске в одном потоке. На мой взгляд, эта реализация многопоточности сделана плохо, но я не в состоянии правильно отладить эту проблему.

Ответ или решение

Проблемы с многопоточностью в Java, особенно в контексте вашей задачи, могут быть вызваны несколькими факторами, связанными с состоянием объектов и доступом к данным. Давайте разберём основные моменты и шаги, которые помогут решить возникшие ошибки NullPointerException и ConcurrentModificationException.

1. NullPointerException

Ошибка NullPointerException может возникать, если объект ExpensiveTask не инициализирован должным образом. Например, если во время инициализации происходит ошибка, объект может остаться равным null, и попытка обращения к его методам приведёт к исключению. Поскольку у вас есть try-catch блок, убедитесь, что вы обрабатываете исключения корректно и после возникновения исключения не используете expensiveTaskInstance.

2. ConcurrentModificationException

Эта ошибка обычно появляется, когда коллекция изменяется во время её итерации. В вашем случае необходимо убедиться, что любые структуры данных, к которым у вас есть доступ, защищены от конкурирующих изменений, если они используются несколькими потоками. Поскольку вы используете ConcurrentHashMap, он должен быть потокобезопасным, но если есть другие коллекции, которые могут изменяться, их также следует защищать.

Рекомендации по коду

  1. Проверка инициализации объекта: Убедитесь, что expensiveTaskInstance инициализирован корректно перед его использованием.

  2. Избегайте изменения коллекций во время итерации: Если в ExpensiveTask имеются внутренние структуры данных, которые изменяются во время выполнения (например, List, Map и т.д.), используйте синхронизацию или замените их на потокобезопасные варианты.

  3. Проверьте методы класса ExpensiveTask: Убедитесь, что доступ к общим данным и самим обработкам внутри ExpensiveTask также потокобезопасен.

Пример исправленного кода

Ниже приведён упрощённый пример исправленного кода:

ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
ConcurrentMap<String, String> globalSimDataMap = new ConcurrentHashMap<>();

for (int i = 0; i < NUMBER_OF_THREADS; i++) {
    executor.submit(() -> {
        ExpensiveTask expensiveTaskInstance = null;

        try {
            System.out.println("Task " + Thread.currentThread().getName());
            expensiveTaskInstance = new ExpensiveTask();

            // Получение результатов симуляции
            expensiveTaskInstance.getSimulationRecord();

            // Объединение результатов в общий массив
            // Используйте synchronized или одно из других средств синхронизации,
            // если доступ к общим данным не защищен.
            synchronized (globalSimDataMap) {
                // Добавьте результаты в глобальную карту
            }

        } catch (NullPointerException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } catch (ConcurrentModificationException e) {
            e.printStackTrace(); // Логгирование про ConcurrentModificationException
        } finally {
            latch.countDown();
        }
    });
}

try {
    latch.await();  // Ждет завершения всех потоков
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    executor.shutdown();  // Завершение работы экзекьютора
}

Заключение

Проблемы многопоточности часто требуют тщательной проверки логики работы с объектами и данными. Обязательно уделите внимание тому, как ваши потоки взаимодействуют с общими ресурсами, и используйте подходящие меры защиты для обеспечения корректной работы программы.Если же вы продолжаете сталкиваться с трудностями, рассмотрите возможность добавления детального логирования для отслеживания путей выполнения вашего кода в многопоточном контексте.

Оцените материал
Добавить комментарий

Капча загружается...