想知道为啥空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常

Posted

技术标签:

【中文标题】想知道为啥空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常【英文标题】:wondering why empty inner iterator causes not serializable exception with mapPartitionsWithIndex想知道为什么空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常 【发布时间】:2018-12-03 13:37:44 【问题描述】:

我一直在尝试使用 Spark 的 mapPartitionsWithIndex,但遇到问题 试图返回本身包含空迭代器的元组的迭代器。

我尝试了几种不同的方法来构建内部迭代器 [通过 Iterator() 和 List(...).iterator ],并且 所有的道路让我得到这个错误:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
        - object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: empty iterator)
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,empty iterator))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)

我的代码示例如下。请注意,给定它运行正常(一个空的迭代器作为 mapPartitionsWithIndex 值。)但是当您使用现在已注释掉的版本运行时 mapPartitionsWithIndex 调用你会得到上面的错误。

如果有人对如何使它起作用有任何建议,我将不胜感激。

import org.apache.spark.Partition, SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object ANonWorkingExample extends App 
  val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, Iterator[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex 
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          // Iterator((partitionIndex, mappedInput)) // FAILS
          Iterator()   // no exception.. but not really what i want.

      

  val data = partRDD.collect
  println("data:" + data.toList);

【问题讨论】:

不是我对使用 mapPartitions 的理解。从未见过“ .. val 部分:Array[Partition] = parallel.partitions ... 以这种方式使用,但学习一些。更是如此apachesparkbook.blogspot.com/2015/11/mappartition-example.html 或调用函数。但也许我正在学习一些新东西...... 我喜欢 SBT BTW。 好奇你的回复 【参考方案1】:

我不确定您要达到什么目标,与这里的一些专家相比,我是一个新手。

我介绍的一些东西可能会让您了解如何正确地做我认为的事情并制作一些 cmets:

    您似乎明确获得了分区并调用了 mapPartitions - 对我来说是第一个。 mapPartitions 里面的RDD 和各种SPARK SCALA 的东西不会飞;它是关于可迭代的,我认为你只需要降到 SCALA 级别。 可序列化错误来自执行 List[Int]。

这是一个显示索引分区以及相应索引值的示例。

import org.apache.spark.Partition, SparkConf, SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Minutes, Seconds, StreamingContext
// from your stuff, left in

val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped =   parallel.mapPartitionsWithIndex
                       (index, iterator) => 
                          println("Called in Partition -> " + index)
                          val myList = iterator.toList                          
                          myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
                       
                   
mapped.collect()

这会返回类似于我认为您似乎想要的以下内容:

res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))

最后说明:文档等内容并不容易理解,您无法从字数统计示例中获得全部信息!

所以,希望这会有所帮助。

我认为它可能会让你走上你想去的正确道路,我看不太清楚,但也许你现在可以只见树木不见森林。

【讨论】:

感谢您的回复。我试图实现的目标被我不需要说明问题的附加代码弄糊涂了...我会解决这个问题的。 它并不是那么直观,也不适合所有人。有很多数据工程师对此一无所知。你可能想投票,因为我认为我可能会帮助你。【参考方案2】:

所以,我正在做的愚蠢的事情是试图返回一个不可序列化的数据结构:一个迭代器,正如我得到的堆栈跟踪清楚地表明的那样。

解决方案是不使用迭代器。而是使用像 Seq 或 List 这样的集合。下面的示例程序说明了做我想做的事情的正确方法。

import org.apache.spark.Partition, SparkConf, SparkContext
import org.apache.spark.rdd.RDD

object AWorkingExample extends App 
  val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, List[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex 
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          Iterator((partitionIndex, mappedInput.toList)) // Note the .toList() call -- that makes it work
      

  val data = partRDD.collect
  println("data:" + data.toList);

顺便说一句,我最初试图做的是具体查看我的并行化到 RDD 结构中的哪些数据块被分配到了哪个分区。这是运行程序时得到的输出:

数据:List((0,List(2, 3)), (1,List(4, 5, 6)), (2,List(7, 8, 9, 10)))

有趣的是,数据分布本来可以达到最佳平衡,但事实并非如此。这不是问题的重点,但我认为这很有趣。

【讨论】:

你的cmets其实有点夸大了。 再看这个我意识到为什么分布不平衡......根据元素MOD num-partitions的一些哈希将元素插入每个分区......所以不平衡的原因是数据是倾斜的visa-vi应用于它的分区函数。

以上是关于想知道为啥空的内部迭代器会导致 mapPartitionsWithIndex 出现不可序列化的异常的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Python 内置的“all”函数为空的可迭代对象返回 True?

为啥这个viewholder是空的?

React hooks:为啥异步函数中的多个 useState 设置器会导致多次重新渲染?

unordered_map 迭代器会改变吗?

为啥训练我的朴素贝叶斯分类器会占用这么多内存?

为啥Required IdentityUser 属性会导致可以为空的数据库字段?