spark编程 mysql得不到数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark编程 mysql得不到数据相关的知识,希望对你有一定的参考价值。

这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,1.3及以前的版本 --jars 分发的jar在executor端是通过 Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。

  不过Spark 1.4应该已经fix了这个问题,即 --jars 分发的 jar 也会纳入 YARN 的 classloader 范畴。


  今天在使用Spark中DataFrame往Mysql中插入RDD,但是一直报出以下的异常次信息:


01

[itelbog@iteblog~]$  bin/spark-submit --master local[2]

02

    --jars lib/mysql-connector-java-5.1.35.jar

03

    --class  spark.sparkToJDBC ./spark-test_2.10-1.0.jar

04

 

05

spark assembly has been built with Hive, including Datanucleus jars on classpath

06

Exception in thread "main" java.sql.SQLException: No suitable driver found for

07

jdbc:mysql://www.iteblog.com:3306/spark?user=root&password=123&useUnicode=

08

true&characterEncoding=utf8&autoReconnect=true

09

    at java.sql.DriverManager.getConnection(DriverManager.java:602)

10

    at java.sql.DriverManager.getConnection(DriverManager.java:207)

11

    at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1189)

12

    at spark.SparkToJDBC$.toMysqlFromJavaBean(SparkToJDBC.scala:20)

13

    at spark.SparkToJDBC$.main(SparkToJDBC.scala:47)

14

    at spark.SparkToJDBC.main(SparkToJDBC.scala)

15

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

16

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

17

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

18

    at java.lang.reflect.Method.invoke(Method.java:597)

19

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$

20

$runMain(SparkSubmit.scala:569)

21

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)

22

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)

23

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)

24

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

  感觉很奇怪,我在启动作业的时候加了Mysql驱动啊在,怎么会出现这种异常呢??经过查找,发现在--jars参数里面加入Mysql是没有用的。通过查找,发现提交的作业可以通过加入--driver-class-path参数来设置driver的classpath,试了一下果然没有出现错误!


1

[itelbog@iteblog ~]$  bin/spark-submit --master local[2]

2

    --driver-class-path lib/mysql-connector-java-5.1.35.jar

3

    --class  spark.SparkToJDBC ./spark-test_2.10-1.0.jar

  其实,我们还可以在spark安装包的conf/spark-env.sh通过配置SPARK_CLASSPATH来设置driver的环境变量,如下:


(这里需要注意的是,在Spark1.3版本中,在Spark配置中按如下进行配置时,运行程序时会提示该配置方法在Spark1.0之后的版本已经过时,建议使用另外两个方法;其中一个就是上面讲到的方法。另外一个就是在配置文件中配置spark.executor.extraClassPath,具体配置格式会在试验之后进行补充)


1

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/iteblog/com/mysql-connector-java-5.1.35.jar

  这样也可以解决上面出现的异常。但是,我们不能同时在conf/spark-env.sh里面配置SPARK_CLASSPATH和提交作业加上--driver-class-path参数,否则会出现以下异常:


查看源代码打印帮助

01

[itelbog@iteblog~]$  bin/spark-submit --master local[2]

02

    --driver-class-path lib/mysql-connector-java-5.1.35.jar

03

    --class  spark.SparkToJDBC ./spark-test_2.10-1.0.jar

04

 

05

Spark assembly has been built with Hive, including Datanucleus jars on classpath

06

Exception in thread "main"org.apache.spark.SparkException:

07

    Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.

08

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply

09

$7.apply(SparkConf.scala:339)

10

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply

11

$7.apply(SparkConf.scala:337)

12

    at scala.collection.immutable.List.foreach(List.scala:318)

13

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:337)

14

    at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:325)

15

    at scala.Option.foreach(Option.scala:236)

16

    at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:325)

17

    at org.apache.spark.SparkContext.<init>(SparkContext.scala:197)

18

    at spark.SparkToJDBC$.main(SparkToJDBC.scala:41)

19

    at spark.SparkToJDBC.main(SparkToJDBC.scala)

20

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

21

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

22

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

23

    at java.lang.reflect.Method.invoke(Method.java:597)

24

    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$

25

        deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)

26

    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)

27

    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)

28

    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)

29

    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


参考技术A

  “这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。

  


  编程是编定程序的中文简称,就是让计算机代码解决某个问题,对某个计算体系规定一定的运算方式,使计算体系按照该计算方式运行,并最终得到相应结果的过程。

  为了使计算机能够理解人的意图,人类就必须将需解决的问题的思路、方法和手段通过计算机能够理解的形式告诉计算机,使得计算机能够根据人的指令一步一步去工作,完成某种特定的任务。这种人和计算体系之间交流的过程就是编程。

  在计算机系统中,一条机器指令规定了计算机系统的一个特定动作。

  一个系列的计算机在硬件设计制造时就用了若干指令规定了该系列计算机能够进行的基本操作,这些指令一起构成了该系列计算机的指令系统。在计算机应用的初期,程序员使用机器的指令系统来编写计算机应用程序,这种程序称为机器语言程序。

  以上内容参考:百度百科-编程

参考技术B 这里说明一点:本文提到的解决 Spark insertIntoJDBC找不到Mysql驱动的方法是针对单机模式(也就是local模式)。在集群环境下,下面的方法是不行的。这是因为在分布式环境下,加载mysql驱动包存在一个Bug,1.3及以前的版本 --jars 分发的jar在executor端是通过 Spark自身特化的classloader加载的。而JDBC driver manager使用的则是系统默认的classloader,因此无法识别。可行的方法之一是在所有 executor 节点上预先装好JDBC driver并放入默认的classpath。
  不过Spark 1.4应该已经fix了这个问题,即 --jars 分发的 jar 也会纳入 YARN 的 classloader 范畴。

Spark 找不到窗口函数

【中文标题】Spark 找不到窗口函数【英文标题】:Spark Couldn't Find Window Function 【发布时间】:2015-10-02 10:42:00 【问题描述】:

使用https://***.com/a/32407543/5379015中提供的解决方案 我尝试重新创建相同的查询,但使用编程语法而不是 Dataframe API,如下所示:

import org.apache.spark.SparkContext, SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object HiveContextTest 
  def main(args: Array[String]) 
    val conf = new SparkConf().setAppName("HiveContextTest")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
      ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")


    // using dataframe api works fine

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k",$"v", rowNumber().over(w).alias("rn")).show


    //using programmatic syntax doesn't work

    df.registerTempTable("df")
    val w2 = sqlContext.sql("select k,v,rowNumber() over (partition by k order by v) as rn from df")
    w2.show()

  

第一个df.select($"k",$"v", rowNumber().over(w).alias("rn")).show 工作正常,但w2.show() 结果

Exception in thread "main" org.apache.spark.sql.AnalysisException: Couldn't find window function rowNumber;

是否有人对我如何使用编程语法进行这项工作有任何想法?非常感谢。

【问题讨论】:

【参考方案1】:

rowNumber 的 SQL 等效项是 row_number

SELECT k, v, row_number() OVER (PARTITION BY k ORDER BY v) AS rn FROM df

【讨论】:

以上是关于spark编程 mysql得不到数据的主要内容,如果未能解决你的问题,请参考以下文章

编程实现利用 DataFrame 读写 MySQL 的数据

2020年寒假学习进度第九天

Spark编程题

Spark编程都有哪些有用技巧

spark浅谈:SPARK核心编程

spark入门知识讲解和基础数据操作编程(统一用scala编程实例)