Вопрос или проблема
Я пытаюсь реализовать dask. В данный момент я использую joblib, и он работает отлично, использует весь ЦП, что, на мой взгляд, идеально, но я хочу добавить больше ресурсов.
Теперь, пытаясь реализовать dask, это идет гораздо медленнее, я не знаю, что я делаю не так.
def evaluate_params(params, train, holidays, initial, periode, horizon,parallel:str="processes"):
mae_ = 999999
rmse_ = 999999
df_p = pd.DataFrame()
try:
model = Prophet(**params,
holidays=holidays)
model.add_country_holidays(country_name="PA")
model.add_regressor('weekendOrPayday')
model.fit(train)
# Используя родное параллельное выполнение Dask в Prophet
df_cv = cross_validation(
model,
initial=f"{initial} days",
period=f"{periode} days",
horizon=f"{horizon} days",
parallel=parallel # поменять "processes" на "dask"
)
df_p = performance_metrics(df_cv, rolling_window=1)
except Exception as e:
error = f"Параметры {params}: {repr(e)}"
print(error, flush=True)
if not df_p.empty:
mae_ = df_p["mae"].values[0]
rmse_ = df_p["rmse"].values[0]
return {
'params': params,
'mae': mae_,
'rmse': rmse_
}
def optimize_prophet(train, holidays, initial_days, periode_days, horizon_days):
param_grid = {
'changepoint_range': [0.8, 0.90, 0.95],
"changepoint_prior_scale": [
x for x in np.arange(0.01, 0.06, 0.01, dtype=float)
],
"seasonality_prior_scale": [
x for x in np.arange(1, 10.1, 1, dtype=float)
],
"seasonality_mode": ["аддитивный", "умножительный"],
'holidays_prior_scale': [x for x in np.arange(1, 10.1, 1, dtype=float)],
}
all_params = [
dict(zip(param_grid.keys(), v))
for v in itertools.product(*param_grid.values())
]
if daskcluster == True:
results = []
## с этим кодом не работает
"""future = delayed(evaluate_params)(
params,
train,
holidays,
initial_days,
periode_days,
horizon_days,
"dask"
)
results = self.client.compute(results)
results = self.client.gather(results)
"""
for params in all_params:
future = evaluate_params(
params,
train,
holidays,
initial_days,
periode_days,
horizon_days,
"dask"
)
results.append(future)
client.close()
cluster.close()
else:
results = Parallel(n_jobs=self.n_jobs)(
delayed(evaluate_params)(params, train, holidays,
initial_days, periode_days, horizon_days)
for params in all_params
)
best_result = min(results, key=lambda x: x['rmse'])
best_params = best_result['params']
минимальный_mae = best_result['mae']
return best_params, минимальный_mae
Ответ или решение
Внедрение кластера Dask с использованием Prophet
Введение
Существует потребность в оптимизации процесса моделирования временных рядов с использованием библиотеки Prophet, особенно когда необходимо задействовать больше вычислительных ресурсов для улучшения производительности. Ваша текущая реализация с использованием joblib
работает эффективно, однако переход на Dask может вызвать замедления, если он выполнен неправильно. В данной статье мы предложим оптимизированный подход для внедрения Dask-кластера в вашу функцию optimize_prophet
.
Настройка Dask-кластера
Прежде всего, важно правильно настроить Dask-кластер. Следующий код поможет вам создать Dask-кластер, который может быть использован для параллельного выполнения вычислений:
from dask.distributed import Client, LocalCluster
# Запускаем локальный кластер Dask
cluster = LocalCluster() # Использует всю доступную мощность вашего компьютера
client = Client(cluster) # Подключаемся к кластеру
Убедитесь, что у вас установлены необходимые библиотеки:
pip install dask[distributed] fbprophet
Модификация функции optimize_prophet
Теперь мы адаптируем вашу функцию optimize_prophet
, чтобы корректно использовать Dask для параллельных вычислений. В этом примере мы создадим отложенные вычисления для функции evaluate_params
с использованием Dask:
from dask import delayed
def optimize_prophet(train, holidays, initial_days, periode_days, horizon_days, use_dask=True):
param_grid = {
'changepoint_range': [0.8, 0.90, 0.95],
"changepoint_prior_scale": list(np.arange(0.01, 0.06, 0.01)),
"seasonality_prior_scale": list(np.arange(1, 10.1, 1)),
"seasonality_mode": ["additive", "multiplicative"],
'holidays_prior_scale': list(np.arange(1, 10.1, 1)),
}
all_params = [
dict(zip(param_grid.keys(), v))
for v in itertools.product(*param_grid.values())
]
if use_dask:
results = []
for params in all_params:
future = delayed(evaluate_params)(
params,
train,
holidays,
initial_days,
periode_days,
horizon_days,
"dask"
)
results.append(future) # собираем отложенные задачи
# Вычисляем результаты
results = client.compute(results, scheduler='threaded')
results = client.gather(results) # собираем вычисленные результаты
client.close()
cluster.close()
else:
results = Parallel(n_jobs=-1)(
delayed(evaluate_params)(params, train, holidays,
initial_days, periode_days, horizon_days)
for params in all_params
)
best_result = min(results, key=lambda x: x['rmse'])
best_params = best_result['params']
minimal_mae = best_result['mae']
return best_params, minimal_mae
Важные моменты для оптимизации
-
Отложенные вычисления: Используйте
delayed
из Dask для создания отложенных задач. Это позволяет Dask оптимизировать распределение задач и использовать ресурсы более эффективно. -
Сбор результатов: Убедитесь, что вы собираете результаты с использованием
client.gather()
, чтобы получить окончательные результаты после завершения всех вычислений. -
Закрытие клиента и кластера: Не забудьте правильно закрывать клиент и кластер после завершения работы, чтобы освободить ресурсы.
-
Параллельные задачи: Обратите внимание на механизм планирования. Убедитесь, что у вас есть достаточное количество ресурсов, чтобы запустить необходимые задачи параллельно, иначе вы можете столкнуться с уменьшенной производительностью.
Заключение
Внедрение Dask-кластера для модели Prophet может значительно повысить производительность ваших расчетов, особенно в условиях больших объемов данных или сложных параметрических оптимизаций. Следуя предложенному пути, вы сможете успешно интегрировать Dask в вашу архитектуру, что приведет к более эффективному использованию ресурсов и сокращению времени обработки.