Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库

Posted

技术标签:

【中文标题】Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库【英文标题】:Google Dataflow (Apache beam) JdbcIO bulk insert into mysql database 【发布时间】:2018-05-22 17:31:31 【问题描述】:

我正在使用 Dataflow SDK 2.X Java API (Apache Beam SDK) 将数据写入 mysql。我创建了基于Apache Beam SDK documentation 的管道,以使用数据流将数据写入mysql。它一次插入单行,因为我需要实现批量插入。我在官方文档中找不到启用批量插入模式的任何选项。

想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么。

 .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() 
        public void setParameters(KV<Integer, String> element, PreparedStatement query) 
          query.setInt(1, kv.getKey());
          query.setString(2, kv.getValue());
        
      )

【问题讨论】:

我很困惑:您包含的代码 读取 数据,而不是插入:您使用的是 JdbcIO.read()。您的意思是包含不同的代码 sn-p 吗?如果您使用 JdbcIO.write(),它会自动将写入批处理到多达 1000 个元素中(实际上最终可能会更少,具体取决于您的管道结构、运行器、数据到达率等)。跨度> 感谢您的回复@jkff。有没有办法更新批量插入的元素数量? 目前没有。对您的需求来说是太多还是太少? 对我的要求来说太少了。 嗯,你的意思是使用更大的值可以显着提高性能?我很好奇你会建议什么价值,以及它使整个管道端到端的速度有多快?您可以通过复制 JdbcIO 并对其进行编辑来尝试。 【参考方案1】:

编辑 2018-01-27:

事实证明,这个问题与 DirectRunner 有关。如果您使用 DataflowRunner 运行相同的管道,您应该获得实际上多达 1,000 条记录的批次。 DirectRunner 总是在分组操作后创建大小为 1 的包。


原答案:

我在使用 Apache Beam 的 JdbcIO 写入云数据库时遇到了同样的问题。问题是,虽然 JdbcIO 确实支持在一批中写入多达 1,000 条记录,但我实际上从未见过它一次写入超过 1 行(我不得不承认:这总是在开发环境中使用 DirectRunner)。

因此,我在 JdbcIO 中添加了一项功能,您可以通过将数据分组在一起并将每个组写入一个批次来自己控制批次的大小。下面是一个基于 Apache Beam 原始 WordCount 示例如何使用此功能的示例。

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

与 JdbcIO 的普通写入方法的不同之处在于新方法 writeIterable()PCollection&lt;Iterable&lt;RowT&gt;&gt; 作为输入而不是 PCollection&lt;RowT&gt;。每个 Iterable 都作为一批写入数据库。

可以在此处找到添加此功能的 JdbcIO 版本:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

可以在此处找到包含上述示例的整个示例项目:https://github.com/olavloite/spanner-beam-example

(Apache Beam 上还有一个拉取请求,以将其包含在项目中)

【讨论】:

你有 PR 的链接吗?

以上是关于Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库的主要内容,如果未能解决你的问题,请参考以下文章

Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库

尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错

Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件

Scio:Apache Beam和Google Cloud Dataflow的Scala API

优化 BigQuery 资源的使用,使用 Google Dataflow 从 GCS 加载 200 万个 JSON 文件

Why Apache Beam? A data Artisans perspective