Spark DataFrame to Dict - 字典更新序列元素错误
Posted
技术标签:
【中文标题】Spark DataFrame to Dict - 字典更新序列元素错误【英文标题】:Spark DataFrame to Dict - dictionary update sequence element error 【发布时间】:2018-10-09 11:44:45 【问题描述】:我正在尝试使用RDD
中的collectAsMap()
函数将DataFrame
转换为Dict。
代码:
dict = df.rdd.collectAsMap()
错误日志:
ValueError: dictionary update sequence element #0 has length 8; 2 is required
更新:
DF有8个字段,是不是说collectAsMap()
只能用两个字段的DF?
【问题讨论】:
【参考方案1】:下面是在 pyspark 中做同样事情的解释。我同意拉姆的解释。 collectAsMap 仅适用于pairedrdd,因此您需要先将数据框转换为pair rdd,然后才能使用collectAsMap 函数将其转换为一些字典。
例如,我有以下数据框:
df = spark.sql("""select emp_id,emp_city from udb.temptable_1 order by emp_id""");
+------+--------+
|emp_id|emp_city|
+------+--------+
| 1|NOIDA |
| 2|GURGAON |
| 3|DWARKA |
| 4|SAKET |
| 5|USA |
| 6|UP |
| 7|NOIDA |
| 8|SAKET |
| 9|GURGAON |
+------+--------+
将其转换为键值对rdd
newrdd = df.rdd.map(lambda x : (x[0],x))
>>> type(newrdd)
<class 'pyspark.rdd.PipelinedRDD'>
[(1, Row(emp_id=1, emp_city=u'NOIDA ')),
(2, Row(emp_id=2, emp_city=u'GURGAON ')),
(3, Row(emp_id=3, emp_city=u'DWARKA ')),
(4, Row(emp_id=4, emp_city=u'SAKET ')),
(5, Row(emp_id=5, emp_city=u'USA ')),
(6, Row(emp_id=6, emp_city=u'UP ')),
(7, Row(emp_id=7, emp_city=u'NOIDA ')),
(8, Row(emp_id=8, emp_city=u'SAKET ')),
(9, Row(emp_id=9, emp_city=u'GURGAON '))]
最后,您可以使用 collectAsMap 将您的键值对 rdd 转换为 dict
dict = newrdd.collectAsMap()
1: Row(emp_id=1, emp_city=u'NOIDA '),
2: Row(emp_id=2, emp_city=u'GURGAON '),
3: Row(emp_id=3, emp_city=u'DWARKA '),
4: Row(emp_id=4, emp_city=u'SAKET '),
5: Row(emp_id=5, emp_city=u'USA '),
6: Row(emp_id=6, emp_city=u'UP '),
7: Row(emp_id=7, emp_city=u'NOIDA '),
8: Row(emp_id=8, emp_city=u'SAKET '),
9: Row(emp_id=9, emp_city=u'GURGAON ')
>>> dict.keys()
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> dict.get(2)
Row(emp_id=2, emp_city=u'GURGAON ')
【讨论】:
【参考方案2】:首先我在 python/pyspark 中表现不佳,所以我使用 scala 进行了演示...
collectAsMap
仅适用于pairedRDD(请参阅下面的代码 火花doc/代码库)
/**
* Return the key-value pairs in this RDD to the master as a Map.
*
* Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
* one value per key is preserved in the map returned)
*
* @note this method should only be used if the resulting data is expected to be small, as
* all the data is loaded into the driver's memory.
*/
def collectAsMap(): Map[K, V] = self.withScope
val data = self.collect()
val map = new mutable.HashMap[K, V]
map.sizeHint(data.length)
data.foreach pair => map.put(pair._1, pair._2)
map
您的df.rdd
等同于RDD[Row]
,因为您将DataFrame
转换为RDD
。
所以你可以*不*将它收集为地图。除非您必须对行中的任何元素执行 keyBy
(通过应用 f
在此 RDD 中创建元素的元组)。或其他一些将其转换为配对 RDD 的操作。
下面是一个完整的例子来证明这一点。
导入 org.apache.log4j.级别,记录器 导入 org.apache.spark.internal.Logging 导入 org.apache.spark.sql.SparkSession /** * * collectAsMap 仅适用于pairrdd,如果你想做一个map,那么你可以做一个rdd key by 然后继续 * * @作者:拉姆·加迪亚拉姆 */ 对象 PairedRDDPlay 扩展 Logging Logger.getLogger("org").setLevel(Level.OFF) // Logger.getLogger("akka").setLevel(Level.OFF) def main(args: Array[String]): Unit = val appName = if (args.length > 0) args(0) else this.getClass.getName val 火花:SparkSession = SparkSession.builder .config("spark.master", "local") //.config("spark.eventLog.enabled", "true") .appName(appName) .getOrCreate() 导入 spark.implicits._ val 对 = spark.sparkContext.parallelize(Array((1, 1,3), (1, 2,3), (1, 3,3), (1, 1,3), (2, 1,3) )).toDF("mycol1", "mycol2","mycol3") 对.show() val keyedBy = pair.rdd.keyBy(_.getAs[Int]("mycol1")) keyedBy.foreach(x => println("使用 keyBy-->>" + x)) val myMap = keyedBy.collectAsMap() println(myMap.toString()) 断言(myMap.size == 2) // val myMap1 = pairs.rdd.collectAsMap() // println(myMap1.toString()) // 断言(myMap1.size == 2) //Error:(28, 28) value collectAsMap is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] // val myMap1 = pairs.rdd.collectAsMap()
结果:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------+------+------+
|mycol1|mycol2|mycol3|
+------+------+------+
| 1| 1| 3|
| 1| 2| 3|
| 1| 3| 3|
| 1| 1| 3|
| 2| 1| 3|
+------+------+------+
using keyBy-->>(1,[1,1,3])
using keyBy-->>(1,[1,2,3])
using keyBy-->>(1,[1,3,3])
using keyBy-->>(1,[1,1,3])
using keyBy-->>(2,[2,1,3])
Map(2 -> [2,1,3], 1 -> [1,1,3])
问题:DF有8个字段,是不是可以使用
collectAsMap
() 只有 DF 有两个字段?
答案:否,您可以在示例中看到具有多列(即 >2)的示例。但您需要将其转换为pairrdd。
也可以看看how-does-the-collectasmap-function-work-for-spark-api
【讨论】:
它对你有帮助吗?以上是关于Spark DataFrame to Dict - 字典更新序列元素错误的主要内容,如果未能解决你的问题,请参考以下文章
Pandas使用to_dict函数将dataframe转化为字典(dict)格式数据并指定orientation参数生成不同形式的字典
Spark- How to concatenate DataFrame columns