spark将数据写入mysql的共享表
Posted
技术标签:
【中文标题】spark将数据写入mysql的共享表【英文标题】:spark writing data into share table of mysql 【发布时间】:2020-04-07 11:41:11 【问题描述】:我有一个周期性的 spark-scala 任务,用于将数据从 Hive 传输到 mysql。
table的结构可以简单的看成:
+------+------+
| id | name |
+------+------+
然后因为hive表太大,所以只好共享mysql表。
所以这是我目前的解决方案:
-
准备 MySQL 表:
mysql> show tables;
+-------------------+
| Tables_in_test_db |
+-------------------+
| shared_0 |
| shared_1 |
| shared_2 |
| shared_3 |
| shared_4 |
| shared_5 |
+-------------------+
-
从 Hive 加载数据并进行一些转换操作,然后生成我想要的数据帧
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
.toDF("id", "name")
.withColumn("hashCode", hash($"id")%5)
-
根据
hashCode
列将数据保存到MySQL表中
(0 to 5).foreach(hashCode =>
val df = total.where($"hashCode" === hashCode).select("id", "name")
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
)
这很好用,但我是 spark 的新手,所以我想知道有没有更好的方法来实现我想要的??
更新:
这是我的完整代码:
val jdbcHostname = "localhost"
val jdbcPort = 3306
val jdbcDatabase = "test_db"
val jdbcUrl = s"jdbc:mysql://$jdbcHostname:$jdbcPort/$jdbcDatabase"
val connectionProperties = new Properties()
connectionProperties.put("user", "user")
connectionProperties.put("password", "password")
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
import spark.implicits._
val data = List((0, "a"), (11, "b"), (22, "c"), (33, "d"), (44, "e"))
val total = spark.sparkContext.parallelize(data)
.toDF("id", "name")
.withColumn("hashCode", hash($"id")%5)
(0 to 5).foreach(hashCode =>
val df = total.where($"hashCode" === hashCode).select("id", "name")
df.write
.mode(SaveMode.Append)
.jdbc(jdbcUrl, s"shared_$hashCode", connectionProperties)
)
【问题讨论】:
【参考方案1】:由于我没有看到序列化错误方面,您所做的只是通过循环进行 5 次连续的 DF 调用,但 foreach 与底层 RDD 无关。
我可以做的 cmets 是 .cache 或 .persist 可能是一种改进。此外,在总 df 上按列重新分区将允许更快的访问。
【讨论】:
什么是序列化错误??能举个例子吗?? 不,你没有序列化错误,一切都很好。只是评论。 不使用jdbc连接器,使用纯mysql连接器会更好。 dev.mysql.com/doc/connector-python/en/…以上是关于spark将数据写入mysql的共享表的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL 2.1 是不是支持将临时表或配置单元表写入 Mysql/Oracle?
Spark(StructuredStreaming)从Mysql读取数据写入Mysql表(自增ID)
Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?