如何将 MapType(StringType, StringType) 的列转换为 StringType?
Posted
技术标签:
【中文标题】如何将 MapType(StringType, StringType) 的列转换为 StringType?【英文标题】:How to convert column of MapType(StringType, StringType) into StringType? 【发布时间】:2017-12-13 13:48:18 【问题描述】:所以我有这个流数据帧,我正在尝试将这个“customer_ids”列转换为一个简单的字符串。
schema = StructType()\
.add("customer_ids", MapType(StringType(), StringType()))\
.add("date", TimestampType())
original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)\
.load(path=source, ftormat="parquet", schema=schema)\
.select('customer_ids', 'date')
这种转换的目的是按此列分组并按 max(date) 像这样聚合
original_sdf.groupBy('customer_ids')\
.agg(max('date')) \
.writeStream \
.trigger(once=True) \
.format("memory") \
.queryName('query') \
.outputMode("complete") \
.start()
但我遇到了这个异常
AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.
如何将这种流式 DataFrame 列或任何其他方式转换为 groupBy 此列?
【问题讨论】:
请提供一些输入/输出数据。 【参考方案1】:TL;DR 使用getItem
方法访问MapType
列中每个键的值。
真正的问题是您想要groupBy
的哪个键,因为MapType
列可以有多种键。每个键都可以是一列,其中包含映射列中的值。
您可以使用Column.getItem 方法(或类似的python voodoo)访问密钥:
getItem(key: Any): Colum 一个表达式,从数组中获取位于序号位置的项目,或者通过 MapType 中的键 key 获取值。
(我使用 Scala 并将其转换为 pyspark 作为家庭练习)
val ds = Seq(Map("hello" -> "world")).toDF("m")
scala> ds.show(false)
+-------------------+
|m |
+-------------------+
|Map(hello -> world)|
+-------------------+
scala> ds.select($"m".getItem("hello") as "hello").show
+-----+
|hello|
+-----+
|world|
+-----+
【讨论】:
以上是关于如何将 MapType(StringType, StringType) 的列转换为 StringType?的主要内容,如果未能解决你的问题,请参考以下文章