Spark SQL: Structured Data File Processing (Part 3)
Posted by Weikun Xing
Exploratory Analysis of Legal Services Website Data
Getting the data
Start the spark-sql shell:
./spark-sql
Create the database and table, then load the data
spark-sql> create database law;
22/03/26 17:55:17 WARN ObjectStore: Failed to get database law, returning NoSuchObjectException
OK
spark-sql> use law;
OK
spark-sql> CREATE TABLE law (
> ip bigint,
> area int,
> ie_proxy string,
> ie_type string ,
> userid string,
> clientid string,
> time_stamp bigint,
> time_format string,
> pagepath string,
> ymd int,
> visiturl string,
> page_type string,
> host string,
> page_title string,
> page_title_type int,
> page_title_name string,
> title_keyword string,
> in_port string,
> in_url string,
> search_keyword string,
> source string)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
> STORED AS TEXTFILE;
OK
Switch to Hive to load the data:
hive> show databases;
OK
default
law
test
Time taken: 5.774 seconds, Fetched: 3 row(s)
hive> use law;
OK
Time taken: 0.032 seconds
hive> load data inpath 'hdfs://master/user/root/sparksql/law_utf8.csv' overwrite into table law;
Loading data to table law.law
OK
Time taken: 28.706 seconds
hive>
Page type analysis
scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode
scala> val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3bb99ed3
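This walkthrough uses the Spark 1.x HiveContext API. On Spark 2.x and later, HiveContext is replaced by a Hive-enabled SparkSession; a minimal sketch of the equivalent setup (the application name is illustrative):

import org.apache.spark.sql.SparkSession

// Spark 2.x+: a Hive-enabled SparkSession takes over HiveContext's role
val spark = SparkSession.builder()
  .appName("law-log-analysis")  // illustrative name, not from the original
  .enableHiveSupport()
  .getOrCreate()
spark.sql("use law")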
scala> val pageType=hiveContext.sql("use law")
pageType: org.apache.spark.sql.DataFrame = [result: string]
scala> val pageType=hiveContext.sql("select substring(page_type,1,3) as page_type,count(*) as count_num,round((count(*)/837450.0)*100,4) as weights from law group by substring(page_type,1,3)")
pageType: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]
scala> pageType.orderBy(-pageType("count_num")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 101| 411665| 49.157|
| 199| 201399|24.0491|
| 107| 182900|21.8401|
| 301| 18430| 2.2007|
| 102| 17357| 2.0726|
| 106| 3957| 0.4725|
| 103| 1715| 0.2048|
| "ht| 14| 0.0017|
| 201| 12| 0.0014|
| cfr| 1| 1.0E-4|
+---------+---------+-------+
scala> pageType.repartition(1).save("/user/root/sparksql/pageType.json","json",SaveMode.Overwrite)
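The three-argument save(path, source, mode) used throughout this post is the Spark 1.x API (deprecated in 1.4 and removed in 2.0). On newer versions the DataFrameWriter produces the same JSON output:

pageType.repartition(1)
  .write
  .mode(SaveMode.Overwrite)
  .json("/user/root/sparksql/pageType.json")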
Clicks on consultation-related pages (type 101, http://www.*.cn/ask/) account for about 49.16% of the records, followed by "other" pages (type 199) at about 24.05% and knowledge-related pages (type 107, http://www.*.com/info) at about 21.84%.
Count the type-199 records whose URL relates to laws and regulations (faguizt)
scala> val pageLevel=hiveContext.sql("select substring(page_type,1,7) as page_type,count(*) as count_sum from law where visiturl like '%faguizt%' and substring(page_type,1,7) like '%199%' group by page_type")
pageLevel: org.apache.spark.sql.DataFrame = [page_type: string, count_sum: bigint]
scala> pageLevel.show()
+---------+---------+
|page_type|count_sum|
+---------+---------+
| 1999001| 47407|
+---------+---------+
scala> pageLevel.repartition(1).save("/user/root/sparksql/pageLevel.json","json",SaveMode.Overwrite)
Breakdown within the consultation category (101)
scala> val consultCount=hiveContext.sql("select substring(page_type,1,6) as page_type,count(*) as count_num,round((count(*)/411665.0)*100,4) as weights from law where substring(page_type,1,3)=101 group by substring(page_type,1,6)")
consultCount: org.apache.spark.sql.DataFrame = [page_type: string, count_num: bigint, weights: double]
scala> consultCount.orderBy(-consultCount("count_num")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 101003| 396612|96.3434|
| 101002| 7776| 1.8889|
| 101001| 5603| 1.3611|
| 101009| 854| 0.2075|
| 101008| 378| 0.0918|
| 101007| 147| 0.0357|
| 101004| 125| 0.0304|
| 101006| 107| 0.026|
| 101005| 63| 0.0153|
+---------+---------+-------+
scala> consultCount.repartition(1).save("/user/root/sparksql/consultCount.json","json",SaveMode.Overwrite)
Statistics on records whose URL contains a "?"
scala> hiveContext.sql("select count(*) as num from law where visiturl like '%?%'").show()
+-----+
| num|
+-----+
|65477|
+-----+
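The original post omits the definition of pageWith used below. Working backwards from the column names and the weights (for example 64691/65477 ≈ 98.7996), it was presumably a grouped count over the records containing "?"; the following is a reconstruction, not the author's verbatim query:

// Reconstructed: distribution of 7-digit page types among records whose URL contains '?'
val pageWith = hiveContext.sql(
  "select substring(page_type,1,7) as page_type, count(*) as count_num, " +
  "round((count(*)/65477.0)*100,4) as weights " +
  "from law where visiturl like '%?%' " +
  "group by substring(page_type,1,7)")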
scala> pageWith.orderBy(-pageWith("weights")).show()
+---------+---------+-------+
|page_type|count_num|weights|
+---------+---------+-------+
| 1999001| 64691|98.7996|
| 301001| 356| 0.5437|
| 107001| 346| 0.5284|
| 101003| 47| 0.0718|
| 102002| 25| 0.0382|
| 2015020| 5| 0.0076|
| 2015042| 3| 0.0046|
| 2015021| 2| 0.0031|
| 2015031| 2| 0.0031|
+---------+---------+-------+
scala> pageWith.repartition(1).save("/user/root/sparksql/pageWith.json","json",SaveMode.Overwrite)
Analyzing the internal patterns of the other-type (1999001) pages
scala> val otherPage=hiveContext.sql("select count(*) as count_num,round((count(*)/64691.0)*100,4) as weights,page_title from law where visiturl like '%?%' and substring(page_type,1,7)=1999001 group by page_title")
otherPage: org.apache.spark.sql.DataFrame = [count_num: bigint, weights: double, page_title: string]
scala> otherPage.orderBy(-otherPage("count_num")).limit(5).show()
+---------+-------+--------------------+
|count_num|weights| page_title|
+---------+-------+--------------------+
| 49894|77.1266| 法律快车-律师助手|
| 6166| 9.5315| 免费发布法律咨询 - 法律快车法律咨询|
| 4455| 6.8866| 咨询发布成功|
| 765| 1.1825| 咨询发布成功 - 法律快车|
| 342| 0.5287|法律快搜-中国法律搜索第一品牌(s...|
+---------+-------+--------------------+
scala> otherPage.orderBy(-otherPage("count_num")).limit(5).save("/user/root/sparksql/otherPage.json","json",SaveMode.Overwrite)
Page types clicked by "aimless browsing" users
Here such users are approximated by records whose URL does not end in .html, i.e. clicks that never reached a concrete content page.
scala> val streel=hiveContext.sql("select count(*) as count_num,substring(page_type,1,3) as page_type from law where visiturl not like '%.html' group by substring(page_type,1,3)")
streel: org.apache.spark.sql.DataFrame = [count_num: bigint, page_type: string]
scala> streel.orderBy(-streel("count_num")).limit(6).show()
+---------+---------+
|count_num|page_type|
+---------+---------+
| 118011| 199|
| 18175| 107|
| 17357| 102|
| 7130| 101|
| 3957| 106|
| 1024| 301|
+---------+---------+
scala> streel.orderBy(-streel("count_num")).limit(6).save("/user/root/sparksql/streel.json","json",SaveMode.Overwrite)
Click count analysis
First, count the number of distinct users:
scala> hiveContext.sql("select count(distinct userid) from law").show()
+------+
| _c0|
+------+
|350090|
+------+
scala> val clickCount=hiveContext.sql("select click_num,count(click_num) as count,round(count(click_num)*100/350090.0,2),round((count(click_num)*click_num)*100/837450.0,2) from (select count(userid) as click_num from law group by userid)tmp_table group by click_num order by count desc")
clickCount: org.apache.spark.sql.DataFrame = [click_num: bigint, count: bigint, _c2: double, _c3: double]
scala> clickCount.limit(7).show()
+---------+------+-----+-----+
|click_num| count| _c2| _c3|
+---------+------+-----+-----+
| 1|229365|65.52|27.39|
| 2| 63605|18.17|15.19|
| 3| 20992| 6.0| 7.52|
| 4| 12079| 3.45| 5.77|
| 5| 6177| 1.76| 3.69|
| 6| 4181| 1.19| 3.0|
| 7| 2556| 0.73| 2.14|
+---------+------+-----+-----+
scala> clickCount.limit(7).save("/user/root/sparksql/clickCount.json","json",SaveMode.Overwrite)
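For reference, the nested SQL above can also be expressed with the DataFrame API. This is a sketch assuming Spark 1.5+ (for the round and desc functions), not the author's original code; clickCountDF.limit(7).show() should reproduce the table above:

import org.apache.spark.sql.functions._

// Count the clicks made by each user...
val perUser = hiveContext.table("law")
  .groupBy("userid")
  .agg(count("userid").as("click_num"))

// ...then, per click frequency, count users and their share of all
// users (350090) and of all records (837450)
val clickCountDF = perUser
  .groupBy("click_num")
  .agg(
    count("click_num").as("count"),
    round(count("click_num") * 100 / 350090.0, 2).as("user_pct"),
    round(sum("click_num") * 100 / 837450.0, 2).as("record_pct"))
  .orderBy(desc("count"))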
Statistics for users who viewed only one page
scala> val onceScan=hiveContext.sql("select page_type,count(page_type) as count,round((count(page_type)*100)/229365.0,4) from (select substring(a.page_type,1,7) as page_type from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid)c group by page_type order by count desc")
onceScan: org.apache.spark.sql.DataFrame = [page_type: string, count: bigint, _c2: double]
scala> onceScan.limit(5).show()
+---------+------+-------+
|page_type| count| _c2|
+---------+------+-------+
| 101003|171804|74.9042|
| 107001| 36915|16.0944|
| 1999001| 18581| 8.1011|
| 301001| 1314| 0.5729|
| 102001| 173| 0.0754|
+---------+------+-------+
scala> onceScan.limit(5).save("/user/root/sparksql/onceScan.json","json",SaveMode.Overwrite)
Ranking the URLs visited by single-click users
scala> val urlRank=hiveContext.sql("select a.visiturl,count(*) as count from law a,(select userid from law group by userid having(count(userid)=1))b where a.userid=b.userid group by a.visiturl")
urlRank: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]
scala> urlRank.orderBy(-urlRank("count")).limit(7).show(false)
+---------------------------------------------------------------+-----+
|visiturl |count|
+---------------------------------------------------------------+-----+
|http://www.lawtime.cn/info/shuifa/slb/2012111978933.html |2130 |
|http://www.lawtime.cn/ask/exp/13655.html |859 |
|http://www.lawtime.cn/info/hunyin/lhlawlhxy/20110707137693.html|804 |
|http://www.lawtime.cn/info/shuifa/slb/2012111978933_2.html |684 |
|http://www.lawtime.cn/ask/question_925675.html |682 |
|http://www.lawtime.cn/ask/exp/8495.html |534 |
|http://www.lawtime.cn/guangzhou |375 |
+---------------------------------------------------------------+-----+
scala> urlRank.orderBy(-urlRank("count")).limit(7).save("/user/root/sparksql/urlRank.json","json",SaveMode.Overwrite)
Page ranking analysis
Click counts for pages with the .html extension in the raw data
scala> val clickHtml=hiveContext.sql("select a.visiturl,count(*) as count from law a where a.visiturl like '%.html%' group by a.visiturl")
clickHtml: org.apache.spark.sql.DataFrame = [visiturl: string, count: bigint]
scala> clickHtml.orderBy(-clickHtml("count")).limit(10).show(false)
+-----------------------------------------------------------------+-----+
|visiturl |count|
+-----------------------------------------------------------------+-----+