将 Keras ModelCheckpoints 保存在 Google Cloud Bucket 中

Posted

技术标签:

【中文标题】将 Keras ModelCheckpoints 保存在 Google Cloud Bucket 中【英文标题】:Save Keras ModelCheckpoints in Google Cloud Bucket 【发布时间】:2018-01-17 00:20:38 【问题描述】:

我正在使用带有 TensorFlow 后端的 Keras 在 Google Cloud 机器学习引擎上训练 LSTM 网络。在对 gcloud 和我的 python 脚本进行一些调整后,我管理它来部署我的模型并执行成功的训练任务。

然后我尝试使用 Keras modelCheckpoint callback 让我的模型在每个 epoch 后保存检查点。使用 Google Cloud 运行本地培训作业可以按预期完美运行。在每个 epoch 之后,权重都存储在指定的路径中。但是,当我尝试在 Google Cloud Machine Learning Engine 上在线运行相同的作业时,weights.hdf5 不会被写入我的 Google Cloud Bucket。相反,我收到以下错误:

...
File "h5f.pyx", line 71, in h5py.h5f.open (h5py/h5f.c:1797)
IOError: Unable to open file (Unable to open file: name = 
'gs://.../weights.hdf5', errno = 2, error message = 'no such file or
directory', flags = 0, o_flags = 0)

我调查了这个问题,结果证明 Bucket 本身没有问题,因为 Keras Tensorboard callback 确实可以正常工作并将预期的输出写入同一个 bucket。我还确保 h5py 被包含在 setup.py 中,位于:

├── setup.py
    └── trainer
    ├── __init__.py
    ├── ...

setup.py 中的实际包含如下所示:

# setup.py
from setuptools import setup, find_packages

setup(name='kerasLSTM',
      version='0.1',
      packages=find_packages(),
      author='Kevin Katzke',
      install_requires=['keras','h5py','simplejson'],
      zip_safe=False)

我想问题归结为这样一个事实,即 GCS 无法使用 Python open 进行 I/O 访问,因为它提供了自定义实现:

import tensorflow as tf
from tensorflow.python.lib.io import file_io

with file_io.FileIO("gs://...", 'r') as f:
    f.write("Hi!")

在检查 Keras modelCheckpoint 回调如何实现实际的文件写入后,发现它使用h5py.File() 进行 I/O:

 with h5py.File(filepath, mode='w') as f:
    f.attrs['keras_version'] = str(keras_version).encode('utf8')
    f.attrs['backend'] = K.backend().encode('utf8')
    f.attrs['model_config'] = json.dumps(
        'class_name': model.__class__.__name__,
        'config': model.get_config()
 , default=get_json_type).encode('utf8')

由于h5py packageHDF5 binary data format 的Pythonic 接口,所以据我所知h5py.File() 似乎调用了用Fortran 编写的底层HDF5 功能:source、documentation。

如何解决这个问题并让 modelCheckpoint 回调写入我的 GCS 存储桶?有没有办法让“猴子补丁”以某种方式覆盖 hdf5 文件的打开方式以使其使用 GCS 的 file_io.FileIO()

【问题讨论】:

这可能不适用于 CloudML,但您可能想要探索的一件事是 GCSFUSE 实用程序。我不知道你是否可以在 CloudML 的上下文中使用它,但我通常在运行 Ubuntu 的常规 Google Cloud VM 上运行 TF 时使用它。 Gcsfuse 允许您将 Ubuntu VM 上的本地目录映射到 Google Cloud Bucket,因此对于 Python,云存储桶开始看起来像一个常规目录。同样,不确定您是否可以将它与 CloudML 一起使用,但请考虑一下...... 谢谢@VS_FF 我会调查你的建议并给你反馈。 把这个留给仍然有同样问题的人。我能够通过创建自定义回调以在每个时期后将检查点复制到 GCS 存储桶中来解决(嗯,一种解决方法)这个问题。我已经在 *** 的另一个问题上回答了这个问题。请在这里找到它 -> ***.com/a/69226186/15319462 【参考方案1】:

一个 hacky 解决方法是保存到本地文件系统,然后使用 TF IO API 进行复制。我在 GoogleCloudPlatform ML 示例上的 Keras 示例中添加了一个示例。

基本上它会检查目标目录是否为 GCS 路径(“gs://”)并强制将 h5py 写入本地文件系统,然后使用 TF file_io API 复制到 GCS。例如:https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/census/keras/trainer/task.py#L146

【讨论】:

感谢 Jochen,GoogleCloudPlatform 的拉取请求中的代码确实解决了这个问题。如果您编辑了您的答案以包括对黑客如何工作的描述以及完整的工作代码示例,我会将其标记为已接受。 @Jochen,@Manash 指出的第 138 行中的mode='w+' 应该是mode='wb+' 吗?【参考方案2】:

这个问题可以通过以下代码解决:

# Save Keras ModelCheckpoints locally
model.save('model.h5')

# Copy model.h5 over to Google Cloud Storage
with file_io.FileIO('model.h5', mode='r') as input_f:
    with file_io.FileIO('model.h5', mode='w+') as output_f:
        output_f.write(input_f.read())
        print("Saved model.h5 to GCS")

model.h5 保存在本地文件系统中并复制到 GCS。正如Jochen 所指出的,目前还没有简单的支持将 HDF5 模型检查点写入 GCS。有了这个 hack,就可以在提供更简单的解决方案之前写入数据。

【讨论】:

【参考方案3】:

我遇到了类似的问题,上面的解决方案对我不起作用。该文件必须以二进制形式读取和写入。否则会抛出这个错误。

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte

所以代码是

def copy_file_to_gcs(job_dir, file_path):
    with file_io.FileIO(file_path, mode='rb') as input_f:
        with file_io.FileIO(os.path.join(job_dir, file_path), mode='wb+') as output_f:
            output_f.write(input_f.read())

【讨论】:

【参考方案4】:

我在这方面可能有点晚了,但为了未来的访问者,我将描述如何从 IO 的角度调整以前在本地运行的代码以支持 GoogleML 的整个过程。

    Python 标准 open(file_name, mode) 不适用于存储桶 (gs://...../file_name)。需要from tensorflow.python.lib.io import file_io 并将所有对open(file_name, mode) 的调用更改为file_io.FileIO(file_name, mode=mode)(注意命名的mode 参数)。打开的句柄的界面是一样的。 Keras 和/或其他库大多在内部使用标准 open(file_name, mode)。也就是说,像trained_model.save(file_path) 对第三方库的调用将无法将结果存储到存储桶中。在作业成功完成后检索模型的唯一方法是将其存储在本地,然后移动到存储桶中

下面的代码效率很低,因为它一次加载整个模型,然后将其转储到存储桶中,但它适用于相对较小的模型:

model.save(file_path)

with file_io.FileIO(file_path, mode='rb') as if:
    with file_io.FileIO(os.path.join(model_dir, file_path), mode='wb+') as of:
        of.write(if.read())

模式必须设置为二进制读写。

当文件比较大时,分块读写以减少内存消耗是有意义的。

    在运行实际任务之前,我建议运行一个存根,它只是将文件保存到远程存储桶。

这个实现,暂时代替真正的train_model调用,应该这样做:

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--job-dir',
        help='GCS location with read/write access',
        required=True
    )

    args = parser.parse_args()
    arguments = args.__dict__
    job_dir = arguments.pop('job_dir')

    with file_io.FileIO(os.path.join(job_dir, "test.txt"), mode='wb+') as of:
        of.write("Test passed.")

成功执行后,您应该会在您的存储桶中看到文件test.txt,其内容为"Test passed."

【讨论】:

使用您描述的 FileIO 方法,我扩展了 keras.callbacks.ModelCheckpoint 回调以在 GCS 中保存检查点。基于 TensorFlow 2.3。 gist.github.com/seahrh/19c8779e159da35bcdc696245a2b24f6 我们能否改为扩展 save_model 以直接在 GCS 上写入,以避免在本地写入检查点然后保存/上传到 GCS?【参考方案5】:

这是我为在每个 epoch 后保存模型而编写的代码。

import os
import numpy as np
import warnings
from keras.callbacks import ModelCheckpoint

class ModelCheckpointGC(ModelCheckpoint):
"""Taken from and modified:
https://github.com/keras-team/keras/blob/tf-keras/keras/callbacks.py
"""

def on_epoch_end(self, epoch, logs=None):
    logs = logs or 
    self.epochs_since_last_save += 1
    if self.epochs_since_last_save >= self.period:
        self.epochs_since_last_save = 0
        filepath = self.filepath.format(epoch=epoch, **logs)
        if self.save_best_only:
            current = logs.get(self.monitor)
            if current is None:
                warnings.warn('Can save best model only with %s available, '
                              'skipping.' % (self.monitor), RuntimeWarning)
            else:
                if self.monitor_op(current, self.best):
                    if self.verbose > 0:
                        print('Epoch %05d: %s improved from %0.5f to %0.5f,'
                              ' saving model to %s'
                              % (epoch, self.monitor, self.best,
                                 current, filepath))
                    self.best = current
                    if self.save_weights_only:
                        self.model.save_weights(filepath, overwrite=True)
                    else:
                        if is_development():
                            self.model.save(filepath, overwrite=True)
                        else:
                            self.model.save(filepath.split(
                                "/")[-1])
                            with file_io.FileIO(filepath.split(
                                    "/")[-1], mode='rb') as input_f:
                                with file_io.FileIO(filepath, mode='wb+') as output_f:
                                    output_f.write(input_f.read())
                else:
                    if self.verbose > 0:
                        print('Epoch %05d: %s did not improve' %
                              (epoch, self.monitor))
        else:
            if self.verbose > 0:
                print('Epoch %05d: saving model to %s' % (epoch, filepath))
            if self.save_weights_only:
                self.model.save_weights(filepath, overwrite=True)
            else:
                if is_development():
                    self.model.save(filepath, overwrite=True)
                else:
                    self.model.save(filepath.split(
                        "/")[-1])
                    with file_io.FileIO(filepath.split(
                            "/")[-1], mode='rb') as input_f:
                        with file_io.FileIO(filepath, mode='wb+') as output_f:
                            output_f.write(input_f.read())

有一个函数is_development() 可以检查它是本地环境还是 gcloud 环境。在本地环境中,我确实设置了变量LOCAL_ENV=1

def is_development():
    """check if the environment is local or in the gcloud
    created the local variable in bash profile
    export LOCAL_ENV=1

    Returns:
        [boolean] -- True if local env
    """
    try:
        if os.environ['LOCAL_ENV'] == '1':
            return True
        else:
            return False
    except:
        return False

然后就可以使用了:

 ModelCheckpointGC(
            'gs://your_bucket/models/model.h5',
            monitor='loss',
            verbose=1,
            save_best_only=True,
            mode='min'))

我希望这可以帮助某人并节省一些时间。

【讨论】:

【参考方案6】:

我不确定为什么没有提到这一点,但是有一个解决方案,您不需要在代码中添加复制功能。

按照以下步骤安装gcsfuse:

export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
sudo apt-get update
sudo apt-get install gcsfuse

然后在本地挂载你的存储桶:

mkdir bucket
gcsfuse <cloud_bucket_name> bucket

然后使用本地目录bucket/作为你模型的logdir。

云和本地目录的同步将为您自动完成,您的代码可以保持干净。

希望对你有帮助:)

【讨论】:

【参考方案7】:

对我来说最简单的方法是使用 gsutil。

model.save('model.h5')
!gsutil -m cp model.h5 gs://name-of-cloud-storage/model.h5

【讨论】:

【参考方案8】:
tf.keras.models.save_model(model, filepath, save_format="tf")

save_format:'tf'或'h5',表示将模型保存到Tensorflow SavedModel还是HDF5。在 TF 2.X 中默认为“tf”,在 TF 1.X 中默认为“h5”。

【讨论】:

以上是关于将 Keras ModelCheckpoints 保存在 Google Cloud Bucket 中的主要内容,如果未能解决你的问题,请参考以下文章

Keras 错误:无法将符号 Keras 输入/输出转换为 numpy 数组

不能将tf.keras.optimizer与tf.keras.models.sequential一起使用

将 Keras 模型集成到 TensorFlow

无法将 Keras 模型转换为 tflite

如何将numpy数组转换为keras张量

keras - 将纪元跟踪保存到文件