spark 能执行udf 不能执行udaf,啥原因

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 能执行udf 不能执行udaf,啥原因相关的知识,希望对你有一定的参考价值。

参考技术A 科普Spark,Spark是什么,如何使用Spark

1.Spark基于什么算法的分布式计算(很简单)
2.Spark与MapReduce不同在什么地方
3.Spark为什么比Hadoop灵活
4.Spark局限是什么
5.什么情况下适合使用Spark

什么是Spark
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:

Spark与Hadoop的对比
Spark的中间数据放到内存中,对于迭代运算效率更高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
Spark比Hadoop更通用
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。
这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
容错性
在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
可用性
Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。
Spark与Hadoop的结合
Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。
Spark的适用场景
Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)
由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。
运行模式
本地模式
Standalone模式
Mesoes模式
yarn模式
Spark生态系统
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。
Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
End.

spark的udf和udaf的注册

spark的udf和udaf的注册

一、udf

spark.udf.register("addName", (x: String) => {
      "name: " + x
    })

二、udaf

  1. 弱类型的自定义聚合函数 是不安全的
package com.huawei.appgallery.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * author:Chen
  * 弱类型自定义聚合函数
  * date:2020/2/12 14:29 
  */
object MyAverage extends UserDefinedAggregateFunction {
  //聚合后的输入数据类型
  override def inputSchema: StructType = {
    StructType(StructField("name", StringType, nullable = true) :: StructField("salary", LongType, nullable = false) :: Nil)
  }

  //聚合时缓存中的数据类型
  override def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }

  //聚合后输出的数据类型
  override def dataType: DataType = DoubleType

  //数据一致性
  override def deterministic: Boolean = true

  //初始化缓存中的数据
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  //更新同一分区缓存中的数据
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(1)) {
      buffer(0) = buffer.getLong(0) + input.getLong(1)
    }
    buffer(1) = buffer.getLong(1) + 1
  }

  /**
    * 合并不同分区中的缓存数据
    *
    * @param buffer1 MutableAggregationBuffer时要操作的buffer,可变的
    * @param buffer2
    */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  //对merge后的缓存数据做最后的计算
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(1) match {
      case 0 => 0D
      case _ => buffer.getLong(0) / buffer.getLong(1).toDouble
    }
  }

}

def main(args: Array[String]): Unit = {
    //1
    spark.udf.register("myAverage", MyAverage)
    val lineDS: Dataset[String] = spark.read.textFile("C:\Users\ASUS\Desktop\test2_12.txt")
    //dataset的schame设置
    import spark.implicits._ //必须隐式转换
    val employeeDS: Dataset[Employee] = lineDS.map(line => {
      val items = line.split("	")
      Employee(items(0), items(1).toLong)
    })
    employeeDS.createOrReplaceTempView("view_employee")
    val averageDF = spark.sql(
      """
        |select myAverage(name,salary) as avg_salary from view_employee
      """.stripMargin)
    averageDF.show(false)
    }
  1. 强类型的自定义聚合函数 程序运行时候会检查数据的类型,是安全的
package com.huawei.appgallery.udf

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator


/**
  * author:Chen
  * 继承的包是org.apache.spark.sql.expressions.Aggregator
  * 不是org.apache.spark.Aggregator
  * 指定泛型
  * inputschema的类型
  * buffer的类型
  * 输出的类型
  * date:2020/2/12 14:30 
  */
case class Employee(name: String, salary: Long)

case class Buffer(var sum: Long, var count: Long)

object MyAverage2 extends Aggregator[Employee, Buffer, Double] {
  //相当于弱类型自定义聚合函数中的initialize
  override def zero = Buffer(0L, 0L)

  //相当于弱类型自定义聚合函数中的update,统一分区
  override def reduce(b: Buffer, a: Employee): Buffer = {
    //判断a对象中的是否为空
    if (!a.salary.isNaN) {
      b.sum = b.sum + a.salary
    }
    b.count += 1L
    b
  }

  //相当于弱类型自定义聚合函数中merge,不同分区
  override def merge(b1: Buffer, b2: Buffer): Buffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  //相当于弱类型自定义聚合函数中的evaluate,计算
  override def finish(reduction: Buffer): Double = {
    reduction.count match {
      case 0L => 0D
      case _ => reduction.sum / reduction.count.toDouble
    }
  }

  //指定中间值Buffer的编码器类型  强类型自定义聚合函数的强类型体现在这里
  override def bufferEncoder = {
    Encoders.product[Buffer]
  }

  //指定结果的编码器类型  强类型自定义聚合函数的类型定义
  override def outputEncoder: Encoder[Double] = {
    Encoders.scalaDouble
  }
}
//dataset引进了新的序列化的编码方式Encoder[T]代替之前的Java编码和kryo编码
def main(args: Array[String]): Unit = {
    //2
    val lineDS: Dataset[String] = spark.read.textFile("C:\Users\ASUS\Desktop\test2_12.txt")
    //dataset的schame设置
    import spark.implicits._
    val employeeDS: Dataset[Employee] = lineDS.map(line => {
      val items = line.split("	")
      Employee(items(0), items(1).toLong)
    })
    employeeDS.show(false)
    val myAverage2 = MyAverage2.toColumn.name("myAverage")
    val resultDF = employeeDS.select(myAverage2)   **//使用的时候必须是强类型的dataset,不能是弱类型的dataframe,不然会报错
    resultDF.show(false)**
    }

以上是关于spark 能执行udf 不能执行udaf,啥原因的主要内容,如果未能解决你的问题,请参考以下文章

spark 能执行udf 不能执行udaf,啥原因

spark的udf和udaf的注册

spark编写UDF和UDAF

Spark篇---SparkSQL中自定义UDF和UDAF,开窗函数的应用

Spark 自定义函数(udf,udaf)

spark-sql自定义函数UDF和UDAF