tensorflow数据流水线如何分离?

Posted

技术标签:

【中文标题】tensorflow数据流水线如何分离?【英文标题】:How to separate the tensorflow data pipeline? 【发布时间】:2022-01-17 10:13:27 【问题描述】:

我想将 TensorFlow 管道一分为二,并使用 tf.data.Dataset.map() 对每个管道应用不同的函数。 像这样:

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20).batch(10)

dataset_1 = dataset.map(lambda x: x)
dataset_2 = dataset.map(lambda x: x + 1)

for d1, d2 in zip(dataset_1, dataset_2):
    print(d1.numpy())  # [13 14 12 15 18  2 16 19 6 4]
    print(d2.numpy())  # [18 16  6  7  3 15 17  9 2 4]

    break

但是,这不是我想要的输出。我的期望是当d1[13 14 12 15 18 2 16 19 6 4] 时,d2 应该是[14 15 13 16 19 3 17 20 7 5]。我想我知道发生了什么,但不知道如何写。 我不想从一开始就创建两条管道(因为开销很大)。你能给我一些建议吗? 感谢阅读。

更新

我决定如下实现。

# use the same seed for dataset_1 and dataset_2
dataset_1 = dataset.shuffle(20, seed=0).batch(10)
dataset_2 = dataset.shuffle(20, seed=0).batch(10)

dataset_1 = dataset_1.map(lambda x: x)
dataset_2 = dataset_2.map(lambda x: x + 1)

【问题讨论】:

【参考方案1】:

两个动作的简单堆栈怎么样

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20)

def func1(x):
    return x

def func2(x):
    return x + 1

dataset = dataset.map(lambda sample: tf.stack([func1(sample), func2(sample)], axis=0))

list(dataset.as_numpy_iterator())

# [array([ 9, 10], dtype=int32),
#  array([16, 17], dtype=int32),
#  array([10, 11], dtype=int32),
#  array([1, 2], dtype=int32),
#  array([11, 12], dtype=int32),
#  array([6, 7], dtype=int32),
#  array([18, 19], dtype=int32),
#  array([3, 4], dtype=int32),
#  array([8, 9], dtype=int32),
#  array([15, 16], dtype=int32),
#  array([4, 5], dtype=int32),
#  array([14, 15], dtype=int32),
#  array([0, 1], dtype=int32),
#  array([12, 13], dtype=int32),
#  array([17, 18], dtype=int32),
#  array([2, 3], dtype=int32),
#  array([5, 6], dtype=int32),
#  array([13, 14], dtype=int32),
#  array([7, 8], dtype=int32),
#  array([19, 20], dtype=int32)]

之后,您可以取消批处理 dataset = dataset.unbatch() 和批处理一样 dataset = dataset.batch(10) 如果需要。

【讨论】:

【参考方案2】:

tensorflow shuffle 函数的默认行为是在您每次调用 .numpy() 时重新洗牌,为了防止这种情况发生,您需要设置 reshuffle_each_itertaion=False (https://www.tensorflow.org/api_docs/python/tf/data/Dataset#shuffle)。

dataset = tf.data.Dataset.from_tensor_slices(list(range(20)))
dataset = dataset.shuffle(20, reshuffle_each_iteration=False).batch(10)
dataset_1 = dataset.map(lambda x: x)
dataset_2 = dataset.map(lambda x: x + 1)

for d1, d2 in zip(dataset_1, dataset_2):
    print(d1.numpy())  # [10 13  3 19 12 16  7 11  2  8]
    print(d2.numpy())  # [11 14  4 20 13 17  8 12  3  9]

    break

但是这样做的后果是,如果您尝试调用 d1.numpy() 或 d2.numpy() 第二次,值将保持不变。

【讨论】:

正如你所提到的,我不想在第二个循环中获得相同的值。所以我决定按照我上面添加的方式实现它。感谢您的回答!

以上是关于tensorflow数据流水线如何分离?的主要内容,如果未能解决你的问题,请参考以下文章

TensorFlow:如何启动 Profiler

tensorflow基础-数据类型

TensorFlow:在PyCharm中配置TensorFlow

TensorFlow 的 ./configure 在哪里以及如何启用 GPU 支持?

生产环境中的 TensorFlow:如何重新训练您的模型

Tensorflow:您如何在模型训练期间实时监控 GPU 性能?