Я только что начал использовать ‘dask’ и сталкиваюсь с проблемами с памятью. Я хотел бы поделиться своей проблемой вместе с простым фрагментом кода, который иллюстрирует то, что я сделал.
Моя цель – прочитать большой файл, содержащий данные о глобальном сбросе с высоким разрешением, и вычислить перцентили для каждой ячейки широты-долготы.
Я использую ‘dask’ для разбиения данных на части. Однако я не могу разбить их по времени, так как мне нужно вычислить перцентили для всего временного периода, содержащегося в ‘DataArray’.
После создания локального клиента Dask я разбиваю свой ‘DataArray’ по пространству и продолжаю вычислять процентиль для каждой части. Наконец, я пытаюсь сохранить вычисленные перцентили в виде файла ‘NetCDF’. Я выбрал использовать ‘NetCDF4’ для этой задачи вместо ‘xarray’, так как помню, что ‘xarray’ хранит данные в памяти, в то время как ‘NetCDF4’ позволяет записывать на диск.
Проблема, с которой я сталкиваюсь, заключается в том, что при выполнении кода я замечаю, что все мои рабочие процессы вместе потребляют более ‘2GB’ памяти, а сам входной DataArray уже составляет ‘2GB’.
Я ожидал, что код вычислит перцентили для данной части, сохранит эту часть в файл ‘NetCDF’ с использованием написанной мной функции записи, а затем освободит эту информацию из памяти, чтобы освободить место для следующей части. Однако, похоже, что код вычисляет перцентили для всех частей перед их сохранением в NetCDF, что может быть причиной высокого использования памяти.
Я был бы очень признателен за любую помощь в понимании того, что может идти не так. Спасибо!
import numpy as np
import netCDF4 as nc
import xarray as xr
import pandas as pd
from dask.distributed import Client, LocalCluster
def calculate_percentiles(da, percentile):
return da.quantile(percentile / 100, dim="time", skipna=True)
def write_percentiles_data(
percentile_da: xr.DataArray,
save_as: str,
) -> None:
"""
Сохранить вычисленный процентиль в одном файле NetCDF с использованием NetCDF4.
Параметры:
- percentile_da: DataArray вычисленного перцентиля.
- save_as: строка, имя файла NetCDF, который будет сохранен.
"""
# Открыть новый файл NetCDF для записи
with nc.Dataset(save_as, "w", format="NETCDF4") as dst:
# Определить размеры
dst.createDimension("lon", percentile_da.sizes["lon"])
dst.createDimension("lat", percentile_da.sizes["lat"])
# Создать переменные координат для долгот и широт
longitudes = dst.createVariable("lon", np.float64, "lon")
latitudes = dst.createVariable("lat", np.float64, "lat")
# Назначить значения координат
longitudes[:] = percentile_da.coords["lon"].values
latitudes[:] = percentile_da.coords["lat"].values
# Определить переменную с размерами (широта и долгота)
percentiles = dst.createVariable("percentile", np.float64, ("lat", "lon"))
# Назначить данные напрямую без транспонирования, предполагая, что это правильно
percentiles[:, :] = percentile_da.values # Используйте это напрямую
# Назначить атрибуты
percentiles.units = "percentile"
if __name__ == '__main__':
#-------создать фиктивный файл NetCDF
time_dim = 50
lat_dim = 1400
lon_dim = 3600
# Генерировать временные координаты: диапазон дат на 120 часов с '1991-02-01'
time_coords = pd.date_range("1991-02-01", periods=time_dim, freq="h")
# Генерировать координаты широты и долготы
lat_coords = np.linspace(83.95, -55.95, lat_dim)
lon_coords = np.linspace(-179.9, 179.9, lon_dim)
# Создать случайные данные
data = np.random.rand(time_dim, lat_dim, lon_dim) * 100
# Создать DataArray
da = xr.DataArray(
data,
coords=[time_coords, lat_coords, lon_coords],
dims=["time", "lat", "lon"],
attrs={"long_name": "маршрутизируемый поток", "unit": "м3 с-1"},
)
#-------Настроить клиента
cluster = LocalCluster(
n_workers=6,
threads_per_worker=4,
memory_limit=3 * 1e9, # в байтах
)
client = Client(cluster)
#------- разбиение на части
chunks = {"time": -1, "lat": 200, "lon": 200}
chunked_da = da.chunk(chunks)
#-------- Вычислить перцентили с использованием dask
percentile = 95 # получение 95-го процентиля
percentile_da = calculate_percentiles(chunked_da, percentile)
#-------- Сохранить перцентили
event_file_name = f"{percentile}th_percentile.nc"
write_percentiles_data(
percentile_da,
event_file_name,
)
Ответ
Проблемы с памятью при использовании Dask могут проявляться в различных аспектах, особенно когда вы работаете с большими массивами данных. Давайте разберем вашу ситуацию более подробно и предложим возможные решения.
Проблема
Вы столкнулись с высокой потребностью в памяти, когда запускаете код, который должен рассчитать перцентили для больших объемов данных. Основная проблема, с которой вы имеете дело, заключается в том, что все операции Dask выполняются лениво, и, похоже, что все чанки обрабатываются одновременно перед сохранением результатов. Это может приводить к тому, что память заполняется, так как существуют промежуточные результаты для всех чанков.
Решение
Для оптимизации использования памяти и уменьшения общего объема данных, находящихся в памяти одновременно, я предлагаю следующие подходы:
-
Использование
.persist()
:
Вы можете использовать.persist()
на каждом чанке, прежде чем его обрабатывать, чтобы гарантировать, что промежуточные результаты не занимают много памяти. Это будет требовать большего объема памяти, но поможет избежать проблем с обработкой всех данных одновременно. -
Цикличная обработка чанков и сохранение:
Вместо того чтобы вычислять все перцентили одновременно, можете обрабатывать их по очереди и сразу же сохранять каждый результат в файл NetCDF. Это позволит избежать накопления промежуточных результатов в памяти. - Использование
dask.compute
:
Вместо computation сразу над всем массивом, вы можете использоватьdask.compute
и обрабатывать чанки по отдельности.
Ниже приведен адаптированный код с учётом этих рекомендаций:
import numpy as np
import netCDF4 as nc
import xarray as xr
import pandas as pd
from dask.distributed import Client, LocalCluster
def calculate_percentiles(da, percentile):
return da.quantile(percentile / 100, dim="time", skipna=True)
def write_percentiles_data(
percentile_da: xr.DataArray,
save_as: str,
) -> None:
with nc.Dataset(save_as, "w", format="NETCDF4") as dst:
dst.createDimension("lon", percentile_da.sizes["lon"])
dst.createDimension("lat", percentile_da.sizes["lat"])
longitudes = dst.createVariable("lon", np.float64, "lon")
latitudes = dst.createVariable("lat", np.float64, "lat")
longitudes[:] = percentile_da.coords["lon"].values
latitudes[:] = percentile_da.coords["lat"].values
percentiles = dst.createVariable("percentile", np.float64, ("lat", "lon"))
percentiles[:, :] = percentile_da.values
percentiles.units = "percentile"
if __name__ == '__main__':
# Создание демонстрационного NetCDF файла
time_dim = 50
lat_dim = 1400
lon_dim = 3600
time_coords = pd.date_range("1991-02-01", periods=time_dim, freq="h")
lat_coords = np.linspace(83.95, -55.95, lat_dim)
lon_coords = np.linspace(-179.9, 179.9, lon_dim)
data = np.random.rand(time_dim, lat_dim, lon_dim) * 100
da = xr.DataArray(
data,
coords=[time_coords, lat_coords, lon_coords],
dims=["time", "lat", "lon"],
attrs={"long_name": "routed streamflow", "unit": "m3 s-1"},
)
# Установка клиента Dask
cluster = LocalCluster(
n_workers=6,
threads_per_worker=4,
memory_limit=3 * 1e9, # в байтах
)
client = Client(cluster)
# Чанковка
chunks = {"time": -1, "lat": 200, "lon": 200}
chunked_da = da.chunk(chunks)
# Рассчитываем перцентили
percentile = 95 # получение 95-го перцентиля
percentile_da = calculate_percentiles(chunked_da, percentile)
# Сохраняем перцентили
event_file_name = f"{percentile}th_percentile.nc"
# Использование Dask для сохранения чанков
percentile_da = percentile_da.compute() # Замените это на более сложную логику обработки для уровня чанков при необходимости
write_percentiles_data(percentile_da, event_file_name)
Заключение
Использование Dask может значительно улучшить производительность при обработке больших данных, но необходимо управлять памятью и промежуточными результатами. Циклическая обработка чанков и их немедленное сохранение приведет к более глубокому контролю за использованием памяти. Надеюсь, эти советы помогут вам устранить проблему с памятью.