如何从 PCollection 中过滤出无值
Posted
技术标签:
【中文标题】如何从 PCollection 中过滤出无值【英文标题】:How To Filter None Values Out Of PCollection 【发布时间】:2019-10-02 20:32:34 【问题描述】:我的 pubsub pull 订阅正在发送消息,并且每条消息的值都是 None 。作为管道处理的一部分,我需要找到一种方法来过滤掉 none 值
当然,防止从 pull 订阅到达 none 值的一些帮助会很好。但我觉得我错过了通过 ParDo 定义和应用函数的一般工作流程。
我已经设置了一个函数来过滤掉似乎基于打印到控制台检查工作的无值,但是当应用一个在无类型上崩溃的 lambda 函数时,我仍然收到错误。
我发现有关 python Apache Beam SDK 的文档有点稀疏,但我一直在寻找答案,但运气不佳。
from __future__ import absolute_import
import argparse
import logging
from past.builtins import unicode
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
def print_row(row):
print row
print type(row)
def filter_out_nones(row):
if row is not None:
yield row
else:
print 'we found a none! get it out'
def run(argv=None):
pipeline_options = PipelineOptions()
pipeline_options.view_as(SetupOptions).save_main_session = True
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
data = ['test1 message','test2 message',None,'test3 please work']
## this does seem to return only the values I would hope for based on the console log
testlogOnly = (p | "makeData" >> beam.Create(data)
| "filter" >> beam.ParDo(filter_out_nones)
| "printtesting" >> beam.Map(print_row))
# | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
# | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))
## testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
# | "filterHere" >> beam.ParDo(filter_out_nones)
# | "printHere" >> beam.Map(print_row)
## below here does not work due to the following message
## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
# | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
# | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
如果我可以记录字节字符串编码的消息而没有任何结果,我将在我需要的地方。
【问题讨论】:
我不是 100% 确定您是否想在这个问题中找出 pubsub 的无价值问题。但我还是给你加了标签。 您的注释代码执行“.with_output_types(bytes)”调用。这个字节是在哪里定义的? 【参考方案1】:您过滤掉None
值的方法对我来说看起来不错。
但是,如果我理解正确,当您使用 testlogAndWrite
并获得 AttributeError
时,您将在管道中保留 "printHere" >> beam.Map(print_row)
步骤。
print_row
读取消息并打印它们,但它不输出任何内容。因此,下一步将没有输入encode_here
。
要解决此问题,您可以注释掉该步骤或确保返回每个元素:
def print_row(row):
print row
print type(row)
return row
输出:
test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
【讨论】:
以上是关于如何从 PCollection 中过滤出无值的主要内容,如果未能解决你的问题,请参考以下文章
如何从谷歌数据流管道中的多个输入 PCollection 生成一个输出 PCollection?
如何从 Dataflow 中的 PCollection 读取 bigQuery