Ошибки при развертывании модели PyTorch Lightning в AWS SageMaker TrainingJobs: SMDDP не поддерживает ReduceOp.

Вопрос или проблема

Я пытаюсь следовать рекомендациям DDP (Distributed Data Parallel) (Руководство 1, Руководство 2) и развернуть свои модели глубокого обучения на AWS SageMaker. Тем не менее, при запуске я сталкиваюсь со следующей ошибкой. [1 экземпляр, 4 GPU]

Могу я попросить вас подсказать, как я могу исправить эту ошибку? Буду признателен за любые предложения или комментарии!


Код модели:

from torch.distributed import init_process_group, destroy_process_group
from torchmetrics.functional import pairwise_cosine_similarity
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint
from lightning.pytorch.loggers import CSVLogger

import smdistributed.dataparallel.torch.torch_smddp
from lightning.pytorch.strategies import DDPStrategy
from lightning.fabric.plugins.environments.lightning import LightningEnvironment
import lightning as pl

env = LightningEnvironment()
env.world_size = lambda: int(os.environ["WORLD_SIZE"])
env.global_rank = lambda: int(os.environ["RANK"])

def main(args):

    train_samples = 1000 
    val_samples = 200 
    test_samples = 200 

    csv_logger = CSVLogger(save_dir=args.model_dir, name=args.modelname)

    # Инициализация DataModule
    data_module = ImagePairDataModule(
        data_save_folder=args.data_dir,
        train_samples=train_samples,
        val_samples=val_samples,
        test_samples=test_samples,
        batch_size=args.batch_size,
        num_workers=12,
    )

    # Инициализация модели
    model = Siamese()

    # Настройка обратного вызова для сохранения лучшей модели
    checkpoint_callback = ModelCheckpoint(
        monitor="val_loss",                 # Мониторить потерю на валидации
        dirpath=args.model_dir,             # Директория для сохранения контрольных точек модели
        filename="best-checkpoint-test",    # Имя файла контрольной точки
        save_top_k=1,                       # Сохранять только лучшую модель
        mode="min",                         # Сохранять при минимизации потерь на валидации
        save_on_train_epoch_end=False,
    )

    # Настройка ранней остановки, чтобы прекратить обучение, если потери на валидации не улучшаются
    early_stopping_callback = EarlyStopping(
        monitor="val_loss",                 # Мониторить потерю на валидации
        patience=args.patience,             # Количество эпох без улучшения перед остановкой
        mode="min",                         # Остановить, когда потери на валидации минимизируются
    )

    # Настройка ddp на SageMaker
    # https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel-modify-sdp-pt-lightning.html
    ddp = DDPStrategy(
        cluster_environment=env, 
        process_group_backend="smddp", 
        accelerator="gpu"
    )

    # Инициализация Trainer PyTorch Lightning
    trainer = pl.Trainer(
        max_epochs=args.epochs,
        strategy=ddp,                      # Стратегия распределённого параллельного обучения
        devices=torch.cuda.device_count(),    # Использовать все доступные GPU
        precision=16,                        # Использовать смешанную точность (16 бит)
        callbacks=[checkpoint_callback, early_stopping_callback],
        log_every_n_steps=10,
        logger=csv_logger,
    )

    # Обучение модели
    trainer.fit(model, datamodule=data_module)

    best_model_path = checkpoint_callback.best_model_path
    print(f"Сохранение лучшей модели в: {best_model_path}")

    # Уничтожить группу процессов, если распределённое обучение было инициализировано
    if torch.distributed.is_initialized():
        torch.distributed.destroy_process_group()

if __name__ == "__main__":

    # Настройка парсера аргументов для аргументов командной строки
    parser = argparse.ArgumentParser()

    # Добавление аргументов
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=256)
    parser.add_argument('--patience', type=int, default=10)
    parser.add_argument('--modelname', type=str, default="testing_model")

    # Контейнерная среда
    parser.add_argument("--hosts", type=list, default=json.loads(os.environ["SM_HOSTS"]))
    parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--data-dir", type=str, default=os.environ["SM_CHANNEL_TRAINING"])
    parser.add_argument("--num-gpus", type=int, default=os.environ["SM_NUM_GPUS"])

    # Парсинг аргументов
    args = parser.parse_args()

    # Убедитесь, что директория модели существует
    os.makedirs(args.model_dir, exist_ok=True)

    # Запуск основной функции
    main(args)

Код задания на обучение:

hyperparameters_set = {
    'epochs': 100,                  # Общее количество эпох
    'batch_size': 200,              # Размер входной партии на каждом устройстве
    'patience': 10,                 # Устойчивость к ранней остановке
    'modelname': model_task_name,   # Имя для модели
}

estimator = PyTorch(
    entry_point = "model_01.py",
    source_dir = "./sage_code_300",
    output_path = jobs_folder + "/",
    code_location = jobs_folder,
    role = role,
    input_mode="FastFile",
    py_version="py310",
    framework_version="2.2.0",
    instance_count=1,
    instance_type="ml.g5.12xlarge",
    hyperparameters=hyperparameters_set,
    volume_size=800,  
    distribution={'pytorchddp': {'enabled': True}},
    dependencies=["./sage_code_300/requirements.txt"],
)

estimator.fit({"training": inputs},
               job_name = job_name,
               wait = False,  
               logs=True) 

Сообщения об ошибках и логи

2024-11-09 00:25:02 Загрузка - Загружается сгенерированная модель обучения
2024-11-09 00:25:02 Не удалось - Задание на обучение завершилось неудачей
---------------------------------------------------------------------------
UnexpectedStatusException                 Traceback (most recent call last)
Cell In[26], line 34
     14 estimator = PyTorch(
     15     entry_point = "model_01.py",
     16     source_dir = "./sage_code_300",
   (...)
     29     dependencies=["./sage_code_300/requirements.txt"],
     30 )
     32 ######### Запуск модели #############
     33 # Отправка модели на задания обучения Sage
---> 34 estimator.fit({"training": inputs},
     35                job_name = job_name,
     36                wait = True,  # True 
     37                logs=True) 
     40 model_will_save_path = os.path.join(jobs_folder, job_name, "output", "model.tar.gz")
     41 print(f"\nМодель сохранена в:\n\n{model_will_save_path}")

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/workflow/pipeline_context.py:346, in runnable_by_pipeline.<locals>.wrapper(*args, **kwargs)
    342         return context
    344     return _StepArguments(retrieve_caller_name(self_instance), run_func, *args, **kwargs)
--> 346 return run_func(*args, **kwargs)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:1376, in EstimatorBase.fit(self, inputs, wait, logs, job_name, experiment_config)
   1374     forward_to_mlflow_tracking_server = True
   1375 if wait:
-> 1376     self.latest_training_job.wait(logs=logs)
   1377 if forward_to_mlflow_tracking_server:
   1378     log_sagemaker_job_to_mlflow(self.latest_training_job.name)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/estimator.py:2750, in _TrainingJob.wait(self, logs)
   2748 # Если запрашиваются логи, вызываем logs_for_jobs.
   2749 if logs != "None":
-> 2750     self.sagemaker_session.logs_for_job(self.job_name, wait=True, log_type=logs)
   2751 else:
   2752     self.sagemaker_session.wait_for_job(self.job_name)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:5945, in Session.logs_for_job(self, job_name, wait, poll, log_type, timeout)
   5924 def logs_for_job(self, job_name, wait=False, poll=10, log_type="All", timeout=None):
   5925     """Отображение логов для данного задания обучения, при необходимости поддерживая их до завершения задания.
   5926 
   5927     Если вывод является tty или ячейкой Jupyter, он будет окрашен
   (...)
   5943         exceptions.UnexpectedStatusException: Если ожидание и задание на обучение завершилось неудачно.
   5944     """
-> 5945     _logs_for_job(self, job_name, wait, poll, log_type, timeout)

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8547, in _logs_for_job(sagemaker_session, job_name, wait, poll, log_type, timeout)
   8544             last_profiler_rule_statuses = profiler_rule_statuses
   8546 if wait:
-> 8547     _check_job_status(job_name, description, "TrainingJobStatus")
   8548     if dot:
   8549         print()

File ~/anaconda3/envs/python3/lib/python3.10/site-packages/sagemaker/session.py:8611, in _check_job_status(job, desc, status_key_name)
   8605 if "CapacityError" in str(reason):
   8606     raise exceptions.CapacityError(
   8607         message=message,
   8608         allowed_statuses=["Completed", "Stopped"],
   8609         actual_status=status,
   8610     )
-> 8611 raise exceptions.UnexpectedStatusException(
   8612     message=message,
   8613     allowed_statuses=["Completed", "Stopped"],
   8614     actual_status=status,
   8615 )

UnexpectedStatusException: Ошибка задания на обучение model-res50-300k-noft-contra-aug-2024-11-09-00-18-33: Не удалось. Причина: AlgorithmError: ExecuteUserScriptError:
ExitCode 1
ErrorMessage "RuntimeError: SMDDP не поддерживает: ReduceOp
 Traceback (most recent call last)
 File "/opt/conda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
 return _run_code(code, main_globals, None,
 File "/opt/conda/lib/python3.10/runpy.py, line 86, in _run_code
 exec(code, run_globals)
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/__main__.py", line 7, in <module>
 main()
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 230, in main
 run_command_line(args)
 File "/opt/conda/lib/python3.10/site-packages/mpi4py/run.py", line 47, in run_command_line
 run_path(sys.argv[0], run_name="__main__")
 File "/opt/conda/lib/python3.10/runpy.py", line 289, in run_path
 return _run_module_code(code, init_globals, run_name,
 File "s4_mod_pl_cloud_01.py", line 369, in <module>
 main(args)
 File "s4_mod_pl_cloud_01. Проверьте руководство по устранению неполадок для общих ошибок: https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-python-sdk-troubleshooting.html

Среда

#- Версия PyTorch Lightning (например, 2.4.0): 2.4.0
#- Версия PyTorch (например, 2.4): 2.2
#- Версия Python (например, 3.12):  3.10
#- Операционная система (например, Linux): Linux2
#- Версия CUDA/cuDNN: 12.4
#- Модели и конфигурация GPU: A10G
#- Как вы устанавливали Lightning(`conda`, `pip`, source): pip

Ответ или решение

Когда вы пытаетесь развернуть модель PyTorch Lightning на AWS SageMaker с использованием DDP (Distributed Data Parallel), и сталкиваетесь с ошибкой RuntimeError: SMDDP does not support: ReduceOp, это может быть связано с несколькими аспектами вашего кода и конфигурации. Давайте разберёмся в проблеме более детально и предложим возможные решения.

Анализ проблемы

1. Ошибка SMDDP does not support: ReduceOp:

  • Данная ошибка указывает на то, что используемая вами реализация SMDDP (SageMaker Distributed Data Parallel) не поддерживает указанную операцию редукции. Это может произойти, если ваш код или какие-либо используемые вами библиотеки требуют специфических операций, которые не поддерживаются в SMDDP.

2. Использование PyTorch Lightning:

  • PyTorch Lightning — это мощный инструмент для упрощения кода обучения моделей. Однако при работе с распределенным обучением важно убедиться, что вы используете правильные настройки стратегии распределения.

Рекомендации по устранению ошибки

1. Проверьте настройки стратегии распределения:

  • Убедитесь, что вы правильно настроили DDPStrategy в вашем коде. Похоже, что вы используете окружение, определённое в LightningEnvironment, что может вызвать проблемы. Обратите внимание на корректность значений переменных окружения WORLD_SIZE и RANK. Попробуйте также использовать встроенные методы PyTorch Lightning для работы в распределенном режиме.

2. Настройка операционных систем:

  • Проверьте, совместимы ли версии CUDA и PyTorch с используемыми вами экземплярами AWS. Убедитесь, что все зависимости, такие как torchmetrics и сами PyTorch Lightning, полностью совместимы между собой.

3. Измените операции редукции:

  • Попробуйте использовать другие операции редукции, поддерживаемые в SMDDP, такие как all_reduce() вместо тех, которые вызывают проблемы. Возможно, вам потребуется изменить подход к агрегации метрик в вашей модели.

4. Проверка документации:

  • Обязательно проверьте документацию AWS SageMaker на предмет изменений или обновлений, касающихся SMDDP и поддерживаемых операций.

5. Применение альтернативных библиотек:

  • Если проблема не устраняется, рассмотрите возможность использования других стратегий распределенного обучения, таких как DDP в PyTorch без SageMaker. Это может предоставить больше возможностей для гибкой настройки и устранения специфических проблем с совместимостью.

Заключение

Ошибка, с которой вы столкнулись, может вызывать много вопросов, связанных с конфигурацией вашего кода и окружения. Следуя вышеуказанным рекомендациям, вы можете диагностировать и устранить основные проблемы, связанные с развертыванием модели на AWS SageMaker. Помните, что успешная капитуляция к распределенному обучению требует не только понимания используемых инструментов, но и корректного конфигурирования программного обеспечения и окружения.

Оцените материал
Добавить комментарий

Капча загружается...