spark将数据写入mysql的共享表

Posted

技术标签:

【中文标题】spark将数据写入mysql的共享表【英文标题】:spark writing data into share table of mysql 【发布时间】:2020-04-07 11:41:11 【问题描述】:

我有一个周期性的 spark-scala 任务,用于将数据从 Hive 传输到 mysql

table的结构可以简单的看成:

+------+------+
| id   | name |
+------+------+

然后因为hive表太大,所以只好共享mysql表。

所以这是我目前的解决方案:

    准备 MySQL 表:
mysql> show tables;
+-------------------+
| Tables_in_test_db |
+-------------------+
| shared_0          |
| shared_1          |
| shared_2          |
| shared_3          |
| shared_4          |
| shared_5          |
+-------------------+
    从 Hive 加载数据并进行一些转换操作,然后生成我想要的数据帧
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
  .toDF("id", "name")
  .withColumn("hashCode", hash($"id")%5)
    根据hashCode列将数据保存到MySQL表中
(0 to 5).foreach(hashCode => 
  val df = total.where($"hashCode" === hashCode).select("id", "name")
  df.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
)

这很好用,但我是 spark 的新手,所以我想知道有没有更好的方法来实现我想要的??


更新:

这是我的完整代码:

val jdbcHostname = "localhost"
val jdbcPort = 3306
val jdbcDatabase = "test_db"
val jdbcUrl = s"jdbc:mysql://$jdbcHostname:$jdbcPort/$jdbcDatabase"
val connectionProperties = new Properties()
connectionProperties.put("user", "user")
connectionProperties.put("password", "password")

val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._

val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
  .toDF("id", "name")
  .withColumn("hashCode", hash($"id")%5)

(0 to 5).foreach(hashCode => 
  val df = total.where($"hashCode" === hashCode).select("id", "name")
  df.write
    .mode(SaveMode.Append)
    .jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
)

【问题讨论】:

【参考方案1】:

由于我没有看到序列化错误方面,您所做的只是通过循环进行 5 次连续的 DF 调用,但 foreach 与底层 RDD 无关。

我可以做的 cmets 是 .cache 或 .persist 可能是一种改进。此外,在总 df 上按列重新分区将允许更快的访问。

【讨论】:

什么是序列化错误??能举个例子吗?? 不,你没有序列化错误,一切都很好。只是评论。 不使用jdbc连接器,使用纯mysql连接器会更好。 dev.mysql.com/doc/connector-python/en/…

以上是关于spark将数据写入mysql的共享表的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 2.1 是不是支持将临时表或配置单元表写入 Mysql/Oracle?

Spark(StructuredStreaming)从Mysql读取数据写入Mysql表(自增ID)

第八篇|Spark SQL百万级数据批量读写入MySQL

Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?

在 Apache Spark 中,用 Java 将数据帧写入 Hive 表

如何在 Spark SQL 中使用 snappy 压缩将数据写入配置单元表