реализовать кластер dask с prophet

Вопрос или проблема

Я пытаюсь реализовать dask. В данный момент я использую joblib, и он работает отлично, использует весь ЦП, что, на мой взгляд, идеально, но я хочу добавить больше ресурсов.

Теперь, пытаясь реализовать dask, это идет гораздо медленнее, я не знаю, что я делаю не так.

def evaluate_params(params, train, holidays, initial, periode, horizon,parallel:str="processes"):
    mae_ = 999999
    rmse_ = 999999
    df_p = pd.DataFrame()

    try:
        model = Prophet(**params,
                       holidays=holidays)

        model.add_country_holidays(country_name="PA")
        model.add_regressor('weekendOrPayday')
        model.fit(train)

        # Используя родное параллельное выполнение Dask в Prophet
        df_cv = cross_validation(
            model,
            initial=f"{initial} days",
            period=f"{periode} days",
            horizon=f"{horizon} days",
            parallel=parallel # поменять "processes" на "dask"
        )

        df_p = performance_metrics(df_cv, rolling_window=1)

    except Exception as e:
        error = f"Параметры {params}: {repr(e)}"
        print(error, flush=True)

    if not df_p.empty:
        mae_ = df_p["mae"].values[0]
        rmse_ = df_p["rmse"].values[0]

    return {
        'params': params,
        'mae': mae_,
        'rmse': rmse_
    }

def optimize_prophet(train, holidays, initial_days, periode_days, horizon_days):

    param_grid = {
        'changepoint_range': [0.8, 0.90, 0.95],
        "changepoint_prior_scale": [
            x for x in np.arange(0.01, 0.06, 0.01, dtype=float)
        ],
        "seasonality_prior_scale": [
            x for x in np.arange(1, 10.1, 1, dtype=float)
        ],
        "seasonality_mode": ["аддитивный", "умножительный"],
        'holidays_prior_scale': [x for x in np.arange(1, 10.1, 1, dtype=float)],
    }

    all_params = [
        dict(zip(param_grid.keys(), v))
        for v in itertools.product(*param_grid.values())
    ]

    if daskcluster == True:
        results = []
        ## с этим кодом не работает
        """future = delayed(evaluate_params)(
                        params, 
                        train, 
                        holidays,
                        initial_days, 
                        periode_days, 
                        horizon_days,
                        "dask"   
                    )

        results = self.client.compute(results)
        results = self.client.gather(results)           
        """

        for params in all_params:
            future = evaluate_params(
                params, 
                train, 
                holidays,
                initial_days, 
                periode_days, 
                horizon_days,
                "dask"
            )
            results.append(future)
        client.close()
        cluster.close()

    else:
        results = Parallel(n_jobs=self.n_jobs)(
                    delayed(evaluate_params)(params, train, holidays,
                                            initial_days, periode_days, horizon_days)
                    for params in all_params
                )

    best_result = min(results, key=lambda x: x['rmse'])
    best_params = best_result['params']
    минимальный_mae = best_result['mae']

    return best_params, минимальный_mae

Ответ или решение

Внедрение кластера Dask с использованием Prophet

Введение

Существует потребность в оптимизации процесса моделирования временных рядов с использованием библиотеки Prophet, особенно когда необходимо задействовать больше вычислительных ресурсов для улучшения производительности. Ваша текущая реализация с использованием joblib работает эффективно, однако переход на Dask может вызвать замедления, если он выполнен неправильно. В данной статье мы предложим оптимизированный подход для внедрения Dask-кластера в вашу функцию optimize_prophet.

Настройка Dask-кластера

Прежде всего, важно правильно настроить Dask-кластер. Следующий код поможет вам создать Dask-кластер, который может быть использован для параллельного выполнения вычислений:

from dask.distributed import Client, LocalCluster

# Запускаем локальный кластер Dask
cluster = LocalCluster()  # Использует всю доступную мощность вашего компьютера
client = Client(cluster)   # Подключаемся к кластеру

Убедитесь, что у вас установлены необходимые библиотеки:

pip install dask[distributed] fbprophet

Модификация функции optimize_prophet

Теперь мы адаптируем вашу функцию optimize_prophet, чтобы корректно использовать Dask для параллельных вычислений. В этом примере мы создадим отложенные вычисления для функции evaluate_params с использованием Dask:

from dask import delayed

def optimize_prophet(train, holidays, initial_days, periode_days, horizon_days, use_dask=True):

    param_grid = {
        'changepoint_range': [0.8, 0.90, 0.95],
        "changepoint_prior_scale": list(np.arange(0.01, 0.06, 0.01)),
        "seasonality_prior_scale": list(np.arange(1, 10.1, 1)),
        "seasonality_mode": ["additive", "multiplicative"],
        'holidays_prior_scale': list(np.arange(1, 10.1, 1)),
    }

    all_params = [
        dict(zip(param_grid.keys(), v))
        for v in itertools.product(*param_grid.values())
    ]

    if use_dask:
        results = []
        for params in all_params:
            future = delayed(evaluate_params)(
                params, 
                train, 
                holidays,
                initial_days, 
                periode_days, 
                horizon_days,
                "dask"
            )
            results.append(future)  # собираем отложенные задачи

        # Вычисляем результаты
        results = client.compute(results, scheduler='threaded')
        results = client.gather(results)  # собираем вычисленные результаты

        client.close()
        cluster.close()
    else:
        results = Parallel(n_jobs=-1)(
            delayed(evaluate_params)(params, train, holidays,
                                      initial_days, periode_days, horizon_days)
            for params in all_params
        )

    best_result = min(results, key=lambda x: x['rmse'])
    best_params = best_result['params']
    minimal_mae = best_result['mae']

    return best_params, minimal_mae

Важные моменты для оптимизации

  1. Отложенные вычисления: Используйте delayed из Dask для создания отложенных задач. Это позволяет Dask оптимизировать распределение задач и использовать ресурсы более эффективно.

  2. Сбор результатов: Убедитесь, что вы собираете результаты с использованием client.gather(), чтобы получить окончательные результаты после завершения всех вычислений.

  3. Закрытие клиента и кластера: Не забудьте правильно закрывать клиент и кластер после завершения работы, чтобы освободить ресурсы.

  4. Параллельные задачи: Обратите внимание на механизм планирования. Убедитесь, что у вас есть достаточное количество ресурсов, чтобы запустить необходимые задачи параллельно, иначе вы можете столкнуться с уменьшенной производительностью.

Заключение

Внедрение Dask-кластера для модели Prophet может значительно повысить производительность ваших расчетов, особенно в условиях больших объемов данных или сложных параметрических оптимизаций. Следуя предложенному пути, вы сможете успешно интегрировать Dask в вашу архитектуру, что приведет к более эффективному использованию ресурсов и сокращению времени обработки.

Оцените материал
Добавить комментарий

Капча загружается...