Appending rows to a MySQL table with the Spark SQL DataFrame write method


I am new to Apache Spark SQL. Below is the code of my Spark SQL application and its query results.

SparkSession spark = SparkSession.builder().appName("Spark SQL Test")
                .master("local[*]").getOrCreate();      

Properties connectionProperties = new Properties();
connectionProperties.put("driver", "com.mysql.jdbc.Driver");
connectionProperties.put("url", "jdbc:mysql://localhost:3306/test");
connectionProperties.put("user", "root");
connectionProperties.put("password", "password");

Dataset<Row> jdbcDF = spark.read().jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);
jdbcDF.show();
jdbcDF.printSchema();

The result was:

+------+----------+--------+-------+
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE|
+------+----------+--------+-------+
|  jina|       bbb|  mother|     45|
|joseph|       aaa|  father|     50|
|julian|       ccc|     son|     20|
+------+----------+--------+-------+


root
 |-- EMP_ID: string (nullable = false)
 |-- EMP_PASSWD: string (nullable = false)
 |-- EMP_NAME: string (nullable = false)
 |-- EMP_AGE: integer (nullable = false)

I then tried to append another row to the family table. First, I used a SQL command, as shown below:

Dataset<Row> appendSql = spark.sql("INSERT INTO family VALUES('jane' , 'ddd', 'daughter' , 15)");
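(For spark.sql to resolve the table name family, the JDBC DataFrame presumably has to be visible in Spark's catalog first; that step is not shown above, so the line below is an assumed setup rather than part of the original code.)

// Assumed setup (not shown in the original post): register the JDBC
// DataFrame so spark.sql can resolve the table name "family".
jdbcDF.createOrReplaceTempView("family");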

This produced the result I wanted:

+------+----------+--------+-------+
|EMP_ID|EMP_PASSWD|EMP_NAME|EMP_AGE|
+------+----------+--------+-------+
|  jane|       ddd|daughter|     15|
|  jina|       bbb|  mother|     45|
|joseph|       aaa|  father|     50|
|julian|       ccc|     son|     20|
+------+----------+--------+-------+

Next, I tried the write method, as shown below:

List<Object> appendData = Arrays.asList("julia", "eee", "grand mother", new Integer(70));
spark.createDataFrame(appendData, Object.class)
        .write().mode(SaveMode.Append)
        .jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);

But the line above threw the following exception:

java.sql.BatchUpdateException: Field 'EMP_ID' doesn't have a default value
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1778)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:641)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:782)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Field 'EMP_ID' doesn't have a default value
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3909)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2527)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2680)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2484)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
    ... 15 more

I tried to avoid the StructField interface because the columns' DataTypes are simple. What am I doing wrong?

Answer

It was solved with the following code. The original attempt presumably failed because createDataFrame(appendData, Object.class) infers the schema from the bean properties of Object, which exposes none, so the generated INSERT named no columns and MySQL rejected the row since EMP_ID has no default value.

List<String> appendData = new ArrayList<String>();
appendData.add("julia,eee,grandmother 2,70");
appendData.add("jane,ddd,daughter ,15");

Dataset<Row> df_1 = spark.createDataset(appendData, Encoders.STRING()).toDF();
Dataset<Row> df_2 = df_1.selectExpr(
        "split(value, ',')[0] as EMP_ID",
        "split(value, ',')[1] as EMP_PASSWD",
        "split(value, ',')[2] as EMP_NAME",
        "split(value, ',')[3] as EMP_AGE");

df_2.show();

df_2.write().mode(SaveMode.Append).jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);
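
Note that with this approach every column of df_2, including EMP_AGE, is written as a string, and the append succeeds only because MySQL coerces the string into the integer column; a safer variant would cast it explicitly, e.g. "cast(split(value, ',')[3] as int) as EMP_AGE". Alternatively, if the goal is simply to avoid spelling out StructFields, a typed Java bean preserves the column types, since Spark infers the schema from the bean's getters. A minimal sketch, assuming a hypothetical FamilyMember bean class that is not part of the original post:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

// Hypothetical bean whose property names match the MySQL columns, so the
// INSERT generated by the JDBC writer names every column explicitly.
public class FamilyMember implements Serializable {
    private final String EMP_ID;
    private final String EMP_PASSWD;
    private final String EMP_NAME;
    private final Integer EMP_AGE;

    public FamilyMember(String id, String passwd, String name, Integer age) {
        this.EMP_ID = id;
        this.EMP_PASSWD = passwd;
        this.EMP_NAME = name;
        this.EMP_AGE = age;
    }

    // Spark infers the DataFrame schema from these bean getters.
    public String getEMP_ID() { return EMP_ID; }
    public String getEMP_PASSWD() { return EMP_PASSWD; }
    public String getEMP_NAME() { return EMP_NAME; }
    public Integer getEMP_AGE() { return EMP_AGE; }
}

// Build a one-row DataFrame from the bean and append it over JDBC.
List<FamilyMember> rows = Arrays.asList(new FamilyMember("julia", "eee", "grandmother", 70));
Dataset<Row> beanDF = spark.createDataFrame(rows, FamilyMember.class);
beanDF.write().mode(SaveMode.Append)
        .jdbc(connectionProperties.getProperty("url"), "family", connectionProperties);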
