使用 Apache Beam 从数据库中读取批量数据
Posted
技术标签:
【中文标题】使用 Apache Beam 从数据库中读取批量数据【英文标题】:Reading bulk data from a database using Apache Beam 【发布时间】:2018-06-07 23:14:10 【问题描述】:我想知道,如果我的查询返回数百万行,JdbcIO 将如何并行执行查询。 我已经提到了https://issues.apache.org/jira/browse/BEAM-2803 和相关的拉取请求。完全没看懂。
ReadAll
expand
方法使用ParDo
。因此,它会创建到数据库的多个连接以并行读取数据吗?如果我限制可以创建到数据源中数据库的连接数,它会坚持连接限制吗?
谁能帮我理解JdbcIO
会如何处理?我正在使用2.2.0
更新:
.apply(
ParDo.of(
new ReadFn<>(
getDataSourceConfiguration(),
getQuery(),
getParameterSetter(),
getRowMapper())))
上面的代码显示 ReadFn 是与 ParDo 一起应用的。我认为,ReadFn 将并行运行。如果我的假设是正确的,我将如何使用readAll()
方法从我一次只能建立有限数量的连接的数据库中读取?
谢谢 巴鲁
【问题讨论】:
【参考方案1】:ReadAll 方法处理您有许多多个查询的情况。您可以将查询存储为字符串的 PCollection,其中每个字符串都是查询。然后在读取时,每个项目在单个 ParDo 中作为单独的查询进行处理。
这不适用于少量查询,因为它限制了查询数量的并行性。但是如果你有很多,那么它会执行得更快。大多数 ReadAll 调用都是这种情况。
从代码看来,在 setup 函数中每个工作人员都建立了连接。这可能包括几个查询,具体取决于工作人员的数量和查询的数量。
查询限制设置在哪里?无论是否使用 ReadAll,它的行为都应该相似。
查看 jira 了解更多信息:https://issues.apache.org/jira/browse/BEAM-2706
我对 jdbcIO 不是很熟悉,但他们似乎实现了 jira 中建议的版本。 PCollection 可以是任何东西,然后是一个回调,以根据 PCollection 中的元素修改查询。这允许 PCollection 中的每个项目表示一个查询,但比将新查询作为每个元素更灵活。
【讨论】:
劳拉,感谢您的评论。但是,我的问题是针对单个查询将从数据库加载数百万行的情况。我没有看到任何并行读取此类数据的实现。我们如何在这里实现并行性?【参考方案2】:我创建了一个Datasource,如下。
ComboPooledDataSource cpds = new ComboPooledDataSource();
cpds.setDriverClass("com.mysql.jdbc.Driver"); // loads the jdbc driver
cpds.setJdbcUrl("jdbc:mysql://<IP>:3306/employees");
cpds.setUser("root");
cpds.setPassword("root");
cpds.setMaxPoolSize(5);
现在有更好的方法来设置这个驱动程序。
我将数据库池大小设置为 5。在进行JdbcIO
转换时,我使用此数据源来创建连接。
在管道中,我设置了
option.setMaxNumWorkers(5);
option.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
我使用了一个可以返回大约 300 万条记录的查询。在观察数据库连接时,程序运行时连接数逐渐增加。它在某些实例上最多使用 5 个连接。
我认为,这就是我们在运行JdbcIO
trnsformation 以从数据库加载大量数据时限制创建到数据库的连接数的方式。
ComboPoolDataSource 的 Maven 依赖项
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.1.2</version>
</dependency>
**如果我在这里遗漏了什么,请随时纠正答案。*
【讨论】:
这似乎有效。我发现 DBCP2 数据源不是 Serializable,Hikari-CP 数据源也不是,所以 c3p0 是一个不错的选择。谢谢。【参考方案3】:我有类似的任务 我从数据库中获取了记录数并将其拆分为 1000 条记录的范围 然后我将 readAll 应用于范围的 PCollection 这是解决方案的description。 并感谢 Balu reg。数据源配置。
【讨论】:
以上是关于使用 Apache Beam 从数据库中读取批量数据的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 的 Dataflow 批量加载的性能问题
从 Apache Beam 中的多个文件夹读取文件并将输出映射到文件名
Apache Beam 批量到 BigQuery,中间文件,它们是不是仅以 JSON 格式生成