如何将 spark scala map 字段合并到 BQ?
Posted
技术标签:
【中文标题】如何将 spark scala map 字段合并到 BQ?【英文标题】:How to incorporate spark scala map field to BQ? 【发布时间】:2021-02-18 17:34:57 【问题描述】:我正在编写一个 spark scala 代码来将输出写入 BQ,以下是用于形成具有两列(id 和关键字)的输出表的代码
val df1 = Seq("tamil", "telugu", "hindi").toDF("language")
val df2 = Seq(
(101, Seq("tamildiary", "tamilkeyboard", "telugumovie")),
(102, Seq("tamilmovie")),
(103, Seq("hindirhymes", "hindimovie"))
).toDF("id", "keywords")
val pattern = concat(lit("^"), df1("language"), lit(".*"))
import org.apache.spark.sql.Row
val arrayToMap = udf (arr: Seq[Row]) =>
arr.map case Row(k: String, v: Int) => (k, v) .toMap
val final_df = df2.
withColumn("keyword", explode($"keywords")).as("df2").
join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword").
groupBy("id", "language").agg(size(collect_list($"language")).as("count")).
groupBy("id").agg(arrayToMap(collect_list(struct($"language", $"count"))).as("keywords"))
final_df 的输出为:
+---+--------------------+
| id| app_language|
+---+--------------------+
|101|Map(tamil -> 2, t...|
|103| Map(hindi -> 2)|
|102| Map(tamil -> 1)|
+---+--------------------+
我正在定义以下函数来传递此输出表的架构。 (由于 BQ 不支持 map 字段,所以我使用的是结构数组。但这也不起作用)
def createTableIfNotExists(outputTable: String) =
spark.createBigQueryTable(
s"""
|CREATE TABLE IF NOT EXISTS $outputTable(
|ds date,
|id int64,
|keywords ARRAY<STRUCT<key STRING, value INT64>>
|)
|PARTITION BY ds
|CLUSTER BY user_id
""".stripMargin)
谁能帮助我为此编写一个正确的架构,以便它在 BQ 中兼容。
【问题讨论】:
【参考方案1】:您可以收集如下结构数组:
val final_df = df2
.withColumn("keyword", explode($"keywords")).as("df2")
.join(df1.as("df1"), regexp_replace($"df2.keyword", pattern, lit("")) =!= $"df2.keyword")
.groupBy("id", "language")
.agg(size(collect_list($"language")).as("count"))
.groupBy("id")
.agg(collect_list(struct($"language", $"count")).as("app_language"))
final_df.show(false)
+---+-------------------------+
|id |app_language |
+---+-------------------------+
|101|[[tamil, 2], [telugu, 1]]|
|103|[[hindi, 2]] |
|102|[[tamil, 1]] |
+---+-------------------------+
final_df.printSchema
root
|-- id: integer (nullable = false)
|-- app_language: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- language: string (nullable = true)
| | |-- count: integer (nullable = false)
然后你可以有一个类似的架构
def createTableIfNotExists(outputTable: String) =
spark.createBigQueryTable(
s"""
|CREATE TABLE IF NOT EXISTS $outputTable(
|ds date,
|id int64,
|keywords ARRAY<STRUCT<language STRING, count INT64>>
|)
|PARTITION BY ds
|CLUSTER BY user_id
""".stripMargin)
【讨论】:
以上是关于如何将 spark scala map 字段合并到 BQ?的主要内容,如果未能解决你的问题,请参考以下文章
Scala - 如何在 Spark 的 map 函数中实现 Try