无法从 Beam 中的 GCS 读取 PubSub gz 文件
Posted
技术标签:
【中文标题】无法从 Beam 中的 GCS 读取 PubSub gz 文件【英文标题】:Failed to ReadFromPubSub gz files from GCS in Beam 【发布时间】:2019-12-21 00:41:43 【问题描述】:我们尝试在 Beam 中以 pubsub 方式从 GCS 加载数据。一旦有新数据上传到 GCS,我们可以通过 Beam 中的 pubsub 及时加载数据。但是,它无法从 GCS 加载数据。
我的管道是
class ParseAndFilterDo(beam.DoFn):
def __int__(self):
super(ParseAndFilterDo, self).__init__()
self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')
def process(self, element):
text_line = element.strip()
data =
try:
data = json.loads(text_line)
print(data)
yield data
except Exception as ex:
print("Parse json exception:", ex)
self.num_parse_errors.inc()
...
pipeline_args.extend([
'--runner=DirectRunner',
'--staging_location=gs://my-transform-bucket/stage',
'--temp_location=gs://my-transform-bucket/temp',
'--job_name=test-sub-job',
])
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=options) as p:
events = p | "ReadPubSub" >> beam.io.ReadFromPubSub(topic=args.topic)
raw_events = (
events
| 'DecodeString' >> beam.Map( lambda b: b.decode('utf-8'))
| "ParseAndFilterDo" >> beam.ParDo(ParseAndFilterDo())
)
并将主题设置为GCS存储桶
gsutil notification create -t testtopic -f json -e OBJECT_FINALIZE gs://my-test-bucket
Google Cloud Pub/Sub API 也已激活。
然后我尝试将gz
文件类型的json数据上传到my-test-bucket
,日志显示
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
u'kind': u'storage#object', u'contentType': u'application/x-gzip', u'name': u'log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz', u'timeCreated': u'2019-08-14T05:47:19.664Z', u'generation': u'1565761639664269', u'md5Hash': u'7mAixitzv6WDVVa1ar37Vw==', u'bucket': u'my-test-bucket', u'updated': u'2019-08-14T05:47:19.664Z', u'crc32c': u'UHiIrQ==', u'metageneration': u'1', u'mediaLink': u'https://www.googleapis.com/download/storage/v1/b/my-test-bucket/o/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz?generation=15657616399&alt=media', u'storageClass': u'MULTI_REGIONAL', u'timeStorageClassUpdated': u'2019-08-14T05:47:19.664Z', u'etag': u'CI2V19LEAE=', u'id': u'my-test-bucket/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz/1565761639664269', u'selfLink': u'https://www.googleapis.com/storage/v1/b/my-test-bucket/o/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz', u'size': u'55259'
DEBUG:root:Connecting using Google Application Default Credentials.
DEBUG:root:Attempting to flush to all destinations. Total buffered: 0
这里似乎只触发了storage object
事件。但是在 Beam 中没有要读取的数据负载。
我的配置有什么问题还是我遗漏了什么?
梁版本:2.14.0 google-cloud-pubsub: 0.45.0 grpcio:1.22.0【问题讨论】:
【参考方案1】:Pub/Sub notifications 将仅包含事件元数据(上传的对象不通过 Pub/Sub 消息发送)。
如果我正确理解了用例并且您想读取文件内容,则需要解析通知以获取完整的文件路径,然后将生成的 PCollection 传递给beam.io.ReadAllFromText()
,如下所示:
class ExtractFn(beam.DoFn):
def process(self, element):
file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
logging.info('File: ' + file_name)
yield file_name
请注意,我使用了您提供的示例消息的 id
字段(并删除了我猜是用于版本控制的最后一部分)。
我的主要管道是:
(p
| 'Read Messages' >> beam.io.ReadFromPubSub(topic="projects/PROJECT/topics/TOPIC")
| 'Convert Message to JSON' >> beam.Map(lambda message: json.loads(message))
| 'Extract File Names' >> beam.ParDo(ExtractFn())
| 'Read Files' >> beam.io.ReadAllFromText()
| 'Write Results' >> beam.ParDo(LogFn()))
完整代码here.
我用direct runner和2.14.0 SDK、公共文件gs://apache-beam-samples/shakespeare/kinglear.txt
和测试消息(不是真正的通知)进行了测试:
python notifications.py --streaming
gcloud pubsub topics publish $TOPIC_NAME --message='"id": "apache-beam-samples/shakespeare/kinglear.txt/1565795872"'
开始印刷莎士比亚的李尔王:
INFO:root:File: gs://apache-beam-samples/shakespeare/kinglear.txt
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
...
INFO:root: KING LEAR
INFO:root:
INFO:root:
INFO:root: DRAMATIS PERSONAE
INFO:root:
INFO:root:
INFO:root:LEAR king of Britain (KING LEAR:)
INFO:root:
INFO:root:KING OF FRANCE:
【讨论】:
非常感谢您的回复,我误解了 GCS 的 pub/sub。 我想知道为什么我从ExtractFn
返回yield file_name
可能会使ReadAllFromText
失败,而在class LogFn(beam.DoFn): def process(self, element): logging.info(element) return [element]
的方式中。更多详情请参考this以上是关于无法从 Beam 中的 GCS 读取 PubSub gz 文件的主要内容,如果未能解决你的问题,请参考以下文章
apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python
PubSub 到 BigQuery - Python 中的数据流/Beam 模板?