Spark-Core Practice Exercises

Posted by Mr.zhou_Zxy

Cases

1. Ad ID statistics

  • Data format:
timestamp   province   city   userid   adid
userid range: 0-99
province and city IDs: 0-9
adid range: 0-19

e.g.
1516609143867 6 7 64 16
  • Requirements:
1. For each province, find the TOP 3 ad IDs by click count
2. For each province and each hour, find the TOP 3 ad IDs (see the sketch after the code below)
  • Code
package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

object Demo1 {
    def main(args: Array[String]): Unit = {
        val sc = SparkUtils.getDefaultSparkContext()
        // load the raw click log
        val logsRDD: RDD[String] = sc.textFile("file:///c://ftp/Advert.txt")
        // split each line on whitespace: timestamp province city userid adid
        val arrsRDD: RDD[Array[String]] = logsRDD.map(_.split("\\s+"))
        // key each click by "province_adid" and count it once
        val proAndAdvCntRDD: RDD[(String, Int)] = arrsRDD.map(arr => (arr(1) + "_" + arr(4), 1))
        val sumRDD: RDD[(String, Int)] = proAndAdvCntRDD.reduceByKey(_ + _)
        // re-key by province, carrying (adid, clickCount) as the value
        val sum2RDD: RDD[(String, (String, Int))] = sumRDD.map(t => {
            val param = t._1.split("_")
            (param(0), (param(1), t._2))
        })
        val gbkRDD: RDD[(String, Iterable[(String, Int)])] = sum2RDD.groupByKey()

        // sort each province's ads by click count (descending) and keep the top 3
        val resRDD: RDD[(String, List[(String, Int)])] = gbkRDD.mapValues(values => {
            values.toList.sortWith((x, y) => x._2 > y._2).take(3)
        })

        val map: collection.Map[String, List[(String, Int)]] = resRDD.collectAsMap()
        println(map)

        sc.stop()
    }
}
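
The code above only covers requirement 1. A minimal sketch for requirement 2 (per-province, per-hour TOP 3 ad IDs) could look like the following; the object name Demo1Hourly and the yyyyMMddHH hour bucket are illustrative assumptions, not part of the original exercise code.

package com.qf.bigdata.spark.core.day5

import java.text.SimpleDateFormat
import java.util.Date

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

object Demo1Hourly {
    def main(args: Array[String]): Unit = {
        val sc = SparkUtils.getDefaultSparkContext()
        val logsRDD: RDD[String] = sc.textFile("file:///c://ftp/Advert.txt")
        val hourTop3RDD = logsRDD
            .map(_.split("\\s+"))
            .map { arr =>
                // bucket each click by (province, hour, adid); the hour comes from the millisecond timestamp
                val hour = new SimpleDateFormat("yyyyMMddHH").format(new Date(arr(0).toLong))
                ((arr(1), hour, arr(4)), 1)
            }
            .reduceByKey(_ + _)
            .map { case ((province, hour, adid), cnt) => ((province, hour), (adid, cnt)) }
            .groupByKey()
            // keep the 3 most-clicked ad IDs per (province, hour)
            .mapValues(_.toList.sortBy(-_._2).take(3))
        println(hourTop3RDD.collect().toList)
        sc.stop()
    }
}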


2. Base station dwell time TopN

  • Data format
19735E1C66.log: this file stores the log records
fields: phone number, timestamp, base station ID, connection state (1 = connect, 0 = disconnect)

18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1

lac_info.txt: this file stores the base station information
fields: base station ID, longitude, latitude
  • Requirements
For each user, find the base station at which they stayed the longest, based on the logs they produced
Within a given range, find the Top 2 base stations by dwell time among all the stations each user passed through
  • Approach
1. Load the user log records and split the fields
2. Compute each user's total dwell time per base station
3. Load the base station information
4. Join the longitude/latitude onto the user data
5. Take the top 2 base stations by dwell time for each user
  • Code
package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.RDD

import java.text.SimpleDateFormat

object Demo2 {
    def main(args: Array[String]): Unit = {
        val sc = SparkUtils.getDefaultSparkContext()
        // 1. load the user log data
        val filesRDD: RDD[String] = sc.textFile("file:///c://ftp/19735E1C66.log")
        val userInfoRDD: RDD[((String, String), Long)] = filesRDD.map(line => {
            val fields: Array[String] = line.split(",")
            val phone = fields(0) // phone number
            val time = str2Long(fields(1)) // timestamp in milliseconds
            val lac = fields(2) // base station ID
            val eventType = fields(3) // connection state: "1" = connect, "0" = disconnect

            // a connect event contributes -time and a disconnect event +time,
            // so the per-(phone, station) sum is the total dwell time
            val time_long = if (eventType == "1") -time else time
            ((phone, lac), time_long) // ((phone, base station ID), signed timestamp)
        })
        val sumRDD: RDD[((String, String), Long)] = userInfoRDD.reduceByKey(_ + _)

        val lacAndPTRDD: RDD[(String, (String, Long))] = sumRDD.map(t => {
            val phone = t._1._1
            val lac = t._1._2
            val time = t._2
            (lac, (phone, time))
        })

        // 2. load the base station information (base station ID, longitude, latitude)
        val lacInfoRDD: RDD[String] = sc.textFile("file:///c://ftp/lac_info.txt")
        val lacAndXYRDD: RDD[(String, (String, String))] = lacInfoRDD.map(line => {
            val fields: Array[String] = line.split(",")
            val lac = fields(0)
            val x = fields(1)
            val y = fields(2)
            (lac, (x, y))
        })

        val joinRDD: RDD[(String, ((String, Long), (String, String)))] = lacAndPTRDD join lacAndXYRDD

        val phoneAndTXYRDD: RDD[(String, Long, (String, String))] = joinRDD.map(t => {
            val phone = t._2._1._1
            val time = t._2._1._2
            val xy = t._2._2
            (phone, time, xy)
        })

        val groupRDD: RDD[(String, Iterable[(String, Long, (String, String))])] = phoneAndTXYRDD.groupBy(_._1)

        val sortRDD: RDD[(String, List[(String, Long, (String, String))])] = groupRDD.mapValues(_.toList.sortBy(_._2).reverse)

        val resRDD: RDD[(String, List[(Long, (String, String))])] = sortRDD.map(t => {
            val phone = t._1
            val list = t._2
            val filterlist: List[(Long, (String, String))] = list.map(tup => {
                val time = tup._2
                val xy = tup._3
                (time, xy)
            })
            (phone, filterlist)
        })

        val ressRDD: RDD[(String, List[(Long, (String, String))])] = resRDD.mapValues(_.take(2))

        println(ressRDD.collect().toList)

        sc.stop()
    }

    /** parse a yyyyMMddHHmmss string into epoch milliseconds */
    def str2Long(date:String):Long = {
        val format = new SimpleDateFormat("yyyyMMddHHmmss")
        val time: Long = format.parse(date).getTime
        time
    }
}
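
A quick local sanity check of the signed-timestamp trick used in Demo2 (plain Scala, hypothetical sample values): a connect event contributes -time and a disconnect event +time, so the sum per (phone, base station) equals the total dwell time.

object DwellTimeCheck {
    def main(args: Array[String]): Unit = {
        // hypothetical events: at lacA, connect at t=1000 and disconnect at t=5000 -> 4000 ms dwell
        val events = Seq(
            (("18688888888", "lacA"), -1000L),
            (("18688888888", "lacA"),  5000L),
            (("18688888888", "lacB"), -2000L),
            (("18688888888", "lacB"),  3000L)  // 1000 ms dwell at lacB
        )
        val dwell = events.groupBy(_._1).map { case (key, vs) => (key, vs.map(_._2).sum) }
        println(dwell) // expected: lacA -> 4000, lacB -> 1000
    }
}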

3. Access counts by IP region

3.1 Note

	The data for this case includes an SQL file. Running it directly in a database client creates a table; that table is used to store the computation results and is intended for later JdbcRDD use (a read-back sketch appears after the code below).

3.2 Code

package com.qf.bigdata.spark.core.day5

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

import java.sql.{Connection, Date, DriverManager, PreparedStatement}

object Demo3 {
    def main(args: Array[String]): Unit = {
        //1. load the data and turn the sorted ip ranges into a broadcast variable
        val sc = SparkUtils.getDefaultSparkContext()
        val httpLogRDD: RDD[String] = sc.textFile("file:///c://ftp/http.log")
        val ipRDD: RDD[String] = sc.textFile("file:///c://ftp/ip.txt")

        val province2IpRDD: RDD[(String, Long, Long)] = ipRDD.map(line => {
            val fields: Array[String] = line.split("\\|")
            val startIp = ip2long(fields(0))
            val endIp = ip2long(fields(1))
            val province = fields(6)
            (province, startIp, endIp)
        })

        // collect to the driver and sort by start ip so the broadcast array can be binary searched
        val ipArrs: Array[(String, Long, Long)] = province2IpRDD.collect().sortWith {
            case ((pro1, sIp1, eIp1), (pro2, sIp2, eIp2)) => {
                sIp1 < sIp2
            }
        }

        val provinceIpBC: Broadcast[Array[(String, Long, Long)]] = sc.broadcast(ipArrs)


        //2. process the access log: map each record's ip to a province and count it
        val province2CountRDD: RDD[(String, Int)] = httpLogRDD.map(line => {
            val fields = line.split("\\|")
            val ip: Long = ip2long(fields(1))
            val ipArr: Array[(String, Long, Long)] = provinceIpBC.value
            val index = binarySearch(ip, ipArr)
            if (index < 0) (null, -1)
            else (ipArr(index)._1, 1)
        }).filter(_._1 != null).reduceByKey(_ + _)

        //3. save the results to the database
        res2mysql(province2CountRDD)
        sc.stop()
    }

    /**
     * Convert an IP from its dotted-string form to a Long,
     * e.g. "1.0.1.0" -> ((1 * 256 + 0) * 256 + 1) * 256 + 0 = 16777472
     */
    def ip2long(ip:String): Long = {
        val fields: Array[String] = ip.split("\\.")
        var ipNum = 0L
        // each octet shifts the accumulator left by 8 bits and is OR-ed in
        fields.foreach(field => ipNum = field.toLong | ipNum << 8)
        ipNum
    }

    /**
     * Binary search for the range containing ip in ipArr (sorted by start ip);
     * returns the index of the matching range, or -1 if no range contains ip.
     */
    def binarySearch(ip:Long, ipArr:Array[(String, Long, Long)]) : Int = {
        var start = 0
        var end = ipArr.length - 1

        while(start <= end) {
            val mid = (start + end) / 2
            val startIp = ipArr(mid)._2
            val endIp = ipArr(mid)._3
            if (ip >= startIp && ip <= endIp) {
                return mid
            } else if (ip < startIp) {
                end = mid - 1
            } else {
                start = mid + 1
            }
        }
        return -1
    }

    /**
     * Save the (province, count) RDD to MySQL, opening one JDBC connection per partition
     */
    def res2mysql(rdd:RDD[(String, Int)]) : Unit = {
        rdd.foreachPartition(itertor => {
            var con:Connection = null
            var ps:PreparedStatement = null
            val jdbcUrl = "jdbc:mysql://146.56.208.76:3307/sqoop?useUnicode=true&characterEncoding=utf8"
            val user = "root"
            val pass = "123456"
            con = DriverManager.getConnection(jdbcUrl, user, pass)
            val sql = s"insert into `location_info` (location, counts, access_date) values (?, ?, ?)"
            ps = con.prepareStatement(sql)
            itertor.foreach {
                case (province, count) => {
                    ps.setString(1, province)
                    ps.setString(2, count+"")
                    ps.setDate(3, new Date(System.currentTimeMillis()))
                    ps.addBatch()
                }
            }
            ps.executeBatch()
            ps.close()
            con.close()
        })
    }
}
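
The note in 3.1 says the result table is meant for later JdbcRDD use. Below is a minimal read-back sketch; it assumes the table created by the shipped SQL file has a numeric auto-increment id column (JdbcRDD requires a query with two '?' bound placeholders), and the object name Demo3Read, the id bounds, and the partition count are illustrative.

package com.qf.bigdata.spark.core.day5

import java.sql.DriverManager

import com.qf.bigdata.spark.core.day2.SparkUtils
import org.apache.spark.rdd.JdbcRDD

object Demo3Read {
    def main(args: Array[String]): Unit = {
        val sc = SparkUtils.getDefaultSparkContext()
        val jdbcUrl = "jdbc:mysql://146.56.208.76:3307/sqoop?useUnicode=true&characterEncoding=utf8"
        val locationRDD = new JdbcRDD(
            sc,
            () => DriverManager.getConnection(jdbcUrl, "root", "123456"),
            "select location, counts from location_info where id >= ? and id <= ?",
            1, 1000, 2, // lower bound, upper bound, number of partitions (illustrative)
            rs => (rs.getString("location"), rs.getString("counts"))
        )
        locationRDD.collect().foreach(println)
        sc.stop()
    }
}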

