如何将 MapType(StringType, StringType) 的列转换为 StringType?

Posted

技术标签:

【中文标题】如何将 MapType(StringType, StringType) 的列转换为 StringType?【英文标题】:How to convert column of MapType(StringType, StringType) into StringType? 【发布时间】:2017-12-13 13:48:18 【问题描述】:

所以我有这个流数据帧,我正在尝试将这个“customer_ids”列转换为一个简单的字符串。

schema = StructType()\
    .add("customer_ids", MapType(StringType(), StringType()))\
    .add("date", TimestampType())

original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)\
    .load(path=source, ftormat="parquet", schema=schema)\
    .select('customer_ids', 'date')

这种转换的目的是按此列分组并按 max(date) 像这样聚合

original_sdf.groupBy('customer_ids')\
  .agg(max('date')) \
  .writeStream \
  .trigger(once=True) \
  .format("memory") \
  .queryName('query') \
  .outputMode("complete") \
  .start()

但我遇到了这个异常

AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.

如何将这种流式 DataFrame 列或任何其他方式转换为 groupBy 此列?

【问题讨论】:

请提供一些输入/输出数据。 【参考方案1】:

TL;DR 使用getItem 方法访问MapType 列中每个键的值。


真正的问题是您想要groupBy 的哪个键,因为MapType 列可以有多种键。每个键都可以是一列,其中包含映射列中的值。

您可以使用Column.getItem 方法(或类似的python voodoo)访问密钥:

getItem(key: Any): Colum 一个表达式,从数组中获取位于序号位置的项目,或者通过 MapType 中的键 key 获取值。

(我使用 Scala 并将其转换为 pyspark 作为家庭练习)

val ds = Seq(Map("hello" -> "world")).toDF("m")
scala> ds.show(false)
+-------------------+
|m                  |
+-------------------+
|Map(hello -> world)|
+-------------------+

scala> ds.select($"m".getItem("hello") as "hello").show
+-----+
|hello|
+-----+
|world|
+-----+

【讨论】:

以上是关于如何将 MapType(StringType, StringType) 的列转换为 StringType?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark UDAF 中实现 fastutils 映射?

如何将字符串冒号分隔的列转换为 MapType?

如何将字符串冒号分隔列转换为MapType?

如何从 Pyspark 中的 MapType 列获取键和值

如何将字符串列转换为列表

iOS MapKit更改mapview maptype会导致注释图像更改为pin?