Spark读取Hive数据的两种方式与保存数据到HDFS

Posted 岁月的眸

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark读取Hive数据的两种方式与保存数据到HDFS相关的知识,希望对你有一定的参考价值。

Spark读取Hive数据的两种方式与保存数据到HDFS

Spark读取Hive数据的方式主要有两种

1、 通过访问hive metastore的方式,这种方式通过访问hive的metastore元数据的方式获取表结构信息和该表数据所存放的HDFS路径,这种方式的特点是效率高、数据吞吐量大、使用spark操作起来更加友好。

2、 通过spark jdbc的方式访问,就是通过链接hiveserver2的方式获取数据,这种方式底层上跟spark链接其他rdbms上一样,可以采用sql的方式先在其数据库中查询出来结果再获取其结果数据,这样大部分数据计算的压力就放在了数据库上。

两种方式的具体实现示例

首先创建Spark Session对象:

    val spark = SparkSession.builder()
      .appName("test")
      .enableHiveSupport()
      .getOrCreate()

方式一(推荐) 直接采用Spark on Hive的方式读取数据,这样SparkSession在使用sql的时候会去找集群hive中的库表,加载其hdfs数据与其元数据组成DataFrame

val df = spark.sql("select * from test.user_info")

方式二 采用spark jdbc的方式,如果有特别的使用场景的话也可以通过这种方法来实现。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.JdbcDialect, JdbcDialects
 
 
object test
  def main(args: Array[String]): Unit = 
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("test")
      .getOrCreate()
 
    register() //如果不手动注册,只能获取到数据库中的表结构,而不能获取到数据
    val df = spark.read
      .format("jdbc")
      .option("driver","org.apache.hive.jdbc.HiveDriver")
      .option("url","jdbc:hive2://xxx:10000/")
      .option("user","hive")
      .option("password",xxx)
      .option("fetchsize", "2000")
      .option("dbtable","test.user_info")
      .load()
    df.show(10)
  
 
  def register(): Unit = 
    JdbcDialects.registerDialect(HiveSqlDialect)
  
 
 
  case object HiveSqlDialect extends JdbcDialect 
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
 
    override def quoteIdentifier(colName: String): String = 
      colName.split('.').map(part => s"`$part`").mkString(".")
    
  
 

Spark的DataFrame和DataSet使用

​ DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。此外,多种数据都可以转化为DataFrame,例如Spark计算过程中生成的RDD、结构化数据文件、Hive中的表、外部数据库等。

在Spark中,一个DataFrame所代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。相对于RDD,Dataset提供了强类型支持,在RDD的每行数据加了类型约束。而且使用DatasetAPI同样会经过Spark SQL优化器的优化,从而提高程序执行效率。

DataFrame和R的数据结构以及python pandas DataFrame的数据结构和操作基本一致。

创建DataFrame、DataSet

  • 创建RDD
  • RDD转化为ROW
  • 通过ROW和元数据信息生成DataFrame
  • 然后通过DataFrame和对应的类转化为DataSet
  • 也就是说DataFrame是DataSet[Row],这里可以通过指定的类将其转化,DataSet[User]
  • 需要注意的事转化使用的类需要时内部类,然后就是类里的变量名要和元数据信息的列名保持对齐。
object MovieLenDataSet 
  case class User(UserID:String, Gender:String, Age:String, Occupation:String, Zip_Code:String)
  def main(args: Array[String]): Unit = 
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder()
      .appName("MovieLenDataSet")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val dataPath = "/home/ffzs/data/ml-1m"
    val schema4users = StructType(
      "UserID::Gender::Age::Occupation::Zip_code"
        .split("::")
        .map(it => StructField(it, StringType, nullable = true))
    )

    val usersRdd = spark.sparkContext.textFile(f"$dataPath/users.dat")
    val usersRows = usersRdd.map(_.split("::"))
      .map(it => 
        it.map(_.trim)
      )
      .map(it => Row(it(0), it(1), it(2), it(3), it(4)))
    val usersDF: DataFrame = spark.createDataFrame(usersRows, schema4users)
    val usersDataSet = usersDF.as[User]
    usersDataSet.show(5)
  

Spark的DataFrame存储的Mode模式选择

spark的dataframe存储中都会调用write的mode方法:

data.write.mode(“append”).saveAsTable(s"u s e r i d . userid.userid.datasetid")
data.write.mode(SaveMode.Overwrite).parquet(hdfspath)

但不同时候的参数是不同的。

先看一下源码:

spark-v2.3.0:

  def mode(saveMode: SaveMode): DataFrameWriter[T] = 
    this.mode = saveMode
    this
  

  /**
   * Specifies the behavior when data or table already exists. Options include:
   *   - `overwrite`: overwrite the existing data.
   *   - `append`: append the data.
   *   - `ignore`: ignore the operation (i.e. no-op).
   *   - `error` or `errorifexists`: default option, throw an exception at runtime.
   *
   * @since 1.4.0
   */
  def mode(saveMode: String): DataFrameWriter[T] = 
    this.mode = saveMode.toLowerCase(Locale.ROOT) match 
      case "overwrite" => SaveMode.Overwrite
      case "append" => SaveMode.Append
      case "ignore" => SaveMode.Ignore
      case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
        "Accepted save modes are 'overwrite', 'append', 'ignore', 'error', 'errorifexists'.")
    
    this
  

SaveMode.Overwrite(对应着字符串"overwrite"):表示如果目标文件目录中数据已经存在了,则用需要保存的数据覆盖掉已经存在的数据

SaveMode.Append(对应着字符串"append"):表示如果目标文件目录中数据已经存在了,则将数据追加到目标文件中
数据追加方式是:先将表中的所有索引删除,再追加数据

SaveMode.Ignore(对应着字符串为:“ignore”):表示如果目标文件目录中数据已经存在了,则不做任何操作
SaveMode.ErrorIfExists(对应着字符串"error"):表示如果目标文件目录中数据已经存在了,则抛异常(这个是默认的配置)


spark之Dataframe保存模式

以前spark.write时总要先把原来的删了,但其实是可以设置写入模式的。

val df =  spark.read.parquet(input)
df.write.mode("overwrite").parquet(output)

dataframe写入的模式一共有4种:

  1. overwrite 覆盖已经存在的文件
  2. append 向存在的文件追加
  3. ignore 如果文件已存在,则忽略保存操作
  4. error / default 如果文件存在,则报错
def mode(saveMode: String): DataFrameWriter = 
    this.mode = saveMode.toLowerCase match 
      case "overwrite" => SaveMode.Overwrite              
      case "append" => SaveMode.Append                    
      case "ignore" => SaveMode.Ignore                    
      case "error" | "default" => SaveMode.ErrorIfExists  
      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
        "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
    
    this
  

spark write写入数据task failed失败,两种模式下的不同表现

1、SaveMode.Append

task失败重试,并不会删除上一次失败前写入的数据(文件根据分区号命名),重新执行时会继续追加数据。所以会出现数据重复。

2、SaveMode.Overwrite

task失败重试,会删除该分区上次失败所写入的数据文件,然后创建一个新的数据文件写入数据。所以不会出现数据重复。

启动spark任务报错:ERROR SparkUI: Failed to bind SparkUI


当启动一个spark任务的时候,就会占用一个端口,默认为4040,从日志可以看到当端口被占用时,它会默认依次增加16次到4056,如果还是失败的话,就会报错退出。

解决方法:

  1. 使用spark-submit提交任务时,在脚本中加配置:–conf spark.port.maxRetries=128(亲测有效)

参考博客:
https://blog.csdn.net/qq_42213403/article/details/117557610

以上是关于Spark读取Hive数据的两种方式与保存数据到HDFS的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming 读取 Kafka 数据的两种方式

【数仓】对比spark-hive的两种分布式计算模式

spark-streaming对接kafka的两种方式

以增量方式导出 Hive 数据

Spark学习笔记4:数据读取与保存

利用FFMPEG以及EasyRTMP实现读取H.264文件推RTMP视频流的两种方式