spark 省份次数统计实例

Posted wangshuang123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 省份次数统计实例相关的知识,希望对你有一定的参考价值。

//统计access.log文件里面IP地址对应的省份,并把结果存入到mysql



package access1

import java.sql.DriverManager

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Counts, per province, how many requests in access.log come from an IP
 * belonging to that province (IP ranges come from ip.txt), then writes the
 * (province, count) totals into the MySQL table `suibian`.
 */
object AccessIp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Raw request log and the IP-range -> province mapping file.
    val accesslines = sc.textFile("D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\access.log")
    val iplines = sc.textFile("D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\ip.txt")

    // ip.txt fields are '|'-separated; columns 2/3 hold the numeric range
    // bounds and column 6 the province name. The table is small enough to
    // collect to the driver.
    val ip1 = iplines.map { tp =>
      val splits = tp.split("[|]")
      (splits(2).toLong, splits(3).toLong, splits(6))
    }.collect()

    // Broadcast the range table so each executor receives a single copy
    // instead of one per task.
    val broads: Broadcast[Array[(Long, Long, String)]] = sc.broadcast(ip1)

    // Resolve each log line's client IP (field 1) to a province via binary
    // search over the broadcast ranges; unmatched IPs map to "".
    val result2 = accesslines.map { tp =>
      val splits = tp.split("[|]")
      val ipNum = MyUtils.ip2Long(splits(1))
      val ranges: Array[(Long, Long, String)] = broads.value
      val index = MyUtils.binarSearch(ranges, ipNum)
      val province = if (index != -1) ranges(index)._3 else ""
      (province, 1)
    }.reduceByKey(_ + _).sortBy(-_._2)

    // One connection and ONE reusable PreparedStatement per partition
    // (the original created a statement per row); try/finally guarantees
    // both are closed even if an insert fails.
    result2.foreachPartition { iter =>
      val connection = DriverManager.getConnection(
        "jdbc:mysql://localhost:3306/test1?characterEncoding=UTF-8&serverTimezone=GMT%2B8",
        "root",
        "123456"
      )
      try {
        val ps = connection.prepareStatement("insert into suibian values(?,?)")
        try {
          iter.foreach { tp =>
            ps.setString(1, tp._1)
            ps.setInt(2, tp._2)
            ps.executeUpdate()
          }
        } finally {
          ps.close()
        }
      } finally {
        connection.close()
      }
    }

    // Release the broadcast BEFORE stopping the context (the original
    // unpersisted after sc.stop(), when the context is already gone).
    broads.unpersist(true)
    sc.stop()
  }
}
package access1

object MyUtils {

  /**
   * Converts a dotted-decimal IPv4 address (e.g. "192.168.1.1") to its
   * unsigned 32-bit numeric value, carried in a Long.
   *
   * Same accumulation as the original index loop — each octet is OR-ed into
   * the accumulator shifted left by 8 bits — expressed as a fold.
   * Throws NumberFormatException on malformed input, as before.
   */
  def ip2Long(ip: String): Long =
    ip.split("[.]").foldLeft(0L)((acc, octet) => (acc << 8L) | octet.toLong)

  /**
   * Binary search over ascending, non-overlapping (start, end, province)
   * ranges. Returns the index of the range with start <= target <= end,
   * or -1 when no range contains `target` (including an empty array).
   *
   * Rewritten as a tail-recursive helper: no `var`, no `return` (a `return`
   * inside a lambda/loop body is a Scala anti-pattern).
   */
  def binarSearch(array: Array[(Long, Long, String)], target: Long): Int = {
    @scala.annotation.tailrec
    def loop(low: Int, high: Int): Int =
      if (low > high) -1
      else {
        val mid = low + (high - low) / 2
        val (start, end, _) = array(mid)
        if (start <= target && target <= end) mid
        else if (start > target) loop(low, mid - 1)
        else loop(mid + 1, high)
      }
    loop(0, array.length - 1)
  }
}

以上是关于spark 省份次数统计实例的主要内容,如果未能解决你的问题,请参考以下文章

scala实战之spark用户在线时长和登录次数统计实例

使用Spark进行搜狗日志分析实例——统计每个小时的搜索量

Spark算法实例:词频统计

Spark实例-通过HDFS文件实时统计

Spark-Core练习题

07 Spark RDD编程 综合实例 英文词频统计