dependency & DF & DataSet & partitioner


  • dependency

narrow: one-to-one (OneToOneDependency, PruneDependency, RangeDependency)

wide: shuffle (ShuffleDependency)

Inspect dependencies:

.dependencies

.toDebugString
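
A minimal sketch of both calls, assuming a live SparkContext sc:

val words  = sc.parallelize(Seq("a", "b", "a"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)  // reduceByKey introduces a wide dependency

counts.dependencies            // Seq(ShuffleDependency) => wide
println(counts.toDebugString)  // full lineage; indentation marks shuffle boundaries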

--------------------------------------

  • DF

Catalyst (Spark SQL's query optimizer):

reordering operations

reducing the amount of data we read

pruning unneeded partitions

---all by analyzing DataFrame and filter operations (see the RDD paper)
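
For example (a sketch, assuming spark.implicits._ is in scope; df1 and df2 stand for any two DataFrames sharing an id column), Catalyst can push the filter below the join so less data is read and shuffled:

val joined = df1.join(df2, "id").filter($"year" === 2020)
joined.explain(true)  // the optimized plan shows the filter pushed below the join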

 

Tungsten (Spark SQL's off-heap data encoder):

highly specialized data encoders (more data fits in memory)

column-based (stores data in a column-based format)

off-heap (avoids GC; the memory is managed by Tungsten)

 

DF limitations (fall back to RDDs when you hit them):

Referring to a nonexistent column (say, "id") still compiles; the error only surfaces at runtime (see the sketch after this list).

Data types are limited to what the Tungsten encoder supports: case classes/Products and Spark SQL data types.

Requires semi-structured or structured data; unstructured data that cannot be given a schema will not work.
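
A minimal sketch of the first limitation, assuming spark.implicits._ is in scope:

val df = Seq(("alice", 1)).toDF("name", "count")
df.select($"id")  // compiles (the column name is just a string), fails at runtime with AnalysisException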

--------------------------------------

  • DS

1  Rows don't carry type information, so results cannot be collected back to the master as typed values; each field has to be cast by hand:

val avgPriceAgain = avgPrice.map {
  row => (row(0).asInstanceOf[String], row(1).asInstanceOf[Int])  // wrong guess: ClassCastException at runtime
}

avgPrice.head.schema.printTreeString()  // inspect the actual Row schema

val avgPriceAgain = avgPrice.map {
  row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[Double])  // correct casts
}

 

If the schema you assume is wrong, you get a ClassCastException; fetch the real schema with head.schema.printTreeString() and map again.

====> Dataset: typed, distributed collections of data; they must have a schema and encoders.

2  More type information than DF, more optimizations than RDD.

  Both typed and untyped relational operations; map/filter/flatMap can be used again:

import spark.implicits._

listingDF.toDS                                       // DataFrame -> Dataset
listingDS.groupByKey(l => l.zip)
         .agg(avg('price).as[Double])                // mixes the typed and untyped APIs

val listingDS = spark.read.json("people.json").as[Person]  // read JSON straight into a Dataset
myRDD.toDS                                           // RDD -> Dataset
List("a", "s").toDS                                  // local collection -> Dataset

// found: Column, required: TypedColumn --- agg on a grouped Dataset needs a TypedColumn;
// .as[...] does the conversion:

'price.as[Double]


 

 

3  groupByKey returns a KeyValueGroupedDataset. As with DF, an aggregation follows the groupByKey (sketch below).

KeyValueGroupedDataset carries the aggregation operations, which return a DS.
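
A small sketch, assuming keyvalueDS is a Dataset[(Int, String)]:

val grouped = keyvalueDS.groupByKey(p => p._1)  // KeyValueGroupedDataset[Int, (Int, String)]
val counts  = grouped.count()                   // aggregation; returns Dataset[(Int, Long)]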

 

4  Some aggregation operations on KeyValueGroupedDataset:

reduceGroups(f: (V, V) => V): Dataset[(K, V)]

agg[U](col: TypedColumn[V, U]): Dataset[(K, U)]

mapGroups / flatMapGroups

 

5  There is no reduceByKey; substitutes:

keyvalueRDD.reduceByKey(_ + _)  // the RDD version

// Dataset substitute: group, keep only the value, then reduce each group
keyvalueDS.groupByKey(p => p._1).mapValues(p => p._2).reduceGroups((acc, str) => acc + str)


import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

val myAgg = new Aggregator[(Int, String), String, String] {
  def zero: String = ""                                       // initial buffer
  def reduce(b: String, a: (Int, String)): String = b + a._2  // fold one input into the buffer
  def merge(b1: String, b2: String): String = b1 + b2         // combine partial buffers
  def finish(r: String): String = r                           // buffer -> final result
  override def bufferEncoder: Encoder[String] = Encoders.STRING
  override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn

keyvalueDS.groupByKey(p => p._1).agg(myAgg.as[String])

keyvalueDS.groupByKey(p => p._1)
  .mapGroups((k, vs) => (k, vs.foldLeft("")((acc, p) => acc + p._2)))
  .sort('_1)

 

But mapGroups does not support partial aggregation and requires shuffling all the data in the Dataset.

!!: don't use mapGroups; use reduceGroups or agg instead.

 

6  class Aggregator[-IN, BUF, OUT]

org.apache.spark.sql.expressions.Aggregator

See the instantiation above.

 

7  Encoder

Converts data between its JVM-object representation and Spark SQL's tabular representation. Every Dataset needs encoders.

Compared with Java or Kryo serialization, Tungsten encoders are limited to primitives, case classes and Spark SQL types, but they carry schema information, use less memory, and are faster.

Two ways to bring encoders in:

1 (implicitly) import spark.implicits._  2 (explicitly) org.apache.spark.sql.Encoders

Encoders.scalaInt

Some of them:

INT, LONG, STRING etc. for nullable primitives

scalaInt, scalaLong, scalaByte etc. for Scala's primitives

product/tuple for Scala's Product and tuple types
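
A sketch of the explicit route (assumes a SparkSession named spark):

import org.apache.spark.sql.{Encoder, Encoders}

implicit val pairEnc: Encoder[(Int, String)] =
  Encoders.tuple(Encoders.scalaInt, Encoders.STRING)  // built from the primitives above

val ds = spark.createDataset(Seq((1, "a"), (2, "b")))  // picks up pairEnc implicitly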

 

8  limitations of DS

Catalyst cannot optimize functional operations (lambdas): it can see neither the schema information nor what the operation actually does.

Using higher-order functions therefore loses some Catalyst optimizations, but all of Tungsten's benefits are kept.

Some classes used in Scala applications have no Tungsten Encoder; the supported data types are still limited.

 

9  Which to use

RDD: the data is unstructured, you need fine-tuning or low-level computation, or the data types are too complex for Encoders to serialize.

DS: the data is at least semi-structured, you need type safety and the functional API, and peak performance is not critical.

DF: the data is at least semi-structured and you want it automatically optimized.

 

  • partitioner

1  hash partitioners and range partitioners

Partitioning before shuffle-heavy operations can buy performance: data locality avoids network transfer.

With reduceByKey, pre-partitioning with a range partitioner (and persisting the result) avoids the shuffle entirely:

import org.apache.spark.RangePartitioner

val pairs = pRdd.map(p => (p.id, p.price))        // key-value RDD
val tunedPar = new RangePartitioner(8, pairs)     // 8 partitions

val partitioned = pairs.partitionBy(tunedPar)
                       .persist()                 // persist, or the partitioning is redone on every use

// mapValues (not map) keeps tunedPar on the result
val purPer = partitioned.mapValues(price => (1, price))

val purPerMon = purPer.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                      .collect()                  // no shuffle needed
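
A quick check that the partitioner survived (using map instead of mapValues would make it None and bring the shuffle back):

assert(purPer.partitioner == Some(tunedPar))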

 
