如何将key,value作为spark sql中map的单独列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何将key,value作为spark sql中map的单独列相关的知识,希望对你有一定的参考价值。

我有桌子和地图。我想从该地图中制作2个单独的列 - 1.键列2.值列。 input.show();

+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
|addedSkuWithTimestamp|     fbaSKUAdditions|fbaSKURemovals|      merchantId|mfnSKUAdditions|mfnSKURemovals|removedSkuWithTimestamp|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+
| [Test1 -> 1234567...|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|   A4QA5OYD4Y45F|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
|                 null|[CLAM_SUNGLASS_CA...|            []|ANOTHER_MERCHANT|             []|          null|                   null|
+---------------------+--------------------+--------------+----------------+---------------+--------------+-----------------------+

但是我希望输出为

test1  123456789 

Test2  123456780 

如何从地图中获取2个不同的列(键列和值列)?

Dataset<Row> removed_skus = input
                    .withColumn("sku", functions.explode(input.col("removedSkuWithTimestamp")))
                    .withColumn("skuType", functions.lit("MFN"))
                    .select(input.col("merchantId").alias("merchant_id"), new Column("sku").,
                            new Column("skuType"))
                    .distinct()
                    .groupBy("merchant_id")
                    .agg(functions.collect_list("sku").alias("removedSkus"));
答案

首先让我们创建一些数据:

val df = Seq(
    (Map("sku1"->"timestamp1"), "AFN"),
    (Map("sku2"->"timestamp2"), "AFN"),
    (null, "AFN") 
).toDF("addedSkuWithTimestamp", "skuType")
.show(false)

+---------------------+-------+
|addedSkuWithTimestamp|skuType|
+---------------------+-------+
| [sku1 -> timestamp1]|    AFN|
| [sku2 -> timestamp2]|    AFN|
|                 null|    AFN|
+---------------------+-------+

这将具有以下架构:

scala> df.printSchema()

root
 |-- addedSkuWithTimestamp: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- skuType: string (nullable = true)

下一个代码将使用mapToTupleUDF udf函数从addsSkuWithTimestamp列中提取列sku_key和sku_value。 :

val mapToTupleUDF = udf((sku: Map[String, String]) => if(sku != null) sku.toSeq(0) else null)

df.withColumn("addedSkuWithTimestamp", mapToTupleUDF($"addedSkuWithTimestamp"))
  .withColumn("Sku", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._1"))
  .withColumn("Timestamp", when($"addedSkuWithTimestamp".isNotNull, $"addedSkuWithTimestamp._2"))
  .show(false)

+---------------------+-------+----+----------+
|addedSkuWithTimestamp|skuType|Sku |Timestamp |
+---------------------+-------+----+----------+
|[sku1, timestamp1]   |AFN    |sku1|timestamp1|
|[sku2, timestamp2]   |AFN    |sku2|timestamp2|
|null                 |AFN    |null|null      |
+---------------------+-------+----+----------+

请注意,只有当addedSkuWithTimestamp._1不为null时,我们才能访问addedSkuWithTimestamp

以上是关于如何将key,value作为spark sql中map的单独列的主要内容,如果未能解决你的问题,请参考以下文章

如果存储在键中的值匹配,如何在 Spark 中合并两个 RDD

c++/STL/map中怎样获取map中第i个元素

通过 Tuple2 的 key 将 Tuple2 的 value 部分作为一个 map 组合成单个 map 分组

Spark:将 JSON 文件转换为正确的格式

如何使用 Spark SQL 作为内存数据库?

Spark SQL 读取已转义双引号的 JSON 文件