将 Spark 与 Flask 与 JDBC 一起使用

Posted

技术标签:

【中文标题】将 Spark 与 Flask 与 JDBC 一起使用【英文标题】:Using Spark with Flask with JDBC 【发布时间】:2019-01-16 16:53:06 【问题描述】:

我在做什么?

我想使用 Flask 构建一个 API 服务来从一个数据库中提取数据,进行一些数据分析,然后将新数据加载到一个单独的数据库中。

怎么了?

如果我自己运行 Spark,我可以访问 db,执行分析并加载到 db。但是在 Flask 应用程序(api 路由)中使用它们时,相同的功能将不起作用。

我是怎么做到的?

首先我启动 Spark master 和 worker。我可以看到我在 localhost:8080 有一个工人在 master 下。

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

../sbin/start-master.sh
../sbin/start-slave.sh spark://xxx.local:7077

对于 Flask 应用程序:

app = Flask(__name__)

spark = SparkSession\
    .builder\
    .appName("Flark - Flask on Spark")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


@app.route("/")
def hello():
    dataframe = spark.read.format("jdbc").options(
        url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",
        database="foodnome_test",
        dbtable='"Dishes"'
    ).load()

    print([row["description"]
           for row in dataframe.select('description').collect()])

    return "hello"

为了运行这个应用程序,我使用带有spark-submit的JDBC驱动:

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar server.py

我会遇到什么错误?

在 Flask 方面,错误是内部服务器错误。 在 Spark 方面,

File "/Users/leoqiu/Desktop/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.67, executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:55)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)

【问题讨论】:

【参考方案1】:

--driver-class-path 在这里是不够的。驱动程序 jar 也应该添加到执行程序类路径中。这通常使用以下方法一起处理:

spark.jars.packages / --packages spark.jars / --jars

虽然你仍然可以使用spark.executor.extraClassPath

解释

带有 JDBC 的源驱动程序负责读取元数据(模式)和执行器用于实际的数据检索过程。

这种行为对于不同的外部数据源是常见的,所以每当你使用非内置格式时,你应该在集群中分发相应的jar。

另请参阅

How to use JDBC source to write and read data in (Py)Spark?

【讨论】:

【参考方案2】:

按照建议,这对我有用。它需要--jars

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar --jars postgresql-42.2.5.jar server.py

【讨论】:

以上是关于将 Spark 与 Flask 与 JDBC 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

是否可以使用 spark 的 jdbc 驱动程序将 apache spark 与 jasper 集成?

是否可以将 Flask RestX 与 Flask 2.0+ 异步等待一起使用?

Python / Flask - 将 flask_restless 与 flask_httpauth 一起使用

将 Swagger 与 Flask 一起使用 [关闭]

如何将 PyMongo 与 Flask 蓝图一起使用?

如何将 flask.url_for() 与 flask-restful 一起使用?