Spark DataFrame vector 类型存储到Hive表

Posted itboys

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark DataFrame vector 类型存储到Hive表相关的知识,希望对你有一定的参考价值。

1. 软件版本

软件版本
Spark 1.6.0
Hive 1.2.1

2. 场景描述

在使用Spark时,有时需要存储DataFrame数据到Hive表中,一般的存储方式如下:

 // 注册临时表
 myDf.registerTempTable("t1")
 // 使用SQLContext从临时表创建Hive表
 sqlContext.sql("create table h1 as select * from t1")

在DataFrame中存储一般的数据类型,比如Double、Float、String等到Hive表是没有问题的,但是在DataFrame中还有一个数据类型:vector , 如果存储这种类型到Hive表那么会报错,类似:

org.apache.spark.sql.AnalysisException: cannot resolve cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>) 
due to data type mismatch: cannot cast [email protected] to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));

这个错误如果搜索的话,可以找到类似这种结果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

也即是说暂时使用Spark是不能够直接存储vector类型的DataFrame到Hive表的,那么有没有一种方法可以存储呢? 
想到这里,那么在Spark中是有一个工具类VectorAssembler 可以达到相反的目的,即把多个列(也需要要求这些列的类型是一致的)合并成一个vector列。但是并没有相反的工具类,也就是我们的需求。

3. 问题的迂回解决方法

这里提出一个解决方法如下: 
假设: 
1. DataFrame中数据类型是vector的列中的数据类型都是已知的,比如Double,数值类型; 
2. vector列中的具体子列个数也是已知的; 
有了上面两个假设就可以通过构造RDD[Row]以及schema的方式来生成新的DataFrame,并且这个新的DataFrame的类型是基本类型,如Double。这样就可以保存到Hive中了。

4. 示例

本例流程如下:

技术分享图片

代码如下:

// 1.读取数据
val data = sqlContext.sql("select * from normalize")

读取数据如下: 

技术分享图片

// 2.构造vector数据
import org.apache.spark.ml.feature.VectorAssembler
val cols = data.schema.fieldNames
val newFeature = "fea"
val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature)
val newDf = asb.transform(data)
newDf.show(1)

技术分享图片

// 3.做归一化
import org.apache.spark.ml.feature.Normalizer
val norFeature ="norF"
val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0)
val l1NormData = normalizer.transform(newDf)
l1NormData.show(1)
// 存储DataFrame vector类型报错
// l1NormData.select(norFeature).registerTempTable("t1")
// sqlContext.sql("create table h2 as select * from t1")

技术分享图片

// 4.扁平转换vector到row
import org.apache.spark.sql.Row
val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray))
val finalDf = sqlContext.createDataFrame(finalRdd,data.schema)
finalDf.show(1)

技术分享图片

// 5. 存储到Hive中
finalDf.registerTempTable("t1")
sqlContext.sql("create table h1 as select * from t1")

技术分享图片

 







以上是关于Spark DataFrame vector 类型存储到Hive表的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame vector 类型存储到Hive表

WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]

如何在 Apache Spark ML API 中从“DataFrame”创建一个“Vector”?

无法将 Spark ML 库中的 Vector 用于 DataFrame

如何在 Scala(Spark 2.0)中将带有字符串的 DataFrame 转换为带有 Vectors 的 DataFrame

Spark dataframe 中某几列合并成vector或拆分