Spark SQL: Structured Data File Processing, Part 3

Posted by Weikun Xing

Exploratory Analysis of Legal-Services Website Data

Getting the data

Start spark-sql

./spark-sql 

Create the database and table, and import the data

spark-sql> create database law;
22/03/26 17:55:17 WARN ObjectStore: Failed to get database law, returning NoSuchObjectException
OK
spark-sql> use law;
OK
spark-sql> CREATE TABLE  law ( 
         > ip bigint, 
         > area int,
         > ie_proxy string, 
         > ie_type string ,
         > userid string,
         > clientid string,
         > time_stamp bigint,
         > time_format string,
         > pagepath string,
         > ymd int,
         > visiturl string,
         > page_type string,
         > host string,
         > page_title string,
         > page_title_type int,
         > page_title_name string,
         > title_keyword string,
         > in_port string,
         > in_url string,
         > search_keyword string,
         > source string)
         > ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
         > STORED AS TEXTFILE;
OK

Switch to Hive and load the data

hive> show databases;
OK
default
law
test
Time taken: 5.774 seconds, Fetched: 3 row(s)
hive> use law;
OK
Time taken: 0.032 seconds
hive> load data inpath 'hdfs://master/user/root/sparksql/law_utf8.csv' overwrite into table law;
Loading data to table law.law
OK
Time taken: 28.706 seconds
hive> 

Page type analysis

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3bb99ed3

scala> val pageType=hiveContext.sql("use law")
pageType: org.apache.spark.sql.DataFrame = [result: string]

scala> val pageType=hiveContext.sql("select substring(page_type,1,3) as page_type,count(*) as count_num,round((count(*)/837450.0)*100,4) as weights from law group by substring(page_type,1,3)")
pageType: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]

scala> pageType.orderBy(-pageType("count_num")).show()
+---------+---------+-------+                                                   
|page_type|count_num|weights|
+---------+---------+-------+
|      101|   411665| 49.157|
|      199|   201399|24.0491|
|      107|   182900|21.8401|
|      301|    18430| 2.2007|
|      102|    17357| 2.0726|
|      106|     3957| 0.4725|
|      103|     1715| 0.2048|
|      "ht|       14| 0.0017|
|      201|       12| 0.0014|
|      cfr|        1| 1.0E-4|
+---------+---------+-------+

scala> pageType.repartition(1).save("/user/root/sparksql/pageType.json","json",SaveMode.Overwrite)
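
The three-argument DataFrame.save(path, source, mode) used throughout this transcript is the Spark 1.x API; it was removed in Spark 2.0. On Spark 2.x and later the equivalent call goes through the DataFrameWriter. A sketch, assuming Spark 2.x and the same HDFS path:

pageType.repartition(1)
  .write                        // returns a DataFrameWriter (Spark 2.x+)
  .mode(SaveMode.Overwrite)     // same overwrite semantics as the old save(...)
  .json("/user/root/sparksql/pageType.json")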

Consultation-related pages (page type 101, http://www.*.cn/ask/) account for about 49.16% of the records, followed by other-type pages (199) at about 24.05% and knowledge-related pages (107, http://www.*.com/info) at about 21.84%.
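
The denominator 837450 in the weights column is the total number of records in the law table. It can be double-checked from the same shell; a minimal sketch:

hiveContext.sql("select count(*) from law").show()   // should print 837450, the total used as the weights denominator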

Count the records whose category is 199 and that involve laws and regulations (faguizt)

scala> val pageLevel=hiveContext.sql("select substring(page_type,1,7) as page_type,count(*) as count_sum from law where visiturl like '%faguizt%' and substring(page_type,1,7) like '%199%' group by page_type")
pageLevel: org.apache.spark.sql.DataFrame = [page_type: string, count_sum: bigint]

scala> pageLevel.show()
+---------+---------+                                                           
|page_type|count_sum|
+---------+---------+
|  1999001|    47407|
+---------+---------+

scala> pageLevel.repartition(1).save("/user/root/sparksql/pageLevel.json","json",SaveMode.Overwrite)

Statistics within the consultation category

scala> val consultCount=hiveContext.sql("select substring(page_type,1,6) as page_type,count(*) as count_num,round((count(*)/411665.0)*100,4) as weights from law where substring(page_type,1,3)=101 group by substring(page_type,1,6)")
consultCount: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]

scala> consultCount.orderBy(-consultCount("count_num")).show()
+---------+---------+-------+                                                   
|page_type|count_num|weights|
+---------+---------+-------+
|   101003|   396612|96.3434|
|   101002|     7776| 1.8889|
|   101001|     5603| 1.3611|
|   101009|      854| 0.2075|
|   101008|      378| 0.0918|
|   101007|      147| 0.0357|
|   101004|      125| 0.0304|
|   101006|      107|  0.026|
|   101005|       63| 0.0153|
+---------+---------+-------+

scala> consultCount.repartition(1).save("/user/root/sparksql/consultCount.json","json",SaveMode.Overwrite)

Statistics on records whose URL contains "?"

scala> hiveContext.sql("select count(*) as num from law where visiturl like '%?%'").show()
+-----+                                                                         
|  num|
+-----+
|65477|
+-----+
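
The transcript is missing the statement that defines pageWith, which the next command uses. A definition consistent with the output below (its count_num column sums to the 65477 records counted just above, which is also the weights denominator) would be:

val pageWith = hiveContext.sql("select substring(page_type,1,7) as page_type, count(*) as count_num, round((count(*)/65477.0)*100,4) as weights from law where visiturl like '%?%' group by substring(page_type,1,7)")
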
scala> pageWith.orderBy(-pageWith("weights")).show()
+---------+---------+-------+                                                   
|page_type|count_num|weights|
+---------+---------+-------+
|  1999001|    64691|98.7996|
|   301001|      356| 0.5437|
|   107001|      346| 0.5284|
|   101003|       47| 0.0718|
|   102002|       25| 0.0382|
|  2015020|        5| 0.0076|
|  2015042|        3| 0.0046|
|  2015021|        2| 0.0031|
|  2015031|        2| 0.0031|
+---------+---------+-------+

scala> pageWith.repartition(1).save("/user/root/sparksql/pageWith.json","json",SaveMode.Overwrite)

Analyzing the internal patterns of the other-type pages

scala> val otherPage=hiveContext.sql("select count(*) as count_num,round((count(*)/64691.0)*100,4) as weights,page_title from law where visiturl like '%?%' and substring(page_type,1,7)=1999001 group by page_title")
otherPage: org.apache.spark.sql.DataFrame = [count_num: bigint, weights: double, page_title: string]

scala> otherPage.orderBy(-otherPage("count_num")).limit(5).show()
+---------+-------+--------------------+                                        
|count_num|weights|          page_title|
+---------+-------+--------------------+
|    49894|77.1266|           法律快车-律师助手|
|     6166| 9.5315| 免费发布法律咨询 - 法律快车法律咨询|
|     4455| 6.8866|              咨询发布成功|
|      765| 1.1825|       咨询发布成功 - 法律快车|
|      342| 0.5287|法律快搜-中国法律搜索第一品牌(s...|
+---------+-------+--------------------+

scala> otherPage.orderBy(-otherPage("count_num")).limit(5).save("/user/root/sparksql/otherPage.json","json",SaveMode.Overwrite)

Page types clicked by "aimless browsing" users (records whose URL does not end in .html)

scala> val streel=hiveContext.sql("select count(*) as count_num,substring(page_type,1,3) as page_type from law where visiturl not like '%.html' group by substring(page_type,1,3)")
streel: org.apache.spark.sql.DataFrame = [count_num: bigint, page_type: string]

scala> streel.orderBy(-streel("count_num")).limit(6).show()
+---------+---------+                                                           
|count_num|page_type|
+---------+---------+
|   118011|      199|
|    18175|      107|
|    17357|      102|
|     7130|      101|
|     3957|      106|
|     1024|      301|
+---------+---------+

scala> streel.orderBy(-streel("count_num")).limit(6).save("/user/root/sparksql/streel.json","json",SaveMode.Overwrite)

Click count analysis

scala> hiveContext.sql("select count(distinct userid) from law").show()
+------+                                                                        
|   _c0|
+------+
|350090|
+------+

scala> val clickCount=hiveContext.sql("select click_num,count(click_num) as count,round(count(click_num)*100/350090.0,2),round((count(click_num)*click_num)*100/837450.0,2) from (select count(userid) as click_num from law group by userid)tmp_table group by click_num order by count desc")
clickCount: org.apache.spark.sql.DataFrame = [click_num: bigint, count: bigint, _c2: double, _c3: double]

scala> clickCount.limit(7).show()
+---------+------+-----+-----+                                                  
|click_num| count|  _c2|  _c3|
+---------+------+-----+-----+
|        1|229365|65.52|27.39|
|        2| 63605|18.17|15.19|
|        3| 20992|  6.0| 7.52|
|        4| 12079| 3.45| 5.77|
|        5|  6177| 1.76| 3.69|
|        6|  4181| 1.19|  3.0|
|        7|  2556| 0.73| 2.14|
+---------+------+-----+-----+

scala> clickCount.limit(7).save("/user/root/sparksql/clickCount.json","json",SaveMode.Overwrite)
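
The clickCount statement is dense; here is the same query reformatted with comments (using the shell's triple-quoted strings, no change in logic):

val clickCount = hiveContext.sql("""
  select click_num,
         count(click_num) as count,                            -- number of users with exactly click_num clicks
         round(count(click_num)*100/350090.0,2),               -- share of the 350090 distinct users
         round((count(click_num)*click_num)*100/837450.0,2)    -- share of the 837450 total records
  from (select count(userid) as click_num                      -- per-user click count
        from law group by userid) tmp_table
  group by click_num
  order by count desc""")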

Statistical analysis of users who browsed only once

scala> val onceScan=hiveContext.sql("select page_type,count(page_type) as count,round((count(page_type)*100)/229365.0,4) from (select substring(a.page_type,1,7) as page_type from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid)c group by page_type order by count desc")
onceScan: org.apache.spark.sql.DataFrame = [page_type: string, count: bigint, _c2: double]

scala> onceScan.limit(5).show()
+---------+------+-------+                                                      
|page_type| count|    _c2|
+---------+------+-------+
|   101003|171804|74.9042|
|   107001| 36915|16.0944|
|  1999001| 18581| 8.1011|
|   301001|  1314| 0.5729|
|   102001|   173| 0.0754|
+---------+------+-------+

scala> onceScan.limit(5).save("/user/root/sparksql/onceScan.json","json",SaveMode.Overwrite)
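
The onceScan query nests three levels deep; the same statement laid out with comments (no change in logic):

val onceScan = hiveContext.sql("""
  select page_type,
         count(page_type) as count,
         round((count(page_type)*100)/229365.0,4)   -- share of the 229365 single-click users
  from (select substring(a.page_type,1,7) as page_type
        from law a,
             (select userid from law
              group by userid
              having(count(userid)=1)) b             -- users with exactly one record
        where a.userid=b.userid) c                   -- the pages those users visited
  group by page_type
  order by count desc""")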

Ranking the URLs visited by single-click users

scala> val urlRank=hiveContext.sql("select a.visiturl,count(*) as count from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid group by a.visiturl")
urlRank: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]

scala> urlRank.orderBy(-urlRank("count")).limit(7).show(false)
+---------------------------------------------------------------+-----+         
|visiturl                                                       |count|
+---------------------------------------------------------------+-----+
|http://www.lawtime.cn/info/shuifa/slb/2012111978933.html       |2130 |
|http://www.lawtime.cn/ask/exp/13655.html                       |859  |
|http://www.lawtime.cn/info/hunyin/lhlawlhxy/20110707137693.html|804  |
|http://www.lawtime.cn/info/shuifa/slb/2012111978933_2.html     |684  |
|http://www.lawtime.cn/ask/question_925675.html                 |682  |
|http://www.lawtime.cn/ask/exp/8495.html                        |534  |
|http://www.lawtime.cn/guangzhou                                |375  |
+---------------------------------------------------------------+-----+

scala> urlRank.orderBy(-urlRank("count")).limit(7).save("/user/root/sparksql/urlRank.json","json",SaveMode.Overwrite)

Page ranking analysis

Click statistics for pages in the raw data whose URL has the .html extension

scala> val clickHtml=hiveContext.sql("select a.visiturl,count(*) as count from law a where a.visiturl like '%.html%' group by a.visiturl")
clickHtml: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]

scala> clickHtml.orderBy(-clickHtml("count")).limit(10).show(false)
+-----------------------------------------------------------------+-----+       
|visiturl                                                         |count|
+-----------------------------------------------------------------+-----+