数据分析EPHS(15)-Spark如何处理Hive的集合类型?

Posted 数据分析EPHS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据分析EPHS(15)-Spark如何处理Hive的集合类型?相关的知识,希望对你有一定的参考价值。

上一篇中我们介绍了hive中的数据类型,其中一类比较重要的类型即集合类型,主要包括struct、map、array三种。那么我们在spark中处理这三种类型呢?本文就来介绍一下。

1、数据介绍

我们还是先来回顾一下上篇中介绍的数据:

创建表:

create table if not exists datatype_test4( id int, info struct<name:string,weight:double>, score array<Int>, info_map map<string,string>) row format delimited fields terminated by ',' COLLECTION ITEMS TERMINATED BY ';' MAP KEYS TERMINATED BY ':';

可以看到,我们定义了三种不同的集合类型字段,并指定了集合类型的分隔符为";",即struct,array,以及map的不同kv之间用";"分割,同时定义了map的key和value之间用":"分割。

接下来,我们创建如下内容的txt文件:

1,文文;70,99;96;100,name:文文;country:china2,毛毛;60,99;92;100,name:毛毛;country:koera3,超超;65,99;96;100,name:超超;country:japan

倒入hive中并查看:

load data local inpath '/Users/meituan_sxw/Downloads/test4.txt' into table datatype_test4;
select * from datatype_test4;

结果如下:

2、struct类型

struct类型在spark中对应的类型为org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema,对应的处理代码如下:

val df = spark.sql( """ |select * from |default.datatype_test4 """.stripMargin) .map(row=>{ val id = row.getAs[Int]("id") val info = row.getAs[GenericRowWithSchema]("info") val name = info.getAs[String]("name") val weight = info.getAs[Double]("weight") (id,name,weight) }).collect().foreach(println)

输出结果为:

(1,文文,70.0)(2,毛毛,60.0)(3,超超,65.0)

其实一开始我也不知道是什么类型的,这主要看报错了,假设我们按row.getAs[String]("info")去获取对应的信息,则会发现有如下报错:

所以根据报错,顺藤摸瓜就可以啦。

3、array类型

对于hive中array类型的数据,如果用的是scala语言的话,对应的类型是scala.collection.mutable.WrappedArray,处理方式如下:

val df = spark.sql( """ |select * from |default.datatype_test4 """.stripMargin) .map(row=>{ val id = row.getAs[Int]("id") val score = row.getAs[WrappedArray[Int]]("score") (id,score(0),score(1),score(2)) }).collect().foreach(println)

输出结果为:

(1,99,96,100)(2,99,92,100)(3,99,96,100)

4、Map类型

对于hive中map类型的数据,对应的也是Scala中的scala.collection.immutable.Map类型,处理代码如下:

val df = spark.sql( """ |select * from |default.datatype_test4 """.stripMargin) .map(row=>{ val id = row.getAs[Int]("id") val info = row.getAs[Map[String,String]]("info_map") val name = info.get("name") val country = info.get("country") (id,name,country)
}).collect().foreach(println)

对应的输出如下:

(1,Some(文文),Some(china))(2,Some(毛毛),Some(koera))(3,Some(超超),Some(japan))

看上去有点奇怪,这是因为scala中的get() 方法返回的是一个叫 Option[String] 的类别。Option 有两个子类别,一个是 Some,一个是 None,当回传 Some 的时候,代表成功地返回了一个 String,同时可以通过 get() 这个函式拿到那个 String,如果返回的是 None,则代表没有字符串可以给你。所以,我们正确的写法如下:

val df = spark.sql( """ |select * from |default.datatype_test4 """.stripMargin) .map(row=>{ val id = row.getAs[Int]("id") val info = row.getAs[Map[String,String]]("info_map") val name = info.get("name").get val country = info.get("country").get (id,name,country)
}).collect().foreach(println)

此时才是我们想要的结果:

(1,文文,china)(2,毛毛,koera)(3,超超,japan)

好了,本篇就到这里了!


以上是关于数据分析EPHS(15)-Spark如何处理Hive的集合类型?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 如何处理比 Spark 存储大得多的数据?

Apache Spark 如何处理不适合内存的数据?

spark如何处理非数值的聚合最大值? [复制]

如何处理 Spark 中的多个 csv.gz 文件?

spark结构化流作业如何处理流-静态DataFrame连接?

Apache spark如何计算分区以及在executor中如何处理分区