spark-Row实战&源码分析

Posted 从0到1学习大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-Row实战&源码分析相关的知识,希望对你有一定的参考价值。

前言

spark在操作dataset/dataframe时候,经常需要对每一行数据进行处理,像map/mapPartition/foreach/ foreachParition等,那么我们在拿到一行数据时候,如何从中拿取出我们想要的列,然后进行相关业务操作,经常摸不着头脑,本文基于spark 2.1.1分析了一行数据的表达,以及详细的讲解了各种操作拿取行中相应列数据的方法。

Row实战操作

根据api文档,Row有三种获取元素的方法,下面一一讲解并附一例子理解与实战,首先我们先建立一个DataSet。

 scala> val data = List(("James ","","Smith","36636","M",60000),  ("Michael ","Rose","","40288","M",70000),  ("Robert ","","Williams","42114","",400000),  ("Maria ","Anne","Jones","39192","F",500000),  ("Jen","Mary","Brown","","F",0))
 
 data: List[(String, String, String, String, String, Int)] = List(("James ","",Smith,36636,M,60000), ("Michael ",Rose,"",40288,M,70000), ("Robert ","",Williams,42114,"",400000), ("Maria ",Anne,Jones,39192,F,500000), (Jen,Mary,Brown,"",F,0))
 
 // 得到的是DataFrame (DataSet[Row])
 scala> val dataDf = spark.createDataFrame(data)
 dataDf: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 4 more fields]


通过下标

Row 的具体某列可以使用 apply or get来获取相应index的数据

 scala> dataDf.map(x => x.get(0))

这样使用会出错,会有以下几种错误,原因是DataSet中的map操作必须指定schema

 java.lang.ClassNotFoundException: scala.Any 
   
 <console>:33: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

解决方法有以下几种

  1. 利用原生rdd处理,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能。

     scala> dataDf.rdd.map(x => x(0))
     res2: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[4] at map at <console>:26
     
     scala> dataDf.rdd.map(x => x(0)).collect
     res3: Array[Any] = Array("James ", "Michael ", "Robert ", "Maria ", Jen)
  2. DataSet建立的时候就创建schema,自动将Row转化为自定义的类P

     scala> case class Person(_1: String, _2: String, _3: String, _4: String, _5: String, _6: Integer)
     defined class Person
     
     scala> val dataDf = spark.createDataFrame(data).as[Person]
     dataDf: org.apache.spark.sql.Dataset[Person] = [_1: string, _2: string ... 4 more fields]
     
     scala> dataDf.map(x => x._1)
     res23: org.apache.spark.sql.Dataset[String] = [value: string]

通过使用类型匹配,显示的声明列的类型

可以通过getAs方法来获取某个下标对应的列数据,直接映射到相应的数据类型,通过此种方式,需要保证相应下标对应的数据必须是非空的,不然会抛出NullPointer错误

 scala> val df1 = dataDf.map(x => (x.getAs[String](0), x.getAs[String](1)))
 df1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
 
 scala> val df2 = dataDf.map(x => (x.getString(0), x.getString(1)))
 df2: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
 // spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase

通过scala模式匹配

直接利用scala的模式匹配的策略定义case Row(),原因是case Row()通过scala模式匹配,可以知道集合Row里面拥有多少个基本的类型,则可以完成对Row的自动编码,然后可以进行相应的处理。

 scala> import org.apache.spark.sql.Row
 import org.apache.spark.sql.Row
 
 scala> dataDf.map{case Row(a: String, b: String, c: String, d: String, e: Integer) => (a, b)}
 res20: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]

源码分析

在spark中Row的定义在org.apache.spark.sql.Row

变量

  1. length/size 获取Row一行的有几个元素 def size: Int = length

  2. schema 可以指定row的模式 def schema: StructType = null

方法

下标获取方法

 def get(i: Int): Any
 
 def apply(i: Int): Any = get(i)
 ==> row(0)  row.get(0)  必须是已经指定过schema的才可以进行这个操作,不然会报错

下标+类型获取方法

使用之前最好检查是否为NULL:def isNullAt(i: Int): Boolean = get(i) == null,否则会Null pointer

// 强制类型转换,必须非NULL,不然会报null pointer
private def getAnyValAs[T <: AnyVal](i: Int): T =
if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
else getAs[T](i)
def getAs[T](i: Int): T = get(i).asInstanceOf[T]

// 提供了基本类型的获取方法:ClassCastException/NullPointerException两种异常可能会抛出
def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i)
getByte/getShort/getInt/getLong/getFloat/getDouble/getString/getDecimal/getDate/getTimestamp/getSeq/getList/getMap/getJavaMap

def getStruct(i: Int): Row = getAs[Row](i)

参考

  1. https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Row.html

  2. https://www.jianshu.com/p/8f82d5414676

  3. https://blog.51cto.com/9269309/1954540

以上是关于spark-Row实战&源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Android内核源码bionic目录下的子目录arch-arm源码分析笔记

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段

Android编程实战源码级免杀_Dex动态加载技术_Metasploit安卓载荷傀儡机代码复现

Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段

ffmpeg 源码分析与命令实战和代码实战