Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据
Posted
技术标签:
【中文标题】Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据【英文标题】:Spark SQL and MySQL- SaveMode.Overwrite not inserting modified data 【发布时间】:2017-01-26 12:30:11 【问题描述】:我在 mysql 中有一个test
表,其 ID 和名称如下:
+----+-------+
| id | name |
+----+-------+
| 1 | Name1 |
+----+-------+
| 2 | Name2 |
+----+-------+
| 3 | Name3 |
+----+-------+
我正在使用 Spark DataFrame
读取这些数据(使用 JDBC)并像这样修改数据
Dataset<Row> modified = sparkSession.sql("select id, concat(name,' - new') as name from test");
modified.write().mode("overwrite").jdbc(AppProperties.MYSQL_CONNECTION_URL,
"test", connectionProperties);
但我的问题是,如果我提供覆盖模式,它会删除前一个表并创建一个新表,但不会插入任何数据。
我通过读取 csv 文件(与测试表相同的数据)并覆盖来尝试相同的程序。这对我有用。
我在这里遗漏了什么吗?
谢谢!
【问题讨论】:
【参考方案1】:问题出在您的代码中。因为你覆盖了一个你试图从中读取的表,所以你在 Spark 可以实际访问它之前有效地删除了所有数据。
请记住,Spark 是懒惰的。当您创建 Dataset
时,Spark 会获取所需的元数据,但不会加载数据。所以没有魔法缓存可以保留原始内容。数据将在实际需要时加载。这是当您执行write
操作时,当您开始写入时,没有更多数据要获取。
你需要的是这样的:
创建Dataset
。
应用所需的转换并将数据写入中间 MySQL 表。
TRUNCATE
原始输入和INSERT INTO ... SELECT
来自中间表或DROP
原始表和RENAME
中间表。
另一种但不太有利的方法是:
创建一个Dataset
。
应用所需的转换并将数据写入持久 Spark 表(df.write.saveAsTable(...)
或等效项)
TRUNCATE
原始输入。
读回数据并保存 (spark.table(...).write.jdbc(...)
)
放下 Spark 表。
我们再怎么强调都不为过,使用 Spark cache
/ persist
不是正确的选择。即使使用保守的StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) 缓存数据也可能会丢失(节点故障),从而导致无提示的正确性错误。
【讨论】:
【参考方案2】:我相信以上所有步骤都是不必要的。以下是您需要做的:
创建一个数据集A
,如val A = spark.read.parquet("....")
读取要更新的表,作为数据框B
。确保为数据帧B
启用了启用缓存。 val B = spark.read.jdbc("mytable").cache
在B
上强制count
- 这将根据所选的StorageLevel
- B.count
强制执行和缓存表
现在,您可以进行类似val C = A.union(B)
的转换
然后,将C
写回数据库,如C.write.mode(SaveMode.Overwrite).jdbc("mytable")
【讨论】:
对我有用(强制缓存的东西)。在我的情况下,我只有一个有一行的表,所以缓存不是问题【参考方案3】:读取和写入同一张表。
cols_df = df_2.columns
broad_cast_var = spark_context.broadcast(df_2.collect())
df_3 = sqlContext.createDataFrame(broad_cast_var.value, cols_df)
通过一些修改读取和写入同一张表。
cols_df = df_2.columns
broad_cast_var = spark_context.broadcast(df_2.collect())
def update_x(x):
y = (x[0] + 311, *x[1:])
return y
rdd_2_1 = spark_context.parallelize(broad_cast_var.value).map(update_x)
df_3 = sqlContext.createDataFrame(rdd_2_1, cols_df)
【讨论】:
以上是关于Spark SQL 和 MySQL-SaveMode.Overwrite 不插入修改的数据的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark 中使用 map() 和 filter() 而不是 spark.sql
大数据(spark sql 和 spark dataframes 连接)