如何将key,value作为spark sql中map的单独列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何将key,value作为spark sql中map的单独列相关的知识,希望对你有一定的参考价值。
我有桌子和地图。我想从该地图中制作2个单独的列 - 1.键列2.值列。 input.show();
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp| fbaSKUAdditions|fbaSKURemovals| merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []| A4QA5OYD4Y45F| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
| null|[CLAM_SUNGLASS_CA...| []|ANOTHER_MERCHANT| []| null| null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
但是我希望输出为
test1 123456789
Test2 123456780
如何从地图中获取2个不同的列(键列和值列)?
Dataset<Row> removed_skus = input
.withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
.withColumn("skuType", functions.lit("MFN"))
.select(input.col("merchantId").alias("merchant_id"), new Column("sku").,
new Column("skuType"))
.distinct()
.groupBy("merchant_id")
.agg(functions.collect_list("sku").alias("removedSkus"));
答案
首先让我们创建一些数据:
val df = Seq(
(Map("sku1"->"timestamp1"), "AFN"),
(Map("sku2"->"timestamp2"), "AFN"),
(null, "AFN")
).toDF("addedSkuWithTimestamp", "skuType")
.show(false)
+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]| AFN|
| [sku2 -> timestamp2]| AFN|
| null| AFN|
+---------------------+-------+
这将具有以下架构:
scala> df.printSchema()
root
|-- addedSkuWithTimestamp: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- skuType: string (nullable = true)
下一个代码将使用mapToTupleUDF
udf函数从addsSkuWithTimestamp列中提取列sku_key和sku_value。 :
val mapToTupleUDF = udf((sku: Map[String, String]) => if(sku != null) sku.toSeq(0) else null)
df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
.withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
.withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
.show(false)
+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1] |AFN |sku1|timestamp1|
|[sku2, timestamp2] |AFN |sku2|timestamp2|
|null |AFN |null|null |
+---------------------+-------+----+----------+
请注意,只有当addedSkuWithTimestamp._1
不为null时,我们才能访问addedSkuWithTimestamp
。
以上是关于如何将key,value作为spark sql中map的单独列的主要内容,如果未能解决你的问题,请参考以下文章
如果存储在键中的值匹配,如何在 Spark 中合并两个 RDD