Spark: Writing a DataFrame to MySQL

Posted by lillcol



After applying a series of transformations to a DataFrame in Spark, I needed to write the result to MySQL. The implementation is described below.

1. MySQL connection information

I keep the MySQL connection details in an external configuration file, which makes it easy to add or change settings later.

# Example configuration file:
$ cat job.properties
# MySQL database configuration
mysql.driver=com.mysql.jdbc.Driver
mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
mysql.username=user
mysql.password=123456

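The keys above use a mysql. prefix, while Spark's JDBC reader/writer expects plain user, password, driver and url keys, so the code further down remaps them. A minimal sketch of loading the file and remapping the keys, assuming job.properties sits on the driver's local filesystem:

import java.io.FileInputStream
import java.util.Properties

// Load job.properties and remap the keys into the names Spark's JDBC writer expects
val conf = new Properties()
conf.load(new FileInputStream("job.properties"))

val jdbcProps = new Properties()
jdbcProps.setProperty("user", conf.getProperty("mysql.username"))
jdbcProps.setProperty("password", conf.getProperty("mysql.password"))
jdbcProps.setProperty("driver", conf.getProperty("mysql.driver"))
jdbcProps.setProperty("url", conf.getProperty("mysql.url"))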

2. Required jar dependencies (sbt format; adjust accordingly for Maven)

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

 

3. Full implementation

 

import java.io.FileInputStream
import java.sql.{Connection, DriverManager}
import java.util.Properties

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author Administrator
  *         2018/10/16-10:15
  *
  */
object SaveDataFrameASMysql {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = new HiveContext(sc)

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)
    // Read the table without any filter
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
    dim_sys_city_dict.show(10)

    // Save to MySQL
    saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
  }

  /**
    * Save a DataFrame as a MySQL table
    *
    * @param dataFrame the DataFrame to save
    * @param tableName name of the target MySQL table
    * @param saveMode  save mode: Append, Overwrite, ErrorIfExists or Ignore
    * @param proPath   path of the configuration file
    */
  def saveASMysqlTable(dataFrame: DataFrame, tableName: String, saveMode: SaveMode, proPath: String) = {
    var table = tableName
    val properties: Properties = getProPerties(proPath)
    val prop = new Properties // the keys in the config file differ from the keys Spark expects, so build a new Properties object in Spark's format
    prop.setProperty("user", properties.getProperty("mysql.username"))
    prop.setProperty("password", properties.getProperty("mysql.password"))
    prop.setProperty("driver", properties.getProperty("mysql.driver"))
    prop.setProperty("url", properties.getProperty("mysql.url"))
    if (saveMode == SaveMode.Overwrite) {
      var conn: Connection = null
      try {
        conn = DriverManager.getConnection(
          prop.getProperty("url"),
          prop.getProperty("user"),
          prop.getProperty("password")
        )
        val stmt = conn.createStatement
        table = table.toUpperCase
        stmt.execute(s"truncate table $table") // truncate first and then Append, so the table structure is preserved
        conn.close()
      }
      catch {
        case e: Exception =>
          println("MySQL Error:")
          e.printStackTrace()
      }
    }
    dataFrame.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), table, prop)
  }

  /**
    * Read a MySQL table
    *
    * @param sqlContext
    * @param tableName name of the MySQL table to read
    * @param proPath   path of the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      //        .option("dbtable", tableName.toUpperCase)
      .option("dbtable", tableName)
      .load()

  }

  /**
    * Read a MySQL table with a filter condition
    *
    * @param sqlContext
    * @param table           name of the MySQL table to read
    * @param filterCondition filter condition
    * @param proPath         path of the configuration file
    * @return the MySQL table as a DataFrame
    */
  def readMysqlTable(sqlContext: SQLContext, table: String, filterCondition: String, proPath: String) = {
    val properties: Properties = getProPerties(proPath)
    var tableName = ""
    tableName = "(select * from " + table + " where " + filterCondition + " ) as t1"
    sqlContext
      .read
      .format("jdbc")
      .option("url", properties.getProperty("mysql.url"))
      .option("driver", properties.getProperty("mysql.driver"))
      .option("user", properties.getProperty("mysql.username"))
      .option("password", properties.getProperty("mysql.password"))
      .option("dbtable", tableName)
      .load()
  }

  /**
    * Load the configuration file
    *
    * @param proPath
    * @return
    */
  def getProPerties(proPath: String) = {
    val properties: Properties = new Properties()
    properties.load(new FileInputStream(proPath))
    properties
  }
}

 

 

 

4. Test

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read the table without any filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // Save to MySQL
  saveASMysqlTable(dim_sys_city_dict, "TestMysqlTble2", SaveMode.Append, proPath)
}

 

5. Results (sensitive data has been masked)

+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|dict_id|city_id|city_name|city_code|group_id|group_name|area_code| bureau_id|sort|bureau_name|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+
|     1|    249|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 21|    张三公司|
|     2|    240|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 21|    张三公司|
|     3|    240|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 21|    张三公司|
|     4|    242|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     5|    246|       **|    **_ab|     100|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     6|    246|       **|    **_ab|     300|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     7|    248|       **|    **_ab|     200|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     8|    242|       **|    **_ab|     400|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     9|    247|       **|    **_ab|     200|      **按时|    **-查到|xcaasd...| 01|    张三公司|
|     0|    243|       **|    **_ab|     400|      **按时|    **-查到|xcaasd...| 01|    张三公司|
+-------+-------+---------+---------+--------+----------+---------+--------------------+----+-----------+

mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> select count(1) from TestMysqlTble1;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)

mysql> select count(1) from TestMysqlTble2;
+----------+
| count(1) |
+----------+
|       21 |
+----------+
1 row in set (0.00 sec)

 

6. Performance

When I first used this approach, small datasets were fine, but once the data volume grew the write became far too slow. I tried a few optimizations without much effect, then read the Spark source code and found two key pieces:

/**
 * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
 * table already exists in the external database, behavior of this function depends on the
 * save mode, specified by the `mode` function (default to throwing an exception).
 *
 * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
 * your external database systems.
 *
 * @param url JDBC database url of the form `jdbc:subprotocol:subname`
 * @param table Name of the table in the external database.
 * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
 *                             tag/value. Normally at least a "user" and "password" property
 *                             should be included.
 *
 * @since 1.4.0
 */
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table)
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props) // ----------------------------- key point 1
}


/**
 * Saves the RDD to the database in a single transaction.
 */
def saveTable(
    df: DataFrame,
    url: String,
    table: String,
    properties: Properties) {
  val dialect = JdbcDialects.get(url)
  val nullTypes: Array[Int] = df.schema.fields.map { field =>
    getJdbcType(field.dataType, dialect).jdbcNullType
  }

  val rddSchema = df.schema
  val getConnection: () => Connection = createConnectionFactory(url, properties)
  val batchSize = properties.getProperty("batchsize", "1000").toInt
  df.foreachPartition { iterator => // ------------------------------------ key point 2
    savePartition(getConnection, table, iterator, rddSchema, nullTypes, batchSize, dialect)
  }
}

 

In other words, the built-in method already writes partition by partition, opening one MySQL connection per partition. So the simplest optimization is to repartition the DataFrame before saving; watch out for data skew, otherwise the repartition may bring no improvement at all.
Of course, the fastest approach I have tested so far is to pull the data down as files and import them into MySQL with the load data command, but that is more cumbersome.

Here is the repartition example:

def main(args: Array[String]): Unit = {
  hdfsPath = args(0)
  proPath = args(1)
  // Read the table without any filter
  val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
  dim_sys_city_dict.show(10)

  // Repartition, then save to MySQL
  saveASMysqlTable(dim_sys_city_dict.repartition(10), "TestMysqlTble2", SaveMode.Append, proPath)
}
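Another knob worth mentioning: saveTable above reads a batchsize connection property (default 1000), and rewriteBatchedStatements=true is already set in mysql.url, so the batch size can be tuned together with the partition count. A rough sketch, with illustrative numbers rather than measured optima:

import java.util.Properties

// Illustrative sketch: control partition count and JDBC batch size when writing.
// One MySQL connection is opened per partition, so keep the partition count moderate.
val properties: Properties = getProPerties(proPath)
val prop = new Properties()
prop.setProperty("user", properties.getProperty("mysql.username"))
prop.setProperty("password", properties.getProperty("mysql.password"))
prop.setProperty("driver", properties.getProperty("mysql.driver"))
prop.setProperty("batchsize", "5000") // read by JdbcUtils.saveTable, default is 1000

dim_sys_city_dict
  .repartition(10)
  .write
  .mode(SaveMode.Append)
  .jdbc(properties.getProperty("mysql.url"), "TestMysqlTble2", prop)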

7. Summary

A few things to keep in mind when writing a DataFrame to MySQL:

  • It is best to create the target table in advance; otherwise the columns get the default types, and the Text type is a real resource hog. Compare the two tables below: the source table TestMysqlTble1 and the table TestMysqlTble2 created by the DataFrame write (a sketch for pre-creating the table follows the comparison).
mysql> desc TestMysqlTble1;
+-------------+-------------+------+-----+---------+-------+
| Field       | Type        | Null | Key | Default | Extra |
+-------------+-------------+------+-----+---------+-------+
| dict_id     | varchar(32) | YES  |     | NULL    |       |
| city_id     | varchar(32) | YES  |     | NULL    |       |
| city_name   | varchar(32) | YES  |     | NULL    |       |
| city_code   | varchar(32) | YES  |     | NULL    |       |
| group_id    | varchar(32) | YES  |     | NULL    |       |
| group_name  | varchar(32) | YES  |     | NULL    |       |
| area_code   | varchar(32) | YES  |     | NULL    |       |
| bureau_id   | varchar(64) | YES  |     | NULL    |       |
| sort        | varchar(32) | YES  |     | NULL    |       |
| bureau_name | varchar(32) | YES  |     | NULL    |       |
+-------------+-------------+------+-----+---------+-------+
10 rows in set (0.00 sec)

mysql> desc TestMysqlTble2;
+-------------+------+------+-----+---------+-------+
| Field       | Type | Null | Key | Default | Extra |
+-------------+------+------+-----+---------+-------+
| dict_id     | text | YES  |     | NULL    |       |
| city_id     | text | YES  |     | NULL    |       |
| city_name   | text | YES  |     | NULL    |       |
| city_code   | text | YES  |     | NULL    |       |
| group_id    | text | YES  |     | NULL    |       |
| group_name  | text | YES  |     | NULL    |       |
| area_code   | text | YES  |     | NULL    |       |
| bureau_id   | text | YES  |     | NULL    |       |
| sort        | text | YES  |     | NULL    |       |
| bureau_name | text | YES  |     | NULL    |       |
+-------------+------+------+-----+---------+-------+
10 rows in set (0.00 sec)
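One possible way to pre-create the table, sketched below: issue a CREATE TABLE IF NOT EXISTS over the same JDBC connection settings before calling saveASMysqlTable. The column widths are copied from TestMysqlTble1 above and are assumptions for illustration, not a requirement.

import java.sql.DriverManager
import java.util.Properties

// Sketch: pre-create TestMysqlTble2 with explicit types (mirroring TestMysqlTble1)
// so the JDBC writer does not fall back to TEXT columns.
val properties: Properties = getProPerties(proPath) // same config file as above
val conn = DriverManager.getConnection(
  properties.getProperty("mysql.url"),
  properties.getProperty("mysql.username"),
  properties.getProperty("mysql.password"))
try {
  val stmt = conn.createStatement
  stmt.execute(
    """CREATE TABLE IF NOT EXISTS TestMysqlTble2 (
      |  dict_id     varchar(32),
      |  city_id     varchar(32),
      |  city_name   varchar(32),
      |  city_code   varchar(32),
      |  group_id    varchar(32),
      |  group_name  varchar(32),
      |  area_code   varchar(32),
      |  bureau_id   varchar(64),
      |  sort        varchar(32),
      |  bureau_name varchar(32)
      |)""".stripMargin)
  stmt.close()
} finally {
  conn.close()
}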
  • About SaveMode.Overwrite
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
  val props = new Properties()
  extraOptions.foreach { case (key, value) =>
    props.put(key, value)
  }
  // connectionProperties should override settings in extraOptions
  props.putAll(connectionProperties)
  val conn = JdbcUtils.createConnectionFactory(url, props)()

  try {
    var tableExists = JdbcUtils.tableExists(conn, url, table)

    if (mode == SaveMode.Ignore && tableExists) {
      return
    }

    if (mode == SaveMode.ErrorIfExists && tableExists) {
      sys.error(s"Table $table already exists.")
    }

    if (mode == SaveMode.Overwrite && tableExists) {
      JdbcUtils.dropTable(conn, table) // ---------------------------------------- key point 1
      tableExists = false
    }

    // Create the table if the table didn't exist.
    if (!tableExists) {
      val schema = JdbcUtils.schemaString(df, url)
      val sql = s"CREATE TABLE $table ($schema)"
      val statement = conn.createStatement
      try {
        statement.executeUpdate(sql)
      } finally {
        statement.close()
      }
    }
  } finally {
    conn.close()
  }

  JdbcUtils.saveTable(df, url, table, props)
}

/**
 * Drops a table from the JDBC database.
 */
def dropTable(conn: Connection, table: String): Unit = {
  val statement = conn.createStatement
  try {
    statement.executeUpdate(s"DROP TABLE $table") // ------------------------------------- key point 2
  } finally {
    statement.close()
  }
}

 

As the two snippets above show, before writing, Spark checks whether the table exists. With SaveMode.Overwrite it calls dropTable(conn: Connection, table: String) and drops the existing table. That means you lose the table definition, and the newly created table falls back to the default types described in the previous point. To avoid this, I added the following to the save method:

if (saveMode == SaveMode.Overwrite) {
  var conn: Connection = null
  try {
    conn = DriverManager.getConnection(
      prop.getProperty("url"),
      prop.getProperty("user"),
      prop.getProperty("password")
    )
    val stmt = conn.createStatement
    table = table.toUpperCase
    stmt.execute(s"truncate table $table") // truncate first and then Append, so the table structure is preserved
    conn.close()
  }
  catch {
    case e: Exception =>
      println("MySQL Error:")
      e.printStackTrace()
  }
}
 

truncate only deletes the data; it leaves the table structure intact.

What if the table does not exist in the first place?

Two cases need to be distinguished when the table does not exist yet:

1. Any mode other than SaveMode.Overwrite

No problem: the table is created automatically, with the default column types (see the first point above).

2. SaveMode.Overwrite

This fails. Below is the error produced when SaveMode.Overwrite is used while TestMysqlTble2 does not exist:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'iptv.TESTMYSQLTBLE2' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
        at com.mysql.jdbc.Util.getInstance(Util.java:387)
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2547)
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2505)
        at com.mysql.jdbc.StatementImpl.executeInternal(StatementImpl.java:840)
        at com.mysql.jdbc.StatementImpl.execute(StatementImpl.java:740)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
        at com.iptv.job.basedata.SaveDataFrameASMysql$.main(SaveDataFrameASMysql.scala:33)
        at com.iptv.job.basedata.SaveDataFrameASMysql.main(SaveDataFrameASMysql.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Error details:

The failing frame
at com.iptv.job.basedata.SaveDataFrameASMysql$.saveASMysqlTable(SaveDataFrameASMysql.scala:62)
corresponds to this line in the save method:
stmt.execute(s"truncate table $table") // truncate first and then Append, so the table structure is preserved
In other words, truncate requires the table to already exist.
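One way to make the Overwrite branch tolerate a missing table is to check for the table through JDBC metadata before truncating. The following is a sketch of such a guard, not part of the original code (note that name matching may be case-sensitive depending on the MySQL configuration):

import java.sql.{DriverManager, ResultSet}

// Sketch: inside the SaveMode.Overwrite branch of saveASMysqlTable, only truncate
// when the table already exists, so a brand-new target table no longer fails the job.
val conn = DriverManager.getConnection(
  prop.getProperty("url"),
  prop.getProperty("user"),
  prop.getProperty("password"))
try {
  val rs: ResultSet = conn.getMetaData.getTables(null, null, table, null)
  val tableExists = rs.next()
  rs.close()
  if (tableExists) {
    val stmt = conn.createStatement
    stmt.execute(s"truncate table $table") // keep the structure, clear the rows
    stmt.close()
  }
} finally {
  conn.close()
}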

 

 

That completes writing a DataFrame to MySQL.

 

This article is a summary of my own work; please credit the source when reposting!

 


