Tensorflow 2.3:如何并行读取大文件中的文本?

Posted

技术标签:

【中文标题】Tensorflow 2.3:如何并行读取大文件中的文本?【英文标题】:Tensorflow 2.3: How to parallelize reading text from big file? 【发布时间】:2020-08-25 11:27:40 【问题描述】:

我需要将大小为 4GB 的数据集文件分解为小块。作为优化时间消耗的一部分,我想最大化并行处理。目前,我可以观察到 CPU 和 GPU 的核心没有得到充分利用。查看图片here中的附加输出。

我的代码 sn-p 如下所示

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy() # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def serialize_row(text, rating):
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = 
        'text': _bytes_feature(text),
        'rating': _float_feature(rating),
    

    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

def transform(example):
    str_example = example.decode("utf-8")
    json_example = json.loads(str_example)
    overall = json_example.get('overall', -99)
    text = json_example.get('reviewText', '')
    if type(text) is str:
        text = bytes(text, 'utf-8')
    tf_serialized_string = serialize_row(text, overall)
    return tf_serialized_string 


line_dataset = tf.data.TextLineDataset(filenames=[file_path])
line_dataset = line_dataset.map(lambda row: tf.numpy_function(transform, [row], tf.string))
line_dataset = line_dataset.shuffle(2)
line_dataset = line_dataset.batch(NUM_OF_RECORDS_PER_BATCH_FILE)
'''
    Perform batchwise transformation of the population.
'''
start = time.time()
for idx, line in line_dataset.enumerate():
    FILE_NAMES = 'test0.tfrecord'.format(idx)
    end = time.time()
    time_taken = end - start
    tf.print('Processing for file - 0'.format(FILE_NAMES))
    DIRECTORY_URL = '/home/gaurav.gupta/projects/practice/'
    filepath = os.path.join(DIRECTORY_URL, 'data-set', 'electronics', FILE_NAMES)
    batch_ds = tf.data.Dataset.from_tensor_slices(line)
    writer = tf.data.experimental.TFRecordWriter(filepath)
    writer.write(batch_ds)
    tf.print('Processing for file - 0 took 1'.format(FILE_NAMES, time_taken))
tf.print('Done')

展示执行流程的日志

Processing for file - test0.tfrecord took 14.350863218307495
Processing for file - test1.tfrecord took 12.695453882217407
Processing for file - test2.tfrecord took 12.904462575912476
Processing for file - test3.tfrecord took 12.344425439834595
Processing for file - test4.tfrecord took 11.188365697860718
Processing for file - test5.tfrecord took 11.319620609283447
Processing for file - test6.tfrecord took 11.285977840423584
Processing for file - test7.tfrecord took 11.169529438018799
Processing for file - test8.tfrecord took 11.289997816085815
Processing for file - test9.tfrecord took 11.431073188781738
Processing for file - test10.tfrecord took 11.428141593933105
Processing for file - test11.tfrecord took 3.223125457763672
Done

我尝试了num_parallel_reads 参数,但看不出有多大区别。我相信它在读取多个文件而不是单个大文件时会很方便。

我正在寻求您的建议,以并行化此任务以减少时间消耗。

【问题讨论】:

我会尝试将参数 num_parallel_calls 传递给 map 调用。即tensorflow.org/api_docs/python/tf/data/TextLineDataset#map @VladimírKunc 没有多大帮助。我测量了每次枚举所用的时间,在地图中没有 num_parallel_calls 的平均时间接近 13 秒,而在地图中没有 num_parallel_calls 的时间接近 12 秒。 很难说仅从代码中的瓶颈是什么 - 也许测量代码各个部分的时间?也许线数据集的枚举和以下处理很慢?如果是这样的话,我会尝试并行化该部分 - 似乎没有理由让它连续。我会将代码放入一个函数中,然后使用joblib.readthedocs.io/en/latest/parallel.html 在数据集枚举上并行调用它。 @VladimírKunc 这就是我想做的。我想并行运行每个循环。每个循环负责执行一个批处理,这些批处理可以并行执行。我想了解使它们并行执行的规定。 【参考方案1】:

我会尝试这样的事情(我喜欢使用joblib,因为它很容易放入现有代码中,您可能可以对许多其他框架做类似的事情,此外,joblib 不使用 GPU 也不使用使用任何 JITting):

from joblib import Parallel, delayed
from tqdm import tqdm
...

def process_file(idx, line):
  FILE_NAMES = 'test0.tfrecord'.format(idx)
  end = time.time()
  time_taken = end - start
  tf.print('Processing for file - 0'.format(FILE_NAMES))
  DIRECTORY_URL = '/home/gaurav.gupta/projects/practice/'
  filepath = os.path.join(DIRECTORY_URL, 'data-set', 'electronics', FILE_NAMES)
  batch_ds = tf.data.Dataset.from_tensor_slices(line)
  writer = tf.data.experimental.TFRecordWriter(filepath)
  writer.write(batch_ds)
  #tf.print('Processing for file - 0 took 1'.format(FILE_NAMES, time_taken))
  return FILE_NAMES, time_taken


times = Parallel(n_jobs=12, prefer="processes")(delayed(process_file)(idx, line) for idx, line in tqdm(line_dataset.enumerate(), total=len(line_dataset)))
print('Done.')

这是未经测试的代码,我也不确定它如何与 tf 代码一起使用,但我会试一试。

tqdm 完全没有必要,它只是我更喜欢使用的东西,因为它提供了一个很好的进度条。

【讨论】:

感谢您的努力。我相信这是实现并行化的有点老套的方法。张量流中应该有一些东西,因为这似乎是一个自然的要求。感谢joblib 模块的建议。

以上是关于Tensorflow 2.3:如何并行读取大文件中的文本?的主要内容,如果未能解决你的问题,请参考以下文章

TensorFlow分布式计算机制解读:以数据并行为重

如何在 DataBricks 中并行读取文件?

深度学习_1_Tensorflow_2_数据_文件读取

关于tensorflow 的数据读取线程管理QueueRunner

如何使用 gpu 并行训练 tensorflow.keras 模型? TensorFlow 版本 2.5.0

如何在 Tensorflow 2.0 + Keras 中进行并行 GPU 推理?