Spark streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取
Posted
技术标签:
【中文标题】Spark streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取【英文标题】:Spark streaming jdbc read the stream as and when data comes - Data source jdbc does not support streamed reading 【发布时间】:2019-07-02 17:04:34 【问题描述】:我使用 PostGre 作为数据库。我想为每个批次捕获一个表数据并将其转换为镶木地板文件并存储到 s3。我尝试使用 spark 和 readStream 的 JDBC 选项进行连接,如下所示...
val jdbcDF = spark.readStream
.format("jdbc")
.option("url", "jdbc:postgresql://myserver:5432/mydatabase")
.option("dbtable", "database.schema.table")
.option("user", "xxxxx")
.option("password", "xxxxx")
.load()
但它抛出了不受支持的异常
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
我在正确的轨道上吗?真的不支持数据库作为火花流的数据源吗?
AFAIK 的其他方法是编写一个 kafka 生产者将数据发布到 kafka 主题,然后使用 spark 流...
注意:我不想为此使用 kafka connect,因为我需要这样做 一些辅助转换。
这是唯一的方法吗?
这样做的正确方法是什么?有没有这样的例子? 请帮忙!
【问题讨论】:
@eliasah 你对此有什么建议吗? Sparks JDBC 调用是一个批处理操作。它不会像 Kafka Connect 那样重复运行查询 @cricket_007 我知道......当添加新数据时,它将触发查询。例如,在文件夹中添加具有相同架构的 csv 文件夹新 csv 将触发查询。我试过这种方式。我对 jdbc 的期望与此类似 JDBC 本身不知道何时添加或更改新数据...您可以为此使用 Debezium,然后在 Spark 中从 Kafka 进行转换 没有。 Debezium 只是 Kafka Connect 的一个插件 【参考方案1】:Spark 结构化流没有标准的 JDBC 源,但您可以编写自定义,但您应该了解您的表必须有一个唯一的键,您可以通过它来跟踪更改。 比如可以取my implementation,别忘了在依赖项中添加必要的JDBC驱动
【讨论】:
库是否在任何 repo 中? 不,很遗憾没有。 你能帮我理解一下,我怎样才能将此解决方案用于带有 pyspark 的项目? 嗨,@HariOm! github.com/sutugin/spark-streaming-jdbc-source/issues/… @sutugin 感谢您的回复。它对我有用,主要是我在与不兼容的版本依赖作斗争。但是正如您提到的更改默认版本以升级 spark 和 scala 版本,我尝试过,但在 sbt assemble 中失败了。所以现在我已经将我的 spark 版本降级到 2.3.2【参考方案2】:这个库可能会有所帮助:Jdbc2S。
它提供了 JDBC 流功能,并且构建在 Spark JDBC 批处理源之上。
基本上,您可以像使用任何其他流式源一样使用它,唯一的强制性配置是您正在使用的表中偏移列的名称。
【讨论】:
以上是关于Spark streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取的主要内容,如果未能解决你的问题,请参考以下文章
为啥没有 JDBC Spark Streaming 接收器?
spark-streaming读kafka数据到hive遇到的问题
如何将数据从Kafka传递到Spark Streaming?