Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据

Posted

技术标签:

【中文标题】Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据【英文标题】:Spark SQL and MySQL- SaveMode.Overwrite not inserting modified data 【发布时间】:2017-01-26 12:30:11 【问题描述】:

我在 mysql 中有一个test 表,其 ID 和名称如下:

+----+-------+
| id | name  |
+----+-------+
| 1  | Name1 |
+----+-------+
| 2  | Name2 |
+----+-------+
| 3  | Name3 |
+----+-------+

我正在使用 Spark DataFrame 读取这些数据(使用 JDBC)并像这样修改数据

Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL,
                "test", connectionProperties);

但我的问题是,如果我提供覆盖模式,它会删除前一个表并创建一个新表,但不会插入任何数据。

我通过读取 csv 文件(与测试表相同的数据)并覆盖来尝试相同的程序。这对我有用。

我在这里遗漏了什么吗?

谢谢!

【问题讨论】:

【参考方案1】:

问题出在您的代码中。因为你覆盖了一个你试图从中读取的表,所以你在 Spark 可以实际访问它之前有效地删除了所有数据。

请记住,Spark 是懒惰的。当您创建 Dataset 时,Spark 会获取所需的元数据,但不会加载数据。所以没有魔法缓存可以保留原始内容。数据将在实际需要时加载。这是当您执行write 操作时,当您开始写入时,没有更多数据要获取。

你需要的是这样的:

创建Dataset

应用所需的转换并将数据写入中间 MySQL 表。

TRUNCATE 原始输入和INSERT INTO ... SELECT 来自中间表或DROP 原始表和RENAME 中间表。

另一种但不太有利的方法是:

创建一个Dataset。 应用所需的转换并将数据写入持久 Spark 表(df.write.saveAsTable(...) 或等效项) TRUNCATE 原始输入。 读回数据并保存 (spark.table(...).write.jdbc(...)) 放下 Spark 表。

我们再怎么强调都不为过,使用 Spark cache / persist 不是正确的选择。即使使用保守的StorageLevel (MEMORY_AND_DISK_2 / MEMORY_AND_DISK_SER_2) 缓存数据也可能会丢失(节点故障),从而导致无提示的正确性错误。

【讨论】:

【参考方案2】:

我相信以上所有步骤都是不必要的。以下是您需要做的:

创建一个数据集A,如val A = spark.read.parquet("....")

读取要更新的表,作为数据框B。确保为数据帧B 启用了启用缓存。 val B = spark.read.jdbc("mytable").cache

B 上强制count - 这将根据所选的StorageLevel - B.count 强制执行和缓存表

现在,您可以进行类似val C = A.union(B)的转换

然后,将C 写回数据库,如C.write.mode(SaveMode.Overwrite).jdbc("mytable")

【讨论】:

对我有用(强制缓存的东西)。在我的情况下,我只有一个有一行的表,所以缓存不是问题【参考方案3】:

读取和写入同一张表。

cols_df = df_2.columns

broad_cast_var = spark_context.broadcast(df_2.collect())

df_3 = sqlContext.createDataFrame(broad_cast_var.value, cols_df)

通过一些修改读取和写入同一张表。

cols_df = df_2.columns

broad_cast_var = spark_context.broadcast(df_2.collect())


def update_x(x):
    y = (x[0] + 311, *x[1:])
    return y


rdd_2_1 = spark_context.parallelize(broad_cast_var.value).map(update_x)

df_3 = sqlContext.createDataFrame(rdd_2_1, cols_df)

【讨论】:

以上是关于Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中使用 map() 和 filter() 而不是 spark.sql

spark streaming 和spark sql的区别

大数据(spark sql 和 spark dataframes 连接)

分别用SQL和Spark(Scala)解决50道SQL题

spark-sql 与 spark-shell REPL 中的 Spark SQL 性能差异

大数据之Spark:Spark SQL