spark-Row实战&源码分析
Posted 从0到1学习大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark-Row实战&源码分析相关的知识,希望对你有一定的参考价值。
前言
spark在操作dataset/dataframe时候,经常需要对每一行数据进行处理,像map/mapPartition/foreach/ foreachParition等,那么我们在拿到一行数据时候,如何从中拿取出我们想要的列,然后进行相关业务操作,经常摸不着头脑,本文基于spark 2.1.1分析了一行数据的表达,以及详细的讲解了各种操作拿取行中相应列数据的方法。
Row实战操作
根据api文档,Row有三种获取元素的方法,下面一一讲解并附一例子理解与实战,首先我们先建立一个DataSet。
scala> val data = List(("James ","","Smith","36636","M",60000), ("Michael ","Rose","","40288","M",70000), ("Robert ","","Williams","42114","",400000), ("Maria ","Anne","Jones","39192","F",500000), ("Jen","Mary","Brown","","F",0))
data: List[(String, String, String, String, String, Int)] = List(("James ","",Smith,36636,M,60000), ("Michael ",Rose,"",40288,M,70000), ("Robert ","",Williams,42114,"",400000), ("Maria ",Anne,Jones,39192,F,500000), (Jen,Mary,Brown,"",F,0))
// 得到的是DataFrame (DataSet[Row])
scala> val dataDf = spark.createDataFrame(data)
dataDf: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 4 more fields]
通过下标
Row
的具体某列可以使用 apply
or get
来获取相应index的数据
scala> dataDf.map(x => x.get(0))
这样使用会出错,会有以下几种错误,原因是DataSet中的map操作必须指定schema
java.lang.ClassNotFoundException: scala.Any
<console>:33: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
解决方法有以下几种
利用原生rdd处理,但是不建议用这个,相对于RDD,DataSet进行了很多的底层优化,拥有很不错性能。
scala> dataDf.rdd.map(x => x(0))
res2: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[4] at map at <console>:26
scala> dataDf.rdd.map(x => x(0)).collect
res3: Array[Any] = Array("James ", "Michael ", "Robert ", "Maria ", Jen)DataSet建立的时候就创建schema,自动将Row转化为自定义的类P
scala> case class Person(_1: String, _2: String, _3: String, _4: String, _5: String, _6: Integer)
defined class Person
scala> val dataDf = spark.createDataFrame(data).as[Person]
dataDf: org.apache.spark.sql.Dataset[Person] = [_1: string, _2: string ... 4 more fields]
scala> dataDf.map(x => x._1)
res23: org.apache.spark.sql.Dataset[String] = [value: string]
通过使用类型匹配,显示的声明列的类型
可以通过getAs
方法来获取某个下标对应的列数据,直接映射到相应的数据类型,通过此种方式,需要保证相应下标对应的数据必须是非空的,不然会抛出NullPointer错误。
scala> val df1 = dataDf.map(x => (x.getAs[String](0), x.getAs[String](1)))
df1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
scala> val df2 = dataDf.map(x => (x.getString(0), x.getString(1)))
df2: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
// spark是把元祖看成case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase
通过scala模式匹配
直接利用scala的模式匹配的策略定义case Row(),原因是case Row()通过scala模式匹配,可以知道集合Row里面拥有多少个基本的类型,则可以完成对Row的自动编码,然后可以进行相应的处理。
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> dataDf.map{case Row(a: String, b: String, c: String, d: String, e: Integer) => (a, b)}
res20: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]
源码分析
在spark中Row的定义在org.apache.spark.sql.Row
中
变量
length/size 获取Row一行的有几个元素
def size: Int = length
schema 可以指定row的模式
def schema: StructType = null
方法
下标获取方法
def get(i: Int): Any
def apply(i: Int): Any = get(i)
==> row(0) row.get(0) 必须是已经指定过schema的才可以进行这个操作,不然会报错
下标+类型获取方法
使用之前最好检查是否为NULL:def isNullAt(i: Int): Boolean = get(i) == null
,否则会Null pointer
// 强制类型转换,必须非NULL,不然会报null pointer
private def getAnyValAs[T <: AnyVal](i: Int): T =
if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
else getAs[T](i)
def getAs[T](i: Int): T = get(i).asInstanceOf[T]
// 提供了基本类型的获取方法:ClassCastException/NullPointerException两种异常可能会抛出
def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i)
getByte/getShort/getInt/getLong/getFloat/getDouble/getString/getDecimal/getDate/getTimestamp/getSeq/getList/getMap/getJavaMap
def getStruct(i: Int): Row = getAs[Row](i)
参考
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Row.html
https://www.jianshu.com/p/8f82d5414676
https://blog.51cto.com/9269309/1954540
以上是关于spark-Row实战&源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Android内核源码bionic目录下的子目录arch-arm源码分析笔记
Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段
Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段
Android编程实战源码级免杀_Dex动态加载技术_Metasploit安卓载荷傀儡机代码复现
Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段