使用 pyspark 从 RDS MySQL 数据库中提取数据

Posted

技术标签:

【中文标题】使用 pyspark 从 RDS MySQL 数据库中提取数据【英文标题】:Pull data from RDS MySQL db using pyspark 【发布时间】:2018-01-02 21:31:08 【问题描述】:

我是第一次使用 pyspark。我正在尝试使用以下代码从 RDS mysql 数据库中提取数据。我参考了以下链接 pyspark mysql jdbc load An error occurred while calling o23.load No suitable driver, https://www.supergloo.com/fieldnotes/spark-sql-mysql-python-example-jdbc/ 等等。但没有运气。

from  pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
sqlContext = SQLContext(spark)

hostname='abc.rds.amazonaws.com'
jdbcPort=3306
dbname='mydb'
username='user'
password='password'

jdbc_url = "jdbc:mysql://0:1/2".format(hostname, jdbcPort, dbname)


connectionProperties = 
  "user" : username,
  "password" : password


    df=spark.read.jdbc(url=jdbc_url, table='test', properties= connectionProperties)
    df.show()

但我收到以下错误:

    ---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-16-319dff08eefb> in <module>()
     21 #pushdown_query = "(select * from gwdd_data) test"
     22 #df = spark.read.jdbc(url=url,table=pushdown_query, properties=connectionProperties)
---> 23 df=spark.read.jdbc(url=jdbc_url, table='test', properties= connectionProperties)
     24 df.limit(10).show()

~\opt\spark\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py in jdbc(self, url, table, column, lowerBound, upperBound, numPartitions, predicates, properties)
    438             jpredicates = utils.toJArray(gateway, gateway.jvm.java.lang.String, predicates)
    439             return self._df(self._jreader.jdbc(url, table, jpredicates, jprop))
--> 440         return self._df(self._jreader.jdbc(url, table, jprop))
    441 
    442 

~\opt\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

~\opt\spark\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~\opt\spark\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling 012.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o326.jdbc.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.hive.execution.HiveFileFormat not found
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:372)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:550)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
    at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:166)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

我不知道如何解决这个错误。我检查了 mysql-conenctor-java-5.1.45-bin.jar 是否存在于 SPARK_HOME/jars 中。

我尝试使用 SQLCONtext 实现上面的代码,也出现错误。

谁能帮我解决这个问题?

谢谢

【问题讨论】:

【参考方案1】:

您的错误是jdbc 连接的参数错误。参数dbtable 不存在。先来看看jdbcconnector for spark的用法

然后你需要正确连接,你要这样做:

my_df = spark.read.jdbc(url=jdbc_url, table='gwdd_data', properties= connectionProperties)
my_df.limit(10).show()

这应该适合你。

【讨论】:

感谢您的纠正。我实现了这一点,但现在我遇到了一些其他错误。我已经更新了问题。请看一看。

以上是关于使用 pyspark 从 RDS MySQL 数据库中提取数据的主要内容,如果未能解决你的问题,请参考以下文章

将 Pyspark 数据帧加载到 postgres RDS 中的表中时出错

如何使用带有 PySpark 的 WHERE 子句在 AWS Glue 中查询 JDBC 数据库?

将Mysql数据从一个账户的RDS自动复制到另一个账户的RDS

Flink SQL 在执行 Select 时内存不足 - 从 RDS 插入到 Mysql

使用 node-mysql 从 heroku 节点应用程序连接到 Amazon RDS 数据库

从阿里云RDS MySQL在线迁移数据到本地MySQL