具有写入 jdbc 的 Apache Beam 管道

Posted

技术标签:

【中文标题】具有写入 jdbc 的 Apache Beam 管道【英文标题】:Apache Beam pipeline with Write to jdbc 【发布时间】:2021-10-19 07:37:48 【问题描述】:

我正在尝试创建一个管道,该管道从 Pubsub 读取一些数据并写入 postgres 数据库。

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True
Device = NamedTuple(
     "Device",
     [
         ("id", str),
         ("userId", str),
         ("patientId", str)
     ])
coders.registry.register_coder(Device, coders.RowCoder)
p = beam.Pipeline(options = pipeline_options)

(p
   | 'ReadFromPubSub' >> beam.io.gcp.pubsub.ReadFromPubSub(topic="projects/us-vpc/topics/pipeline").with_output_types(bytes)
   | "Decode" >> beam.Map(lambda x: x.decode('utf-8'))
   | beam.Map(lambda x:
             Device(id= "e4f63782-66f5-4f49-911f-0b00efa5b23e", userId="Random",
                             patientId=str('12345')))
     .with_output_types(Device)
   | beam.WindowInto(beam.window.FixedWindows(1))
   | 'Write to jdbc' >> WriteToJdbc(
       table_name= "patient_device",
       driver_class_name = db_driver_name,
       jdbc_url = jdbc:postgresql://localhost:5432/db_name,
       username = dev-user,
       password = db-password
       )
) 
result = p.run()
result.wait_until_finish()

我可以看到在 gcp 上部署数据流后正在创建四个步骤。

    ReadFromPubSub 解码 地图 WindowInto

但问题是未在数据流上创建“写入 jdbc”步骤。

这是执行数据流的命令:

python pipeline.py --runner DataflowRunner --project us-con-project-location --temp_location gs://staging/temp --staging_location gs://staging/temp --region us-east1 --input_topic "projects/us-vpc/topics/pipeline" --subnetwork regions/us-east1/subnetworks/-public-01

任何帮助将不胜感激!

示例如下: https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.jdbc.html

Apache Beam pipeline with JdbcIO

【问题讨论】:

【参考方案1】:

请注意,Dataflow 上的跨语言管道需要 Runner v2。不过,我不确定 JdbcIO 是否适用于流式传输;您可以先尝试使用批处理管道进行调试(用简单的 Create 替换您的 PubSub 读取)。

【讨论】:

我可以在每次发布后看到 pubsub 上的事件,所以这不是问题,数据流中缺少“写入 jdbc”步骤本身。 你在通过--experiments=use_runner_v2吗? 你太棒了,步骤创建。 请注意,从最新版本开始,您不再需要手动设置:issues.apache.org/jira/browse/BEAM-12590

以上是关于具有写入 jdbc 的 Apache Beam 管道的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam 处理文件

使用 Apache Beam 以 CSV 格式将 BigQuery 结果写入 GCS

Apache Beam 使用多个表时的写入次数

如何在 Apache Beam 中写入多个文件?

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Apache Beam 不会将文件写入本地环境或 Google 存储