无法使用 TPU 运行模型

Posted

技术标签:

【中文标题】无法使用 TPU 运行模型【英文标题】:Can not run model with TPU 【发布时间】:2022-01-02 16:28:00 【问题描述】:

我正在使用 EfficientNet 构建一个模型来预测年龄和性别。我正在使用来自 UTKFace 的图像。我正在使用 kaggle:https://www.kaggle.com/jangedoo/utkface-new

我正在使用带有加速器 TPU V3-8 的 kaggle notebook 运行 TPU

我使用的是 GPU,执行代码没有任何问题。我想使用 TPU 运行它,但出现错误。

这是我的代码:

##IMPORTS
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
from PIL import Image
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import EfficientNetB0
from tensorflow.keras import models, layers
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras import utils
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.image import img_to_array
import math

##DEFINE VARIABLES
DIR_UTKFace="../input/utkface-new/UTKFace"
IMG_HEIGHT = 224
IMG_WIDTH = 224
BATCH_SIZE = 8
EPOCHS = 10

##USE TPU
try:
    tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('Running on TPU ', tpu.master())
except ValueError:
    print("Could not connect to TPU")
    tpu = None
if tpu:
    try:
        print("initializing TPU ...")
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        strategy = tf.distribute.experimental.TPUStrategy(tpu)
        print("TPU initialized")
    except _:
        print("failed to initialize TPU")


#AUTO = tf.data.experimental.AUTOTUNE
#REPLICAS = strategy.num_replicas_in_sync
#print(f'REPLICAS: REPLICAS')

##GET DATAFRAME
#For gender we get a numeric value. Male = 0, Female = 1
GENDER_MAP=['M','F']

#Get age and gender from image file name
def get_info_from_image(image_file_name):
    age=int(image_file_name.split('_')[0])
    gender=int(image_file_name.split('_')[1])
    return age, gender

#we generate a dataframe with the information we need: age, gender, file name.
all_ages = []
all_genders = []
all_file_names = []

files_names=os.listdir(DIR_UTKFace)
for file_name in files_names:

    #check that all images are jpg format
    if ".jpg" not in file_name:
        raise RuntimeError("Format not expected: "+ file_name)

    age, gender = get_info_from_image(file_name)

    all_ages.append(age)
    all_genders.append(gender)
    all_file_names.append(file_name)


dataset_images = 'age': all_ages, 
                  'gender': all_genders, 
                  'file_name': all_file_names

df = pd.DataFrame(data=dataset_images)

##SPLIT DATA
maxAge = df['age'].max()
TRAIN_TEST_SPLIT = 0.8  
TRAIN_VALID_SPLIT = 0.7
p = np.random.permutation(len(df)) 
train_up_to = int(len(df) * TRAIN_TEST_SPLIT)
train_idx = p[:train_up_to]
test_idx = p[train_up_to:]

train_up_to = int(train_up_to * TRAIN_VALID_SPLIT)
train_idx, valid_idx = train_idx[:train_up_to], train_idx[train_up_to:]

##CUSTOM DATA GENERATOR
def get_data_generator(df, indices, is_train):

    images = list()
    ages = list()
    genders = list()
    while True:
        for i in indices:
            r = df.iloc[i]
            file, age, gender = r['file_name'], r['age'], r['gender']
            img_dir=DIR_UTKFace+'/'+file
            im = Image.open(img_dir)
            im = im.resize((IMG_HEIGHT, IMG_WIDTH))
            im = np.array(im) / 255.0
            images.append(im)
            ages.append(age / maxAge) 
            genders.append(to_categorical(gender, 2))  
            if len(images) >= BATCH_SIZE:
                print(len(images))
                yield np.asarray(images), [np.asarray(ages), np.asarray(genders)]
                images = list()
                ages = list()
                genders = list()
            #discards the remainder                                               
        if len(images) > 0:
                images = list()
                ages = list()
                genders = list()


#DEFINE MODEL
efficient_net = EfficientNetB0(
    weights='imagenet',  
    include_top=False, 
    input_shape=(224,224, 3),
    pooling='max'
)


base_model = models.Sequential()
base_model.add(efficient_net)
features=base_model.output 

age_output = Dense(units=maxAge, activation="softmax", name="age_output")(features)
gender_output = Dense(units=2, activation="sigmoid", name="gender_output")(features)
model = Model(inputs=base_model.input, outputs=[age_output, gender_output])

model.compile(optimizer = "adam", 
              loss='age_output': 'mse', 'gender_output': 'categorical_crossentropy',
              metrics='age_output': 'mae', 'gender_output': 'accuracy',
              loss_weights='age_output': 2., 'gender_output': 1.)

train_gen = get_data_generator(df, train_idx, True)
valid_gen = get_data_generator(df, valid_idx, False)

def get_steps(lenght_samples, batch_size):
    if (lenght_samples % batch_size) > 0 :
        return (lenght_samples // batch_size) + 1
    else :
        return lenght_samples // batch_size

steps_train = get_steps(len(train_idx), BATCH_SIZE)
steps_valid = get_steps(len(valid_idx), BATCH_SIZE)

hist = model.fit(train_gen,
                 steps_per_epoch=steps_train,
                 epochs=EPOCHS,
                 verbose=1,
                 validation_steps=steps_valid,
                 validation_data=valid_gen
                 )

这是我得到错误的代码的最后一部分:

---------------------------------------------------------------------------
UnavailableError                          Traceback (most recent call last)
/tmp/ipykernel_43/3634844859.py in <module>
      8                  callbacks = [callbacks, early],
      9                  validation_steps=steps_valid,
---> 10                  validation_data=valid_gen
     11                  )

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_batch_size, validation_freq, max_queue_size, workers, use_multiprocessing)
   1103               logs = tmp_logs  # No error, now safe to assign to logs.
   1104               end_step = step + data_handler.step_increment
-> 1105               callbacks.on_train_batch_end(end_step, logs)
   1106               if self.stop_training:
   1107                 break

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in on_train_batch_end(self, batch, logs)
    452     """
    453     if self._should_call_train_batch_hooks:
--> 454       self._call_batch_hook(ModeKeys.TRAIN, 'end', batch, logs=logs)
    455 
    456   def on_test_batch_begin(self, batch, logs=None):

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in _call_batch_hook(self, mode, hook, batch, logs)
    294       self._call_batch_begin_hook(mode, batch, logs)
    295     elif hook == 'end':
--> 296       self._call_batch_end_hook(mode, batch, logs)
    297     else:
    298       raise ValueError('Unrecognized hook: '.format(hook))

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in _call_batch_end_hook(self, mode, batch, logs)
    314       self._batch_times.append(batch_time)
    315 
--> 316     self._call_batch_hook_helper(hook_name, batch, logs)
    317 
    318     if len(self._batch_times) >= self._num_batches_for_timing_check:

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in _call_batch_hook_helper(self, hook_name, batch, logs)
    354       hook = getattr(callback, hook_name)
    355       if getattr(callback, '_supports_tf_logs', False):
--> 356         hook(batch, logs)
    357       else:
    358         if numpy_logs is None:  # Only convert once.

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in on_train_batch_end(self, batch, logs)
   1018 
   1019   def on_train_batch_end(self, batch, logs=None):
-> 1020     self._batch_update_progbar(batch, logs)
   1021 
   1022   def on_test_batch_end(self, batch, logs=None):

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/callbacks.py in _batch_update_progbar(self, batch, logs)
   1082     if self.verbose == 1:
   1083       # Only block async when verbose = 1.
-> 1084       logs = tf_utils.to_numpy_or_python_type(logs)
   1085       self.progbar.update(self.seen, list(logs.items()), finalize=False)
   1086 

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/utils/tf_utils.py in to_numpy_or_python_type(tensors)
    512     return t  # Don't turn ragged or sparse tensors to NumPy.
    513 
--> 514   return nest.map_structure(_to_single_numpy_or_python_type, tensors)
    515 
    516 

/opt/conda/lib/python3.7/site-packages/tensorflow/python/util/nest.py in map_structure(func, *structure, **kwargs)
    657 
    658   return pack_sequence_as(
--> 659       structure[0], [func(*x) for x in entries],
    660       expand_composites=expand_composites)
    661 

/opt/conda/lib/python3.7/site-packages/tensorflow/python/util/nest.py in <listcomp>(.0)
    657 
    658   return pack_sequence_as(
--> 659       structure[0], [func(*x) for x in entries],
    660       expand_composites=expand_composites)
    661 

/opt/conda/lib/python3.7/site-packages/tensorflow/python/keras/utils/tf_utils.py in _to_single_numpy_or_python_type(t)
    508   def _to_single_numpy_or_python_type(t):
    509     if isinstance(t, ops.Tensor):
--> 510       x = t.numpy()
    511       return x.item() if np.ndim(x) == 0 else x
    512     return t  # Don't turn ragged or sparse tensors to NumPy.

/opt/conda/lib/python3.7/site-packages/tensorflow/python/framework/ops.py in numpy(self)
   1069     """
   1070     # TODO(slebedev): Consider avoiding a copy for non-CPU or remote tensors.
-> 1071     maybe_arr = self._numpy()  # pylint: disable=protected-access
   1072     return maybe_arr.copy() if isinstance(maybe_arr, np.ndarray) else maybe_arr
   1073 

/opt/conda/lib/python3.7/site-packages/tensorflow/python/framework/ops.py in _numpy(self)
   1037       return self._numpy_internal()
   1038     except core._NotOkStatusException as e:  # pylint: disable=protected-access
-> 1039       six.raise_from(core._status_to_exception(e.code, e.message), None)  # pylint: disable=protected-access
   1040 
   1041   @property

/opt/conda/lib/python3.7/site-packages/six.py in raise_from(value, from_value)

UnavailableError: function_node __inference_train_function_18204 failed to connect to all addresses
Additional GRPC error information from remote target /job:localhost/replica:0/task:0/device:CPU:0:
:"created":"@1637743656.593666229","description":"Failed to pick subchannel","file":"third_party/grpc/src/core/ext/filters/client_channel/client_channel.cc","file_line":4143,"referenced_errors":["created":"@1637743656.593647082","description":"failed to connect to all addresses","file":"third_party/grpc/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":398,"grpc_status":14]
     [[node IteratorGetNext]]

我不知道发生了什么。有人知道怎么解决吗?

【问题讨论】:

这个 tpu 在哪里?代码在哪里运行? 我正在使用带有加速器 TPU V3-8 的 Kaggle notebook 运行 TPU 【参考方案1】:

截至目前,Kaggle 和 Colab 仅支持远程 TPU 设备,这会阻止 TPU 访问您的本地文件或运行自定义 Python 图像生成器代码。

新的TPU-VM 架构通过将 TPU 附加到主机 VM 来解决该问题。很快就会在 Kaggle 和 Colab 中得到支持。

同时,作为一种解决方法,您可以将数据移动到 GCS 存储桶并使用 tf.keras.preprocessing.image_dataset_from_directorytf.data.Dataset 并结合 Keras 预处理 layers。

【讨论】:

以上是关于无法使用 TPU 运行模型的主要内容,如果未能解决你的问题,请参考以下文章

谷歌TPU使用方法

无法将 tf.keras 模型正确转换为珊瑚 TPU 的量化格式

在 Colab TPU 上运行 Pytorch 堆叠模型

Colab Pro 在使用 TPU 运行时训练深度学习模型 12 小时后自动断开连接

无法从 GCP 中的 VM 访问 TPU

无法删除 Cloud TPU