如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集
Posted
技术标签:
【中文标题】如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集【英文标题】:How best to rechunk a NetCDF file collection to Zarr dataset 【发布时间】:2018-09-30 16:19:36 【问题描述】:我正在尝试重新分块 NetCDF 文件集合并在 AWS S3 上创建一个 Zarr 数据集。我有 168 个原始 NetCDF4 经典文件,其数组维度为 time: 1, y: 3840, x: 4608
,分块为 chunks='time':1, 'y':768, 'x':922
。
我想将此输出写入 Zarr,并且我想优化时间序列提取,因此在我的块中包含更多时间记录。我想我会使用 xarray 来帮助完成工作,因为我有很多处理器可以利用 Dask,而且 xarray 有 xr.open_mfdataset
和 ds.to_zarr
。
我首先尝试重新分块到chunks='time':24, 'y':768, 'x':922
以匹配x
和y
中的输入NetCDF4 分块,但是当我尝试写信给Zarr 时它抱怨说,因为它需要x
和@987654335 中的统一块大小@,只允许沿time
维度的最后一个块中的非均匀大小(不幸的是,在x
维度中,总大小4608 不是块大小922 的倍数。
然后我尝试了chunks='time':168, 'y':384, 'x':288
并开始工作,并且非常快速地进行了几分钟,然后变得越来越慢。最终在 50 分钟后,集群死亡:
4072 distributed.core - INFO - Event loop was unresponsive in Worker for 1.41s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
4073 slurmstepd: error: Step 3294889.0 exceeded memory limit (25346188 > 25165824), being killed
这是我正在使用的代码:
from dask.distributed import Client
import pandas as pd
import xarray as xr
import s3fs
import zarr
client = Client(scheduler_file='/home/rsignell/scheduler.json')
client
root = '/lustre/projects/hazards/cmgp/woodshole/rsignell/nwm/forcing_short_range/'
bucket_endpoint='https://s3.us-west-1.amazonaws.com/'
f_zarr = 'rsignell/nwm/test_week4'
dates = pd.date_range(start='2018-04-01T00:00', end='2018-04-07T23:00', freq='H')
urls = ['/nwm.tz.short_range.forcing.f001.conus.nc'.format(root,a.strftime('%Y%m%d'),a.strftime('%H')) for a in dates]
ds = xr.open_mfdataset(urls, concat_dim='time', chunks='time':1, 'y':768, 'x':922)
ds = ds.drop(['ProjectionCoordinateSystem','time_bounds'])
ds = ds.chunk(chunks='time':168, 'y':384, 'x':288).persist()
ds
生产
<xarray.Dataset>
Dimensions: (reference_time: 168, time: 168, x: 4608, y: 3840)
Coordinates:
* reference_time (reference_time) datetime64[ns] 2018-04-01 ...
* x (x) float64 -2.304e+06 -2.303e+06 -2.302e+06 -2.301e+06 ...
* y (y) float64 -1.92e+06 -1.919e+06 -1.918e+06 -1.917e+06 ...
* time (time) datetime64[ns] 2018-04-01T01:00:00 ...
Data variables:
T2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
LWDOWN (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
Q2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
U2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
V2D (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
PSFC (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
RAINRATE (time, y, x) float32 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
SWDOWN (time, y, x) float64 dask.array<shape=(168, 3840, 4608), chunksize=(168, 384, 288)>
然后我打电话
fs = s3fs.S3FileSystem(anon=False, client_kwargs=dict(endpoint_url=bucket_endpoint))
d = s3fs.S3Map(f_zarr, s3=fs)
compressor = zarr.Blosc(cname='zstd', clevel=3, shuffle=2)
encoding = vname: 'compressor': compressor for vname in ds.data_vars
delayed_store = ds.to_zarr(store=d, mode='w', encoding=encoding, compute=False)
persist_store = delayed_store.persist(retries=100)
在它死之前,Dask 仪表板看起来像这样:
NetCDF4 文件的总大小为 20GB,因此我在 Dask Dashboard 中显示超过 500GB 似乎有点疯狂,而且每个具有 60GB RAM 的 30 个处理器不足以完成这项工作。
我做错了什么,或者什么是更好的解决方案?
【问题讨论】:
第一个数据集.persist()
完成后你的内存使用量是多少?
您可以尝试在to_zarr
步骤之前保存完整的数据集吗?
我确实在上面的数据集加载上使用了.persist()
,实际上,这是它与slurmstepd: error: Step 3295169.0 exceeded memory limit (126726200 > 125829120), being killed slurmstepd: error: *** STEP 3295169.0 ON n3-86 CANCELLED AT 2018-04-23T10:33:50 *** slurmstepd: error: Exceeded job memory limit
一起崩溃的地方
【参考方案1】:
我注意到您说要增加时间维度中的块数。或者我理解错了。
您从指定为chunks='time':1, 'y':768, 'x':922
的块开始,然后尝试chunks='time':168, 'y':384, 'x':288
并发现第二个使用大量内存。
问题是chunks
关键字指定了块的大小,而不是块的个数!
在第一种情况下,每个块的大小是1*768*922 ~ 7e5
,而在第二种情况下,每个块的大小是168*384*288 ~ 2e7
。
时间块的最大数量由chunks='time': 1
实现。
【讨论】:
以上是关于如何最好地将 NetCDF 文件集合重新分块到 Zarr 数据集的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python 连接来自多个 netCDF 文件的数据
如何最好地将多个文本文件导入 SQLite FTS 虚拟表?