Spark-Core Practice Exercises
Posted Mr.zhou_Zxy
Cases
1. Ad ID statistics
- Data format:
timestamp province city userid adid
(click timestamp, province ID, city ID, user ID, ad ID)
User ID range: 0-99
Province and city IDs share the same range: 0-9
adid: 0-19
e.g.
1516609143867 6 7 64 16
- Requirements:
1. For each province, find the TOP3 ad IDs by number of clicks
2. For each province and each hour, find the TOP3 ad IDs (the code below covers requirement 1; a sketch for requirement 2 follows it)
- Code
package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

object Demo1 {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtils.getDefaultSparkContext()
    // Load the raw log lines and split each line on whitespace
    val logsRDD: RDD[String] = sc.textFile("file:///c://ftp/Advert.txt")
    val arrsRDD: RDD[Array[String]] = logsRDD.map(_.split("\\s+"))
    // Key by "province_adid" and count the clicks per key
    val proAndAdvCntRDD: RDD[(String, Int)] = arrsRDD.map(arr => (arr(1) + "_" + arr(4), 1))
    val sumRDD: RDD[(String, Int)] = proAndAdvCntRDD.reduceByKey(_ + _)
    // Re-key by province: (province, (adid, count))
    val sum2RDD: RDD[(String, (String, Int))] = sumRDD.map(t => {
      val param = t._1.split("_")
      (param(0), (param(1), t._2))
    })
    // Group the (adid, count) pairs by province and keep the TOP3 per province
    val gbkRDD: RDD[(String, Iterable[(String, Int)])] = sum2RDD.groupByKey()
    val resRDD: RDD[(String, List[(String, Int)])] = gbkRDD.mapValues(values => {
      values.toList.sortWith((x, y) => x._2 > y._2).take(3)
    })
    val map: collection.Map[String, List[(String, Int)]] = resRDD.collectAsMap()
    println(map)
    sc.stop()
  }
}
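Demo1 above only answers requirement 1. Below is a minimal sketch for requirement 2 (TOP3 ad IDs per province per hour), reusing the same input file and assuming the first column is an epoch-millisecond timestamp as in the sample line; the object name Demo1Hourly is mine, not from the post:

package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

import java.text.SimpleDateFormat
import java.util.Date

object Demo1Hourly {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtils.getDefaultSparkContext()
    val arrsRDD: RDD[Array[String]] = sc.textFile("file:///c://ftp/Advert.txt").map(_.split("\\s+"))
    val top3PerProvinceHour: RDD[((String, String), List[(String, Int)])] = arrsRDD
      .map { arr =>
        // Bucket the click into its hour, then key by ((province, hour), adid)
        val hour = new SimpleDateFormat("yyyyMMddHH").format(new Date(arr(0).toLong))
        (((arr(1), hour), arr(4)), 1)
      }
      .reduceByKey(_ + _)
      // Regroup by (province, hour) and keep the TOP3 ads in each group
      .map { case (((province, hour), adid), cnt) => ((province, hour), (adid, cnt)) }
      .groupByKey()
      .mapValues(values => values.toList.sortBy(t => -t._2).take(3))
    top3PerProvinceHour.collect().foreach(println)
    sc.stop()
  }
}

Every demo in this post also gets its SparkContext from SparkUtils.getDefaultSparkContext(), a helper from the author's day2 package that is not reproduced here. A plausible minimal version, purely as an assumption about what it does (local master and app name are guesses):

package com.qf.bigdata.spark.core.day2

import org.apache.spark.{SparkConf, SparkContext}

object SparkUtils {
  // Local-mode context with a generic app name; the real helper may configure more than this
  def getDefaultSparkContext(): SparkContext = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-core-exercises")
    new SparkContext(conf)
  }
}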
2. TopN dwell time at base stations
- Data format
19735E1C66.log stores the log records:
phone number, timestamp, base station ID, connection state (1 = connect, 0 = disconnect)
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
lac_info.txt stores the base station information:
fields: base station ID, longitude, latitude
- Requirements
From the logs a user produces, find the base station where that user stayed the longest
Within a given area, find the Top2 base stations, out of all stations each user passed through, with the longest dwell time
- Approach
1. Load the user log records and split out the fields
2. Compute the total time each user stayed at each base station (a connect record is recorded with a negative timestamp and the matching disconnect with a positive one, so summing per (phone, station) key yields the dwell time; a worked example follows the code)
3. Load the base station metadata
4. Join the longitude/latitude onto the user data
5. Take the Top2 base stations by dwell time for each user
- Code
package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

import java.text.SimpleDateFormat

object Demo2 {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtils.getDefaultSparkContext()
    // 1. Load the user log data
    val filesRDD: RDD[String] = sc.textFile("file:///c://ftp/19735E1C66.log")
    val userInfoRDD: RDD[((String, String), Long)] = filesRDD.map(line => {
      val fields: Array[String] = line.split(",")
      val phone = fields(0)            // phone number
      val time = str2Long(fields(1))   // timestamp in milliseconds
      val lac = fields(2)              // base station ID
      val eventType = fields(3)        // connection state: "1" = connect, "0" = disconnect
      // A connect contributes -time and a disconnect +time, so the per-key sum is the dwell time
      val time_long = if (eventType.equals("1")) -time else time
      ((phone, lac), time_long)        // ((phone, base station ID), signed time)
    })
    val sumRDD: RDD[((String, String), Long)] = userInfoRDD.reduceByKey(_ + _)
    // 2. Re-key by base station ID: (lac, (phone, dwell time))
    val lacAndPTRDD: RDD[(String, (String, Long))] = sumRDD.map(t => {
      val phone = t._1._1
      val lac = t._1._2
      val time = t._2
      (lac, (phone, time))
    })
    // 3. Load the base station metadata: (lac, (longitude, latitude))
    val lacInfoRDD: RDD[String] = sc.textFile("file:///c://ftp/lac_info.txt")
    val lacAndXYRDD: RDD[(String, (String, String))] = lacInfoRDD.map(line => {
      val fields: Array[String] = line.split(",")
      val lac = fields(0)
      val x = fields(1)
      val y = fields(2)
      (lac, (x, y))
    })
    // 4. Join the coordinates onto the user data
    val joinRDD: RDD[(String, ((String, Long), (String, String)))] = lacAndPTRDD join lacAndXYRDD
    val phoneAndTXYRDD: RDD[(String, Long, (String, String))] = joinRDD.map(t => {
      val phone = t._2._1._1
      val time = t._2._1._2
      val xy = t._2._2
      (phone, time, xy)
    })
    // 5. Group by phone and sort each user's stations by dwell time, descending
    val groupRDD: RDD[(String, Iterable[(String, Long, (String, String))])] = phoneAndTXYRDD.groupBy(_._1)
    val sortRDD: RDD[(String, List[(String, Long, (String, String))])] = groupRDD.mapValues(_.toList.sortBy(_._2).reverse)
    val resRDD: RDD[(String, List[(Long, (String, String))])] = sortRDD.map(t => {
      val phone = t._1
      val list = t._2
      // Keep only (dwell time, coordinates) for each station
      val filterList: List[(Long, (String, String))] = list.map(tup => {
        val time = tup._2
        val xy = tup._3
        (time, xy)
      })
      (phone, filterList)
    })
    // 6. Top2 stations per user
    val ressRDD: RDD[(String, List[(Long, (String, String))])] = resRDD.mapValues(_.take(2))
    println(ressRDD.collect().toList)
    sc.stop()
  }

  // Parse a "yyyyMMddHHmmss" string into epoch milliseconds
  def str2Long(date: String): Long = {
    val format = new SimpleDateFormat("yyyyMMddHHmmss")
    val time: Long = format.parse(date).getTime
    time
  }
}
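As a quick check of the sign trick used above: a connect record is stored as a negative timestamp and the matching disconnect as a positive one, so reduceByKey sums each (phone, station) key to disconnect time minus connect time, i.e. the dwell time in milliseconds. A small illustration (the timestamps are made up; the snippet assumes it sits next to Demo2 so str2Long is visible):

val t1 = Demo2.str2Long("20160327082400") // connect at 08:24:00, contributes -t1
val t2 = Demo2.str2Long("20160327090000") // disconnect at 09:00:00, contributes +t2
println(t2 - t1)                          // 2160000 ms = 36 minutes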
3. Visit counts by the region an IP belongs to
3.1 Note
The data for this case includes a SQL file; running it directly in a database client creates a table. That table is used to write the computed results into the database, and it can later be read back with JdbcRDD. A sketch of what the table might look like is shown below.
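The SQL file itself is not reproduced in the post. Here is a minimal sketch of a table definition that the insert code in Demo3 would work against; the column names are taken from its INSERT statement, while the id column and all column types are assumptions:

package com.qf.bigdata.spark.core.day5

import java.sql.DriverManager

object CreateLocationInfoTable {
  def main(args: Array[String]): Unit = {
    val jdbcUrl = "jdbc:mysql://146.56.208.76:3307/sqoop?useUnicode=true&characterEncoding=utf8"
    val con = DriverManager.getConnection(jdbcUrl, "root", "123456")
    // Columns match the INSERT in Demo3.res2mysql; everything else is a guess at the original DDL
    val ddl =
      """create table if not exists `location_info` (
        |  `id`          int auto_increment primary key,
        |  `location`    varchar(64),
        |  `counts`      varchar(32),
        |  `access_date` date
        |)""".stripMargin
    con.prepareStatement(ddl).execute()
    con.close()
  }
}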
3.2 Code
package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

object Demo3 {
  def main(args: Array[String]): Unit = {
    //1. Load the data and turn the IP ranges into a broadcast variable
    val sc = SparkUtils.getDefaultSparkContext()
    val httpLogRDD: RDD[String] = sc.textFile("file:///c://ftp/http.log")
    val ipRDD: RDD[String] = sc.textFile("file:///c://ftp/ip.txt")
    val province2IpRDD: RDD[(String, Long, Long)] = ipRDD.map(line => {
      val fields: Array[String] = line.split("\\|")
      val startIp = ip2long(fields(0))
      val endIp = ip2long(fields(1))
      val province = fields(6)
      (province, startIp, endIp)
    })
    // Collect the ranges to the driver, sort them by start IP so binary search works,
    // then broadcast the sorted array to every executor
    val ipArrs: Array[(String, Long, Long)] = province2IpRDD.collect().sortWith {
      case ((pro1, sIp1, eIp1), (pro2, sIp2, eIp2)) => sIp1 < sIp2
    }
    val provinceIpBC: Broadcast[Array[(String, Long, Long)]] = sc.broadcast(ipArrs)
    //2. Process the access log: look up each IP's province and count hits per province
    val province2CountRDD: RDD[(String, Int)] = httpLogRDD.map(line => {
      val fields = line.split("\\|")
      val ip: Long = ip2long(fields(1))
      val ipArr: Array[(String, Long, Long)] = provinceIpBC.value
      val index = binarySearch(ip, ipArr)
      if (index < 0) (null, -1)
      else (ipArr(index)._1, 1)
    }).filter(_._1 != null).reduceByKey(_ + _)
    //3. Save the result to the database
    res2mysql(province2CountRDD)
    sc.stop()
  }

  /**
   * Convert an IP address string in dotted form to its Long representation
   */
  def ip2long(ip: String): Long = {
    val fields: Array[String] = ip.split("\\.")
    var ipNum = 0L
    // Shift the accumulated value left by 8 bits and OR in the next octet
    fields.foreach(field => ipNum = field.toLong | ipNum << 8)
    ipNum
  }
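  // Illustrative worked example (not part of the original post): for "1.0.1.0" the loop runs
  //   octet "1":   0 << 8 | 1     = 1
  //   octet "0":   1 << 8 | 0     = 256
  //   octet "1":   256 << 8 | 1   = 65537
  //   octet "0":   65537 << 8 | 0 = 16777472
  // i.e. 1*256^3 + 0*256^2 + 1*256 + 0, the usual numeric value of the dotted address.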
  /**
   * Binary-search the sorted IP ranges for the range containing ip; return -1 if none matches
   */
  def binarySearch(ip: Long, ipArr: Array[(String, Long, Long)]): Int = {
    var start = 0
    var end = ipArr.length - 1
    while (start <= end) {
      val mid = (start + end) / 2
      val startIp = ipArr(mid)._2
      val endIp = ipArr(mid)._3
      if (ip >= startIp && ip <= endIp) {
        return mid
      } else if (ip < startIp) {
        end = mid - 1
      } else {
        start = mid + 1
      }
    }
    -1
  }

  /**
   * Save the RDD to the MySQL database, one JDBC connection and one batch per partition
   */
  def res2mysql(rdd: RDD[(String, Int)]): Unit = {
    rdd.foreachPartition(iterator => {
      var con: Connection = null
      var ps: PreparedStatement = null
      val jdbcUrl = "jdbc:mysql://146.56.208.76:3307/sqoop?useUnicode=true&characterEncoding=utf8"
      val user = "root"
      val pass = "123456"
      con = DriverManager.getConnection(jdbcUrl, user, pass)
      val sql = "insert into `location_info` (location, counts, access_date) values (?, ?, ?)"
      ps = con.prepareStatement(sql)
      iterator.foreach {
        case (province, count) =>
          ps.setString(1, province)
          ps.setString(2, count + "")
          ps.setDate(3, new Date(System.currentTimeMillis()))
          ps.addBatch()
      }
      ps.executeBatch()
      ps.close()
      con.close()
    })
  }
}
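The note in 3.1 says the table will later be read back with JdbcRDD. A minimal sketch of what that read-back might look like; it assumes the table has an auto-increment `id` column to partition on (that column name and the bounds are my assumptions, not something stated in the post):

package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.JdbcRDD

import java.sql.DriverManager

object Demo3ReadBack {
  def main(args: Array[String]): Unit = {
    val sc = SparkUtils.getDefaultSparkContext()
    val jdbcUrl = "jdbc:mysql://146.56.208.76:3307/sqoop?useUnicode=true&characterEncoding=utf8"
    // JdbcRDD requires exactly two '?' placeholders; Spark binds them to the per-partition id ranges
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(jdbcUrl, "root", "123456"),
      "select location, counts from location_info where id >= ? and id <= ?",
      1, 1000, 2, // lowerBound, upperBound, numPartitions (values assumed for illustration)
      rs => (rs.getString(1), rs.getString(2))
    )
    println(rows.collect().toList)
    sc.stop()
  }
}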