Проблемы с памятью при использовании Dask

Вопросы и ответы

Я только что начал использовать ‘dask’ и сталкиваюсь с проблемами с памятью. Я хотел бы поделиться своей проблемой вместе с простым фрагментом кода, который иллюстрирует то, что я сделал.

Моя цель – прочитать большой файл, содержащий данные о глобальном сбросе с высоким разрешением, и вычислить перцентили для каждой ячейки широты-долготы.

Я использую ‘dask’ для разбиения данных на части. Однако я не могу разбить их по времени, так как мне нужно вычислить перцентили для всего временного периода, содержащегося в ‘DataArray’.

После создания локального клиента Dask я разбиваю свой ‘DataArray’ по пространству и продолжаю вычислять процентиль для каждой части. Наконец, я пытаюсь сохранить вычисленные перцентили в виде файла ‘NetCDF’. Я выбрал использовать ‘NetCDF4’ для этой задачи вместо ‘xarray’, так как помню, что ‘xarray’ хранит данные в памяти, в то время как ‘NetCDF4’ позволяет записывать на диск.

Проблема, с которой я сталкиваюсь, заключается в том, что при выполнении кода я замечаю, что все мои рабочие процессы вместе потребляют более ‘2GB’ памяти, а сам входной DataArray уже составляет ‘2GB’.

Я ожидал, что код вычислит перцентили для данной части, сохранит эту часть в файл ‘NetCDF’ с использованием написанной мной функции записи, а затем освободит эту информацию из памяти, чтобы освободить место для следующей части. Однако, похоже, что код вычисляет перцентили для всех частей перед их сохранением в NetCDF, что может быть причиной высокого использования памяти.

Я был бы очень признателен за любую помощь в понимании того, что может идти не так. Спасибо!

import numpy as np
import netCDF4 as nc
import xarray as xr
import pandas as pd
from dask.distributed import Client, LocalCluster

def calculate_percentiles(da, percentile):
    return da.quantile(percentile / 100, dim="time", skipna=True)

def write_percentiles_data(
    percentile_da: xr.DataArray,
    save_as: str,
) -> None:
    """
    Сохранить вычисленный процентиль в одном файле NetCDF с использованием NetCDF4.

    Параметры:
    - percentile_da: DataArray вычисленного перцентиля.
    - save_as: строка, имя файла NetCDF, который будет сохранен.
    """
    # Открыть новый файл NetCDF для записи
    with nc.Dataset(save_as, "w", format="NETCDF4") as dst:
        # Определить размеры
        dst.createDimension("lon", percentile_da.sizes["lon"])
        dst.createDimension("lat", percentile_da.sizes["lat"])

        # Создать переменные координат для долгот и широт
        longitudes = dst.createVariable("lon", np.float64, "lon")
        latitudes = dst.createVariable("lat", np.float64, "lat")

        # Назначить значения координат
        longitudes[:] = percentile_da.coords["lon"].values
        latitudes[:] = percentile_da.coords["lat"].values

        # Определить переменную с размерами (широта и долгота)
        percentiles = dst.createVariable("percentile", np.float64, ("lat", "lon"))

        # Назначить данные напрямую без транспонирования, предполагая, что это правильно
        percentiles[:, :] = percentile_da.values  # Используйте это напрямую

        # Назначить атрибуты
        percentiles.units = "percentile"

if __name__ == '__main__':
    #-------создать фиктивный файл NetCDF
    time_dim = 50
    lat_dim = 1400
    lon_dim = 3600
    # Генерировать временные координаты: диапазон дат на 120 часов с '1991-02-01'
    time_coords = pd.date_range("1991-02-01", periods=time_dim, freq="h")
    # Генерировать координаты широты и долготы
    lat_coords = np.linspace(83.95, -55.95, lat_dim)
    lon_coords = np.linspace(-179.9, 179.9, lon_dim)
    # Создать случайные данные
    data = np.random.rand(time_dim, lat_dim, lon_dim) * 100
    # Создать DataArray
    da = xr.DataArray(
        data,
        coords=[time_coords, lat_coords, lon_coords],
        dims=["time", "lat", "lon"],
        attrs={"long_name": "маршрутизируемый поток", "unit": "м3 с-1"},
    )

    #-------Настроить клиента
    cluster = LocalCluster(
        n_workers=6,
        threads_per_worker=4,
        memory_limit=3 * 1e9,  # в байтах
    )
    client = Client(cluster)

    #------- разбиение на части
    chunks = {"time": -1, "lat": 200, "lon": 200}
    chunked_da = da.chunk(chunks)

    #-------- Вычислить перцентили с использованием dask
    percentile = 95  # получение 95-го процентиля
    percentile_da = calculate_percentiles(chunked_da, percentile)

    #-------- Сохранить перцентили
    event_file_name = f"{percentile}th_percentile.nc"
    write_percentiles_data(
        percentile_da,
        event_file_name,
    )

Ответ

Проблемы с памятью при использовании Dask могут проявляться в различных аспектах, особенно когда вы работаете с большими массивами данных. Давайте разберем вашу ситуацию более подробно и предложим возможные решения.

Проблема

Вы столкнулись с высокой потребностью в памяти, когда запускаете код, который должен рассчитать перцентили для больших объемов данных. Основная проблема, с которой вы имеете дело, заключается в том, что все операции Dask выполняются лениво, и, похоже, что все чанки обрабатываются одновременно перед сохранением результатов. Это может приводить к тому, что память заполняется, так как существуют промежуточные результаты для всех чанков.

Решение

Для оптимизации использования памяти и уменьшения общего объема данных, находящихся в памяти одновременно, я предлагаю следующие подходы:

  1. Использование .persist():
    Вы можете использовать .persist() на каждом чанке, прежде чем его обрабатывать, чтобы гарантировать, что промежуточные результаты не занимают много памяти. Это будет требовать большего объема памяти, но поможет избежать проблем с обработкой всех данных одновременно.

  2. Цикличная обработка чанков и сохранение:
    Вместо того чтобы вычислять все перцентили одновременно, можете обрабатывать их по очереди и сразу же сохранять каждый результат в файл NetCDF. Это позволит избежать накопления промежуточных результатов в памяти.

  3. Использование dask.compute:
    Вместо computation сразу над всем массивом, вы можете использовать dask.compute и обрабатывать чанки по отдельности.

Ниже приведен адаптированный код с учётом этих рекомендаций:

import numpy as np
import netCDF4 as nc
import xarray as xr
import pandas as pd
from dask.distributed import Client, LocalCluster

def calculate_percentiles(da, percentile):
    return da.quantile(percentile / 100, dim="time", skipna=True)

def write_percentiles_data(
    percentile_da: xr.DataArray,
    save_as: str,
) -> None:
    with nc.Dataset(save_as, "w", format="NETCDF4") as dst:
        dst.createDimension("lon", percentile_da.sizes["lon"])
        dst.createDimension("lat", percentile_da.sizes["lat"])
        longitudes = dst.createVariable("lon", np.float64, "lon")
        latitudes = dst.createVariable("lat", np.float64, "lat")
        longitudes[:] = percentile_da.coords["lon"].values
        latitudes[:] = percentile_da.coords["lat"].values
        percentiles = dst.createVariable("percentile", np.float64, ("lat", "lon"))
        percentiles[:, :] = percentile_da.values
        percentiles.units = "percentile"

if __name__ == '__main__':
    # Создание демонстрационного NetCDF файла
    time_dim = 50
    lat_dim = 1400
    lon_dim = 3600
    time_coords = pd.date_range("1991-02-01", periods=time_dim, freq="h")
    lat_coords = np.linspace(83.95, -55.95, lat_dim)
    lon_coords = np.linspace(-179.9, 179.9, lon_dim)
    data = np.random.rand(time_dim, lat_dim, lon_dim) * 100
    da = xr.DataArray(
        data,
        coords=[time_coords, lat_coords, lon_coords],
        dims=["time", "lat", "lon"],
        attrs={"long_name": "routed streamflow", "unit": "m3 s-1"},
    )

    # Установка клиента Dask
    cluster = LocalCluster(
        n_workers=6,
        threads_per_worker=4,
        memory_limit=3 * 1e9,  # в байтах
    )
    client = Client(cluster)

    # Чанковка
    chunks = {"time": -1, "lat": 200, "lon": 200}
    chunked_da = da.chunk(chunks)

    # Рассчитываем перцентили
    percentile = 95  # получение 95-го перцентиля
    percentile_da = calculate_percentiles(chunked_da, percentile)

    # Сохраняем перцентили
    event_file_name = f"{percentile}th_percentile.nc"

    # Использование Dask для сохранения чанков
    percentile_da = percentile_da.compute()  # Замените это на более сложную логику обработки для уровня чанков при необходимости
    write_percentiles_data(percentile_da, event_file_name)

Заключение

Использование Dask может значительно улучшить производительность при обработке больших данных, но необходимо управлять памятью и промежуточными результатами. Циклическая обработка чанков и их немедленное сохранение приведет к более глубокому контролю за использованием памяти. Надеюсь, эти советы помогут вам устранить проблему с памятью.

Оцените материал
Добавить комментарий

Капча загружается...