Spark DataFrame to Dict - 字典更新序列元素错误

Posted

技术标签:

【中文标题】Spark DataFrame to Dict - 字典更新序列元素错误【英文标题】:Spark DataFrame to Dict - dictionary update sequence element error 【发布时间】:2018-10-09 11:44:45 【问题描述】:

我正在尝试使用RDD 中的collectAsMap() 函数将DataFrame 转换为Dict。

代码:

dict = df.rdd.collectAsMap()

错误日志:

ValueError: dictionary update sequence element #0 has length 8; 2 is required

更新:

DF有8个字段,是不是说collectAsMap()只能用两个字段的DF?

【问题讨论】:

【参考方案1】:

下面是在 pyspark 中做同样事情的解释。我同意拉姆的解释。 collectAsMap 仅适用于pairedrdd,因此您需要先将数据框转换为pair rdd,然后才能使用collectAsMap 函数将其转换为一些字典。

例如,我有以下数据框:

df = spark.sql("""select emp_id,emp_city from udb.temptable_1 order by emp_id""");
+------+--------+
|emp_id|emp_city|
+------+--------+
|     1|NOIDA   |
|     2|GURGAON |
|     3|DWARKA  |
|     4|SAKET   |
|     5|USA     |
|     6|UP      |
|     7|NOIDA   |
|     8|SAKET   |
|     9|GURGAON |
+------+--------+

将其转换为键值对rdd

newrdd = df.rdd.map(lambda x : (x[0],x))

>>> type(newrdd)
<class 'pyspark.rdd.PipelinedRDD'>

[(1, Row(emp_id=1, emp_city=u'NOIDA   ')), 
(2, Row(emp_id=2, emp_city=u'GURGAON ')), 
(3, Row(emp_id=3, emp_city=u'DWARKA  ')), 
(4, Row(emp_id=4, emp_city=u'SAKET   ')), 
(5, Row(emp_id=5, emp_city=u'USA     ')), 
(6, Row(emp_id=6, emp_city=u'UP      ')), 
(7, Row(emp_id=7, emp_city=u'NOIDA   ')), 
(8, Row(emp_id=8, emp_city=u'SAKET   ')), 
(9, Row(emp_id=9, emp_city=u'GURGAON '))]

最后,您可以使用 collectAsMap 将您的键值对 rdd 转换为 dict

dict = newrdd.collectAsMap()

1: Row(emp_id=1, emp_city=u'NOIDA   '), 
2: Row(emp_id=2, emp_city=u'GURGAON '), 
3: Row(emp_id=3, emp_city=u'DWARKA  '), 
4: Row(emp_id=4, emp_city=u'SAKET   '), 
5: Row(emp_id=5, emp_city=u'USA     '), 
6: Row(emp_id=6, emp_city=u'UP      '), 
7: Row(emp_id=7, emp_city=u'NOIDA   '), 
8: Row(emp_id=8, emp_city=u'SAKET   '), 
9: Row(emp_id=9, emp_city=u'GURGAON ')

>>> dict.keys()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

>>> dict.get(2)
Row(emp_id=2, emp_city=u'GURGAON ')

【讨论】:

【参考方案2】:

首先我在 python/pyspark 中表现不佳,所以我使用 scala 进行了演示...

collectAsMap 仅适用于pairedRDD(请参阅下面的代码 火花doc/代码库)

/**
       * Return the key-value pairs in this RDD to the master as a Map.
       *
       * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
       *          one value per key is preserved in the map returned)
       *
       * @note this method should only be used if the resulting data is expected to be small, as
       * all the data is loaded into the driver's memory.
       */
      def collectAsMap(): Map[K, V] = self.withScope 
        val data = self.collect()
        val map = new mutable.HashMap[K, V]
        map.sizeHint(data.length)
        data.foreach  pair => map.put(pair._1, pair._2) 
        map
      

您的df.rdd 等同于RDD[Row],因为您将DataFrame 转换为RDD

所以你可以*不*将它收集为地图。除非您必须对行中的任何元素执行 keyBy通过应用 f 在此 RDD 中创建元素的元组)。或其他一些将其转换为配对 RDD 的操作。

下面是一个完整的例子来证明这一点。


导入 org.apache.log4j.级别,记录器 导入 org.apache.spark.internal.Logging 导入 org.apache.spark.sql.SparkSession /** * * collectAsMap 仅适用于pairrdd,如果你想做一个map,那么你可以做一个rdd key by 然后继续 * * @作者:拉姆·加迪亚拉姆 */ 对象 PairedRDDPlay 扩展 Logging Logger.getLogger("org").setLevel(Level.OFF) // Logger.getLogger("akka").setLevel(Level.OFF) def main(args: Array[String]): Unit = val appName = if (args.length > 0) args(0) else this.getClass.getName val 火花:SparkSession = SparkSession.builder .config("spark.master", "local") //.config("spark.eventLog.enabled", "true") .appName(appName) .getOrCreate() 导入 spark.implicits._ val 对 = spark.sparkContext.parallelize(Array((1, 1,3), (1, 2,3), (1, 3,3), (1, 1,3), (2, 1,3) )).toDF("mycol1", "mycol2","mycol3") 对.show() val keyedBy = pair.rdd.keyBy(_.getAs[Int]("mycol1")) keyedBy.foreach(x => println("使用 keyBy-->>" + x)) val myMap = keyedBy.collectAsMap() println(myMap.toString()) 断言(myMap.size == 2) // val myMap1 = pairs.rdd.collectAsMap() // println(myMap1.toString()) // 断言(myMap1.size == 2) //Error:(28, 28) value collectAsMap is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] // val myMap1 = pairs.rdd.collectAsMap()

结果:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+------+------+------+
|mycol1|mycol2|mycol3|
+------+------+------+
|     1|     1|     3|
|     1|     2|     3|
|     1|     3|     3|
|     1|     1|     3|
|     2|     1|     3|
+------+------+------+

using keyBy-->>(1,[1,1,3])
using keyBy-->>(1,[1,2,3])
using keyBy-->>(1,[1,3,3])
using keyBy-->>(1,[1,1,3])
using keyBy-->>(2,[2,1,3])
Map(2 -> [2,1,3], 1 -> [1,1,3])

问题:DF有8个字段,是不是可以使用collectAsMap() 只有 DF 有两个字段?


答案:否,您可以在示例中看到具有多列(即 >2)的示例。但您需要将其转换为pairrdd。

也可以看看how-does-the-collectasmap-function-work-for-spark-api

【讨论】:

它对你有帮助吗?

以上是关于Spark DataFrame to Dict - 字典更新序列元素错误的主要内容,如果未能解决你的问题,请参考以下文章

Pandas使用to_dict函数将dataframe转化为字典(dict)格式数据并指定orientation参数生成不同形式的字典

将 DataFrame 转换为 dict [重复]

Spark- How to concatenate DataFrame columns

Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件

在 spark Dataframe 中动态创建多列

spark dataframe 怎么去除第一行数据