通过 JDBC 在受限环境中通过流式处理或批处理处理整个 SQL 表

Posted

技术标签:

【中文标题】通过 JDBC 在受限环境中通过流式处理或批处理处理整个 SQL 表【英文标题】:Processing an entire SQL table via JDBC by streaming or batching on a constrained environment 【发布时间】:2021-08-13 23:37:29 【问题描述】:

我正在尝试设置一个管道,以通过 JDBC 进行初始摄取,以逐个处理整个 SQL 表。我需要能够使用更高级别的处理能力,例如 Apache Spark 或 Flink 中可用的能力,并且希望使用任何现有的能力,而不是自己编写,尽管这可能是不可避免的。我需要能够在受限设置(可能是一台笔记本电脑)上执行此管道。请注意,这里我不是在谈论捕获或摄取 CDC,我只是想以一种不会 OOM 单台机器的方式对现有表进行批处理。

作为一个简单的例子,我在 SQL Server 中有一个 500GB 的表。我想把它分解成更小的块,适合最近现代笔记本电脑的 16GB-32GB 可用内存,对每一行应用转换函数,然后将它们转发到接收器。

一些似乎接近我需要的可用解决方案:

    Apache Spark 分区读取:
 spark.read.format("jdbc").
      .option("driver", driver)
      .option("url", url)
      .option("partitionColumn", id)
      .option("lowerBound", min)
      .option("upperBound", max)
      .option("numPartitions", 10)
      .option("fetchsize",1000)
      .option("dbtable", query)
      .option("user", "username")
      .option("password", "password")
      .load()

看起来我什至可以在初始读取后进一步repartition 数据集。 问题是,在本地执行模式下,我希望整个表在多个 CPU 内核上进行分区,这些内核都会尝试将各自的块加载到内存中,OOM 整个业务。

有没有办法限制读取作业,以便只执行尽可能多的内存可以容纳?我可以强制作业按顺序运行吗? 我是否可以将表划分为更小的块,比核心多得多,从而一次只处理少量?这不会因为无休止的任务调度等而妨碍一切吗? 如果我想编写自己的源以流式传输到 Spark,这会减轻我的记忆问题吗? this 之类的内容对我有帮助吗? Spark 的内存管理是否在这里发挥作用?为什么需要在读取的过程中一次将整个分区加载到内存中?
    我将 Apache Flink 视为替代方案,因为流模型在这里可能更合适一些。以下是它在 JDBC 方面提供的功能:
JDBCInputFormat.buildJDBCInputFormat()
     .setDrivername("com.mysql.jdbc.Driver")
     .setDBUrl("jdbc:mysql://localhost/log_db")
     .setUsername("username")
     .setPassword("password")
     .setQuery("select id, something from SOMETHING")
     .setRowTypeInfo(rowTypeInfo)
     .finish()

但是,这似乎也是为批处理而设计的,并且仍然尝试将所有内容加载到内存中。

如何让 Flink 流式传输微批量的 SQL 数据进行处理? 我是否可以编写自己的流式源来封装 JDBC 输入格式? 是否可以假设 Flink 不会发生 OOM,除非某些状态/累加器变得太大?

我还看到 Kafka 有 JDBC 连接器,但它看起来不像其他流框架那样在本地(即相同的 JVM)运行它是不可能的。谢谢大家的帮助!

【问题讨论】:

【参考方案1】:

确实,对于 Flink,输入格式仅用于批处理,但这应该不是问题。 Flink 一次批处理一个事件,无需将所有内容加载到内存中。我认为你想要的应该可以工作。

【讨论】:

以上是关于通过 JDBC 在受限环境中通过流式处理或批处理处理整个 SQL 表的主要内容,如果未能解决你的问题,请参考以下文章

在 HTML5 中通过 RTSP 或 RTP 流式传输

如何在 Django 中通过 PUT 请求处理文件上传?

自定义异常

打造自己的ChatGPT:逐字打印的流式处理

在 C# 中通过远程处理获取外部 IP 地址

kafka 虚拟机环境 单机版部署