MySQL 使用 PySpark 读取

Posted

技术标签:

【中文标题】MySQL 使用 PySpark 读取【英文标题】:MySQL read with PySpark 【发布时间】:2017-09-03 12:01:37 【问题描述】:

我有以下测试代码:

from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
print('Created spark context!')


if __name__ == '__main__':
    df = sqlContext.read.format("jdbc").options(
        url="jdbc:mysql://localhost/mysql",
        driver="com.mysql.jdbc.Driver",
        dbtable="users",
        user="user",
        password="****",
        properties="driver": 'com.mysql.jdbc.Driver'
    ).load()

    print(df)

当我运行它时,我收到以下错误:

java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

在 Scala 中,这是通过将 .jar mysql-connector-java 导入项目来解决的。

但是,在 python 中,我不知道如何告诉 pyspark 模块链接 mysql-connector 文件。

我已经看到用类似的例子解决了这个问题

spark --package=mysql-connector-java testfile.py

但我不希望这样,因为它迫使我以一种奇怪的方式运行我的脚本。我想要一个全 python 的解决方案,或者在某个地方复制一个文件,或者在路径中添加一些东西。

【问题讨论】:

【参考方案1】:

在初始化SparkConf 之前创建sparkContext 时,您可以将参数传递给spark-submit

import os
from pyspark import SparkConf, SparkContext

SUBMIT_ARGS = "--packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf()
sc = SparkContext(conf=conf)

或者您可以将它们添加到您的$SPARK_HOME/conf/spark-defaults.conf

【讨论】:

嗨,我收到了这个错误:requirement failed: Provided Maven Coordinates must be in the form 'groupId:artifactId:version'. The coordinate provided is: mysql-connector-java,所以我猜这些参数应该是另一种格式 请用这个mysql:mysql-connector-java:5.1.39更改当前包,然后它就可以工作了 你是对的,你也可以用--jars path_to/mysql-connector-java.jar将它作为一个jar加载,但它不会安装任何依赖项。我会修改答案,使其正确【参考方案2】:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("Word Count")\
    .config("spark.driver.extraClassPath", "/home/tuhin/mysql.jar")\
    .getOrCreate()

dataframe_mysql = spark.read\
    .format("jdbc")\
    .option("url", "jdbc:mysql://localhost/database_name")\
    .option("driver", "com.mysql.jdbc.Driver")\
    .option("dbtable", "employees").option("user", "root")\
    .option("password", "12345678").load()

print(dataframe_mysql.columns)

“/home/tuhin/mysql.jar”是mysql jar文件的位置

【讨论】:

【参考方案3】:

如果您正在使用 pycharm 并且想要逐行运行而不是通过 spark-submit 提交您的 .py,您可以将您的 .jar 复制到 c:\spark\jars\ 并且您的代码可能是这样的:

from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
sqlContext = sql.SQLContext(sc)
source_df = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost:3306/database1',
    driver='com.mysql.cj.jdbc.Driver', #com.mysql.jdbc.Driver
    dbtable='table1',
    user='root',
    password='****').load()
print (source_df)
source_df.show()

【讨论】:

以上是关于MySQL 使用 PySpark 读取的主要内容,如果未能解决你的问题,请参考以下文章

pyspark pandas 对象作为数据框 - TypeError

如何在pyspark中将GUID转换为整数

如何更改pyspark中的列元数据?

Informix JDBC PySpark 将列名中的结果作为列值读取

在 PySpark SQL 中并行执行读写 API 调用

如何有效地将 MySQL 表读入 Apache Spark/PySpark?