Spark DataFrame vector 类型存储到Hive表

Posted fansy1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark DataFrame vector 类型存储到Hive表相关的知识,希望对你有一定的参考价值。

Spark DataFrame vector 类型存储到Hive表

1. 软件版本

软件版本
Spark1.6.0
Hive1.2.1

2. 场景描述

在使用Spark时,有时需要存储DataFrame数据到Hive表中,一般的存储方式如下:

 // 注册临时表
 myDf.registerTempTable("t1")
 // 使用SQLContext从临时表创建Hive表
 sqlContext.sql("create table h1 as select * from t1")

在DataFrame中存储一般的数据类型,比如Double、Float、String等到Hive表是没有问题的,但是在DataFrame中还有一个数据类型:vector , 如果存储这种类型到Hive表那么会报错,类似:

org.apache.spark.sql.AnalysisException: cannot resolve 'cast(norF as struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)' due to data type mismatch: cannot cast org.apache.spark.mllib.linalg.VectorUDT@f71b0bce to StructType(StructField(type,ByteType,true), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,true),true), StructField(values,ArrayType(DoubleType,true),true));

这个错误如果搜索的话,可以找到类似这种结果: Failed to insert VectorUDT to hive table with DataFrameWriter.insertInto(tableName: String)

也即是说暂时使用Spark是不能够直接存储vector类型的DataFrame到Hive表的,那么有没有一种方法可以存储呢?
想到这里,那么在Spark中是有一个工具类VectorAssembler 可以达到相反的目的,即把多个列(也需要要求这些列的类型是一致的)合并成一个vector列。但是并没有相反的工具类,也就是我们的需求。

3. 问题的迂回解决方法

这里提出一个解决方法如下:
假设:
1. DataFrame中数据类型是vector的列中的数据类型都是已知的,比如Double,数值类型;
2. vector列中的具体子列个数也是已知的;
有了上面两个假设就可以通过构造RDD[Row]以及schema的方式来生成新的DataFrame,并且这个新的DataFrame的类型是基本类型,如Double。这样就可以保存到Hive中了。

4. 示例

本例流程如下:

Created with Raphaël 2.1.2 读取Hive表数据到DataFrame 使用VectorAssembler把DataFrame各列合并到vector类型 使用合并后vector类型作为输入,传输到Normalizer中做归一化 扁平化归一化后的vector类型的rdd,并且获取原始数据的schema,最后拼接成为DataFrame 存储DataFrame到Hive

代码如下:

// 1.读取数据
val data = sqlContext.sql("select * from normalize")

读取数据如下:

// 2.构造vector数据
import org.apache.spark.ml.feature.VectorAssembler
val cols = data.schema.fieldNames
val newFeature = "fea"
val asb = new VectorAssembler().setInputCols(cols).setOutputCol(newFeature)
val newDf = asb.transform(data)
newDf.show(1)

// 3.做归一化
import org.apache.spark.ml.feature.Normalizer
val norFeature ="norF"
val normalizer = new Normalizer().setInputCol(newFeature).setOutputCol(norFeature).setP(1.0)
val l1NormData = normalizer.transform(newDf)
l1NormData.show(1)
// 存储DataFrame vector类型报错
// l1NormData.select(norFeature).registerTempTable("t1")
// sqlContext.sql("create table h2 as select * from t1")

// 4.扁平转换vector到row
import org.apache.spark.sql.Row
val finalRdd= l1NormData.select(norFeature).rdd.map(row => Row.fromSeq(row.getAs[org.apache.spark.mllib.linalg.DenseVector](0).toArray))
val finalDf = sqlContext.createDataFrame(finalRdd,data.schema)
finalDf.show(1)

// 5. 存储到Hive中
finalDf.registerTempTable("t1")
sqlContext.sql("create table h1 as select * from t1")

以上是关于Spark DataFrame vector 类型存储到Hive表的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame vector 类型存储到Hive表

WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]

如何在 Apache Spark ML API 中从“DataFrame”创建一个“Vector”?

无法将 Spark ML 库中的 Vector 用于 DataFrame

如何在 Scala(Spark 2.0)中将带有字符串的 DataFrame 转换为带有 Vectors 的 DataFrame

Spark dataframe 中某几列合并成vector或拆分