从 Apache Beam 管道收集输出并将其显示到控制台

Posted

技术标签:

【中文标题】从 Apache Beam 管道收集输出并将其显示到控制台【英文标题】:Collecting output from Apache Beam pipeline and displaying it to console 【发布时间】:2018-03-06 11:52:25 【问题描述】:

我已经在 Apache Beam 上工作了几天。我想快速迭代我正在工作的应用程序,并确保我正在构建的管道没有错误。在 spark 中,我们可以使用sc.parallelise,当我们应用一些操作时,我们会得到可以检查的值。

类似地,当我阅读 Apache Beam 时,我发现我们可以创建一个 PCollection 并使用以下语法使用它

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines 
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
    result = pipeline.run()

我实际上想将结果打印到控制台。但我找不到任何关于它的文档。

有没有办法将结果打印到控制台而不是每次都保存到文件中?

【问题讨论】:

我和这篇文章有同样的问题。我正在使用 Java,不知道如何在控制台上打印中间值。如果有人可以帮助我,将不胜感激。 【参考方案1】:

您不需要临时列表。在 python 2.7 中,以下内容就足够了:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()

在 python 3.x 中,print 是一个函数,因此以下内容就足够了:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()

【讨论】:

请注意,如果您尝试将其添加到管道的中间,您可能会从管道中收到错误 TypeError: 'NoneType' object is not subscriptable。这是因为print 返回None,它会传递给您的以下说明。在这种情况下,您将需要一些不同的代码来打印值然后返回它。【参考方案2】:

在进一步探索并了解了如何为我的应用程序编写测试用例之后,我找到了将结果打印到控制台的方法。请不要说我现在正在将所有东西都运行到单节点机器上,并试图了解 apache beam 提供的功能,以及如何在不影响行业最佳实践的情况下采用它。

所以,这是我的解决方案。在我们管道的最后阶段,我们可以引入一个 map 函数,它将结果打印到控制台或将结果累积到变量中,稍后我们可以打印变量以查看值

import apache_beam as beam

# lets have a sample string
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# lets collect our result with a map transformation into output array
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# lets wait until result a available
result.wait_until_finish()

# print the output
print output

【讨论】:

好主意,但如果您的管道以分布式方式执行,例如在 Apache Yarn (Hadoop) 或 Google Dataflow 中,这将不起作用。必须有另一种方法来收集结果。但我仍在寻找它。 当我使用 pipeline.run() 时出现此错误 - 'PBegin' 对象没有属性 'windowing' 这对于 DirectRunner 中的单元测试非常有用。【参考方案3】:

以 pycharm Edu 为例

import apache_beam as beam

class LogElements(beam.PTransform):
    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            print self.prefix + str(element)
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        input | beam.ParDo(self._LoggingFn(self.prefix))

class MultiplyByTenDoFn(beam.DoFn):

    def process(self, element):
        yield element * 10

p = beam.Pipeline()

(p | beam.Create([1, 2, 3, 4, 5])
   | beam.ParDo(MultiplyByTenDoFn())
   | LogElements())

p.run()

输出

10
20
30
40
50
Out[10]: <apache_beam.runners.portability.fn_api_runner.RunnerResult at 0x7ff41418a210>

【讨论】:

【参考方案4】:

也许记录信息而不是打印?

def _logging(elem):
    logging.info(elem)
    return elem

P | "logging info" >> beam.Map(_logging)

【讨论】:

【参考方案5】:

我知道这不是您要求的,但您为什么不将其存储到文本文件中呢?它总是比通过 stdout 打印要好,而且它不是易失性的

【讨论】:

在更一般的不打印但在运行时具有可用值的情况下,我确实有一个用例(尽管我可能用错了)。在我正在处理的 Tensorflow 和 Tensorflow Transform 的上下文中,我想在使用 Beam 的变换上下文期间进行计数,然后在训练期间在操作中使用该值。因此,将计数保存在内存中比将其保存到文件并再次加载更方便。但如前所述,这不是打印。 这更像是一个评论而不是一个答案

以上是关于从 Apache Beam 管道收集输出并将其显示到控制台的主要内容,如果未能解决你的问题,请参考以下文章

Python 上的 Apache Beam 将 beam.Map 调用相乘

从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名

使用 flink runner 时如何在 apache Beam 中执行检查点?

Apache Beam 管道中的连续状态

GCP Dataflow + Apache Beam - 缓存问题

Apache Beam GCP 在动态创建的目录中上传 Avro