xarray:大于使用 map_blocks 将结果转储到 .zarr 存储的内存数组

Posted

技术标签:

【中文标题】xarray:大于使用 map_blocks 将结果转储到 .zarr 存储的内存数组【英文标题】:xarray: Larger than memory array using map_blocks dumping results into .zarr store 【发布时间】:2022-01-23 20:52:43 【问题描述】:

我正在尝试并行化一个生成非常大的 numpy 数组的操作,并且通常会炸毁正在运行它的机器的内存。

我想出的是以下工作流程:

    使用 Dask 生成惰性零填充数组 使用 X-Array 生成 DataArray,使用之前的惰性零数组及其适当的坐标等... 使用DataArray.map_blocks,我调用了一个函数write_values,它从一个单独的文件中获取一个Numpy数组的子集,然后将它们插入到xarray.DataArray中的适当位置。李> 懒惰地转换为xarray.Dataset,名称为DataArray 然后我尝试通过to_zarr 存储到磁盘中

第一:这是否适合处理循环遍历分块数组中的块的操作?

第二:当我运行这个程序时,它在执行时会耗尽我的内存,这可能是由于通过 Dask 创建的任务量?我怎样才能优化到永远不会达到我机器的内存限制。

第三:在这段代码运行后,我得到了一个存储到磁盘中的 zarr,但它似乎并没有真正存储我从外部函数获得的值。这是更改磁盘存储数组中值的正确方法吗?

问题:我将.zarr 写入磁盘的函数不会写入numpy_returning_volume 中的值。我在想可能是我需要在 map_blocks 函数中写入值?

谢谢!

完整工作示例:

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume():
    def __init__(self):
        # self.data = da.random.random_sample([50000, 50000, 50000])
        self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ]


def write_values(chunk, **kwargs):

    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return(chunk)


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords=
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords,
    dims=dimensions)

xarray_data.map_blocks(write_values, kwargs=
                       "volume": numpy_returning_volume, template=xarray_data).compute()
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)

with ProgressBar():
    delayed_results.compute()

【问题讨论】:

【参考方案1】:

天哪!我才意识到我的问题是世界上最简单的事情!我只需要设置一个等于地图块结果的变量,一切正常。如果有人感兴趣,这是完整的工作脚本。它会生成一个 6GB 的数据集

import dask.array as da
import xarray as xr
import numpy as np
import pathlib
from dask.diagnostics import ProgressBar

class NumpyReturningVolume():
    def __init__(self):
        self.data = da.random.random_sample([1000, 2000, 1000])
        # self.data = np.random.random_sample([500, 1000, 100])

    def num_i(self):
        return self.data.shape[0]

    def num_j(self):
        return self.data.shape[1]

    def num_k(self):
        return self.data.shape[2]
    
    def get_float(self, start_coordinate, end_coordinate):
        return self.data[
            start_coordinate[0]:end_coordinate[0],
            start_coordinate[1]:end_coordinate[1],
            start_coordinate[2]:end_coordinate[2]
            ].compute()


def write_values(chunk, **kwargs):

    start_coordinate = (chunk.coords["track"].values[0], chunk.coords["bin"].values[0], chunk.coords["time"].values[0])
    end_coordinate = (chunk.coords["track"].values[-1]+1, chunk.coords["bin"].values[-1]+1, chunk.coords["time"].values[-1]+1)

    volume_data = kwargs["volume"].get_float(start_coordinate, end_coordinate)
    chunk.data = volume_data

    return(chunk)


seismic_file_path = pathlib.Path("./")
seismic_file_name = "TEST_FILE.ds"
store_path = seismic_file_path.parent.joinpath(
    seismic_file_name + "_test.zarr")

numpy_returning_volume = NumpyReturningVolume()

dimensions = ('track', 'bin', 'time')

track_coords = np.arange(0, numpy_returning_volume.num_i(), 1, dtype=np.uint32)
bin_coords = np.arange(0, numpy_returning_volume.num_j(), 1, dtype=np.uint32)
time_coords = np.arange(0, numpy_returning_volume.num_k(), 1, dtype=np.uint32)

empty_arr = da.empty(shape=(
    numpy_returning_volume.num_i(),
    numpy_returning_volume.num_j(),
    numpy_returning_volume.num_k()),
    dtype=np.float32)

xarray_data = xr.DataArray(empty_arr, name="seis", coords=
    'track': track_coords,
    'bin': bin_coords, 'time': time_coords,
    dims=dimensions)

# This xarray_data = is what I was missing!!
xarray_data = xarray_data.map_blocks(write_values, kwargs=
                       "volume": numpy_returning_volume, template=xarray_data)
xarray_data = xarray_data.to_dataset(name="seis")
delayed_results = xarray_data.to_zarr(store_path.__str__(), compute=False)

with ProgressBar():
    delayed_results.compute()

【讨论】:

以上是关于xarray:大于使用 map_blocks 将结果转储到 .zarr 存储的内存数组的主要内容,如果未能解决你的问题,请参考以下文章

使用没有日期的时间作为 xarray 中的一维

使用 xarray 选择所有坐标值

为啥我们使用基数树(或 xarray)来存储页面缓存?

滚动均值持续到明年 (xarray)

如何使用 xarray 沿时间维度扩展数据变量?

使用xarray加入/合并多个NetCDF文件