实时--1.1 日志

Posted shengyang17

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时--1.1 日志相关的知识,希望对你有一定的参考价值。

 

接收日志并发送到kafka:gmall-logger模块--SpringBoot的部署

日志前加一个ts时间戳;org.slf4j.LoggerFactory,slf4j是一个接口,它会去找实现类;LoggeerFactory默认的会在jar包中找实现类;

logging(它是LoggeFactory默认使用的)和log4j是竞争关系,所以要在gmall-logger.pom.xml文件中加入exclusions把logging给排除了

  <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter</artifactId>
       <exclusions>
         <exclusion>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-logging</artifactId>
         </exclusion>
       </exclusions>
    </dependency>
    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-log4j</artifactId>
       <version>1.3.8.RELEASE</version>
    </dependency>
  </dependencies>

 

com.atguigu.gmall.logger.controller.LoggerController

@RestController
public class LoggerController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    private static final Logger logger = LoggerFactory.getLogger(LoggerController.class);
    @PostMapping("log")
    public String doLog(@RequestParam("log") String log){

        JSONObject jsonObject = JSON.parseObject(log);
        jsonObject.put("ts", System.currentTimeMillis());

        //System.out.println(log);

        // 落盘成为日志文件     // log4j
        logger.info(jsonObject.toJSONString());
        //发送kafka
        if ("startup".equals(jsonObject.getString("type"))){
            kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_STARTUP, jsonObject.toJSONString());
        }else {
            kafkaTemplate.send(GmallConstant.KAFKA_TOPIC_EVENT, jsonObject.toJSONString());
        }
        return "success";
    }
}

利用resources/ log4j.properties进行log日志的落盘:

log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender  //怎么写这个日志;类型--控制台
log4j.appender.atguigu.MyConsole.target=System.err  //控制台有两种:System.out日志颜色黑色和System.err日志是红色的
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout    //自定义的,除了要打印的日志级别,还要打印什么
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n  //格式,p是日志级别,%m输出的内容,%n是换行;

log4j.appender.atguigu.File=org.apache.log4j.DailyRollingFileAppender  //每日滚动文件,每天产生一个文件;
log4j.appender.atguigu.File.file=/opt/module/applog/gmall/log/app.log  //输出的文件路径
log4j.appender.atguigu.File.DatePattern=.yyyy-MM-dd        //输出文件的后缀; 当天的日志是没有后缀的,一旦过了12点,就有后缀.‘yyyy-MM-dd‘,后缀是日志时间
log4j.appender.atguigu.File.layout=org.apache.log4j.PatternLayout //自定义格式
log4j.appender.atguigu.File.layout.ConversionPattern=%m%n  //要干干净净的打印信息;

log4j.logger.com.atguigu.gmall.logger.controller.LoggerController=info,atguigu.File  
//某一个类的路径,只监控某个类所产生的日志;log4j.rootLogger=error,atguigu.Myconsole表示根底的,除了上边指定的都是它,首先是精确匹配到info就日志输出就按照它的打印,它们后边的.File或.Myconsole都会输出

日志级别有:级别从低到高 trace、debug、info、warn、error、fatal,如果写info,从低到高比它高的都可以输出出来;

把日志采集模块打包部署到Linux中

  在idea中的maven执行package,把打好的jar包拷贝到Linux 路径下,java -jar  /app/gmall/dw-logger-0.0.1-SNAPSHOT.jar   >/dev/null  2>&1  &

  测试 由windows发送日志到linux 日志落盘

在三台系统同时部署日志采集系统的jar包,分别把/applog/目录拷贝到三台虚拟机上

搭建nginx  https://www.cnblogs.com/shengyang17/p/10836168.html ,只需一台部署nginx即可;

由windows发送模拟日志,nginx负责路由,日志服务负责接收。

window发送日志------>>niginx路由---> linux中接收日志的jar存储日志文件并发给kafka--->kafka

更新集群启动脚本 ,加入nginx操作: ./logger-cluster.sh start 启动nginx路由,路由三台虚拟机给接收日志服务的jar包 ,并发给fakfa;

logger-cluster.sh

 

技术图片
[[email protected] gmall]$ vim logger-cluster.sh    
#!/bin/bash
JAVA_BIN=/opt/module/jdk1.8.0_144/bin/java
PROJECT=gmall
APPNAME=gmall-logger-0.0.1-SNAPSHOT.jar
SERVER_PORT=8080
 
case $1 in
"start"){
    for i in hadoop101 hadoop102 hadoop103
    do
        echo "========启动日志服务: $i==============="
        ssh $i  "$JAVA_BIN -Xms32m -Xmx64m -jar /opt/module/applog/$PROJECT/$APPNAME --server.port=$SERVER_PORT >./app.error 2>&1  &"
    done
        echo "==============启动NGINX==============="
        /opt/module/nginx/sbin/nginx
  };;

"stop"){
        echo "=============关闭NGINX================="
        /opt/module/nginx/sbin/nginx -s stop
    for i in  hadoop101 hadoop102 hadoop103
    do
        echo "========关闭日志服务: $i==============="
        ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk ‘{print \\$2}‘|xargs kill" >/dev/null 2>&1
    done
 
  };;
esac
View Code

当前日志模块:

                       技术图片

 

日活DAU

搭建实时处理模块gmall-realtime:

  消费kafka;利用redis过滤当日已经计入的日活设备;把每批次新增的当日日活信息保存到ES中;从ES中查出数据,发布成数据接口

 消费kafka& 利用redis去重

  1、把今日新增的活跃用户保存到redis中;   2、每条数据经过过滤,去掉redis中的已有的用户

  设计Redis的kv; Key:dau:2019-01-22, value: 设备id

  业务类开发

 DauApp.scala    消费kafka中数据(通过MyKafkaUtil获取) --->>利用redis去重---->>保存到ES(通过MyEsUtil工具类)中;

object DauApp {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
        val streamingContext: StreamingContext = new StreamingContext(new SparkContext(conf),Seconds(5))

        val inputStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getKafkaStream(GmallConstant.KAFKA_TOPIC_STARTUP, streamingContext)
        //  1 把当日已访问过的用户保存起来 redis
        //  2  以当日已访问用户清单为依据 ,过滤掉再次访问的请求

        // 转换case class  补全日期格式
        val startupLogDStream: DStream[StartUpLog] = inputStream.map { record =>
          val jsonStr: String = record.value()
          val startUpLog: StartUpLog = JSON.parseObject(jsonStr, classOf[StartUpLog])
          //把日期进行补全
          val dateTimeString: String = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(startUpLog.ts))
          val dateTimeArray: Array[String] = dateTimeString.split(" ")
          startUpLog.logDate = dateTimeArray(0)
          startUpLog.logHour = dateTimeArray(1).split(":")(0)
          startUpLog.logHourMinute = dateTimeArray(1)
          startUpLog
        }
        // 去重操作
        val filterDStream: DStream[StartUpLog] = startupLogDStream.transform { rdd =>
          //driver 每时间间隔执行一次
          println("过滤前:" + rdd.count())
          val jedis: Jedis = RedisUtil.getJedisClient
          val curDate: String = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
          val key: String = "dau:" + curDate
          val dauSet: util.Set[String] = jedis.smembers(key) //当日日活用户清单
        //使用广播变量
        val dauBC: Broadcast[util.Set[String]] = streamingContext.sparkContext.broadcast(dauSet)

          val filterRDD: RDD[StartUpLog] = rdd.filter {
            startuplog =>
              !dauBC.value.contains(startuplog.mid)
          }
          println("过滤后:" + filterRDD.count())
          filterRDD
        }

        // 考虑到 新的访问可能会出现重复 ,所以以mid为key进行去重,每个mid为小组 每组取其中一个
        val startupLogGroupDStream: DStream[(String, Iterable[StartUpLog])] = filterDStream.map{startuplog => (startuplog.mid, startuplog)}.groupByKey()

        val startupLogFilterDistinctDStream: DStream[StartUpLog] = startupLogGroupDStream.flatMap {
          case (mid, startupLogIter) =>
            val startupLogOneIter: Iterable[StartUpLog] = startupLogIter.take(1)
            startupLogOneIter
        }

        //  1 把当日已访问过的用户保存到 redis
        startupLogFilterDistinctDStream.foreachRDD{rdd =>
          rdd.foreachPartition{startupLogItr =>
            val jedis: Jedis = RedisUtil.getJedisClient
            val startupList: List[StartUpLog] = startupLogItr.toList
            for (elem <- startupList) {
              val key: String = "dau:" + elem.logDate
              jedis.sadd(key, elem.mid)
            }
            jedis.close()
            //保存到ES
            MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_DAU, startupList)
          }
    }
    streamingContext.start()
    streamingContext.awaitTermination()

  }

}

ES

技术图片

     综上 ,在实际环境中,需要一种能够容纳较大规模数据切交互性好的数据库。mysql虽然交互性好,但是容量扩展性有限。

   hbase虽然能够支持海量数据,但是查询的灵活度不足。所以ES在容量及交互性上达到一个非常不错的平衡,而且还能支持全文检索。

搭建es集群 https://www.cnblogs.com/shengyang17/p/10597841.html

ES& kibana的启动脚本: ./ek.sh start 

技术图片
[[email protected] gmall]$ cat ek.sh    
#!/bin/bash
es_home=/opt/module/elasticsearch
kibana_home=/opt/module/kibana/
case $1 in
"start"){
        echo "=============启动ES集群============="
        for i in hadoop101 hadoop102 hadoop103
        do
            ssh $i "source /etc/profile;${es_home}/bin/elasticsearch >/dev/null 2>&1 &"
        done
        echo "=============启动kibana============="
        nohup ${kibana_home}/bin/kibana >/opt/module/kibana/kibana.log 2>&1 &
};;
"stop"){
        echo "=============关闭kibana============="
        ps -ef | grep ${kibana_home} | grep -v grep | awk {print $2}|xargs kill
        echo "=============关闭ES集群============="
        for i in hadoop101 hadoop102 hadoop103
        do
            ssh $i "ps -ef | grep $es_home | grep -v grep | awk ‘{print \\$2}‘|xargs kill" >/dev/null 2>&1
        done

};;
esac
View Code

设计es索引结构

case class startup

技术图片
case class Startup(mid:String,
                   uid:String,
                   appid:String,
                   area:String,
                   os:String,
                   ch:String,
                   logType:String,
                   vs:String,
                   var logDate:String,
                   var logHour:String,
                   var logHourMinute:String,
                   var ts:Long
                  ) {

}
 
View Code

  text 支持分词; keyword 只能全部内容匹配
保存数据之前一定要先定义好mapping:  每个字段的类型 ; 分清楚索引类型

1、需要索引 也需要分词:标题,商品名称,分类名称,  type:“text”

2、需要索引,但不需要分词:类型id , 日期,数量 ,年龄 ,各种id, type:"keyword";

    mid, uid,area,os ,ch ,vs,logDate,logHourMinute,ts

3、既不需要索引,也不需要分词: 不被会用于条件过滤,经过脱敏的字段,138****0101  index:false

##############在ES中创建index
PUT gmall_dau
{
  "mappings": {
    "_doc":{
      "properties":{
         "mid":{
           "type":"keyword" ,
         },
         "uid":{
           "type":"keyword"
         },
         "area":{
           "type":"keyword"
         },
         "os":{
           "type":"keyword"
         },
         "ch":{
           "type":"keyword"
         },
         "vs":{
           "type":"keyword"
         },
         "logDate":{
           "type":"keyword"
         },
         "logHour":{
           "type":"keyword"
         },
         "logHourMinute":{
           "type":"keyword"
         },
         "ts":{
           "type":"long"
         } 
      }
    }
  }
}

 在Kibana中进行查询

   如果在在保存| 插入数据的时候,没有先建立mapping的数据结构,则ES是会自动推断;当你再去聚合aggs时,text的字段是不能进行聚合的(如果想要聚合要加 字段.keyword,如下所示),但是好一点的是ES给保存了两份,一个是text类型的字段、另外一个是keyword类型的;浪费了空间,在实际生产环境中是不能使用这种方式的;

GET /gmall_dau/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "logDate": "2019-05-04"
        }
      }
    }
  }
}

######groupby操作  聚合aggregation
GET /gmall_dau/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "logDate": "2019-04-30"
        }
      }
    }
  },
  "aggs": {
    "groupby_logHour": {
      "terms": {
        "field": "logHour.keyword",
        "size": 24
      }
    }
  }
}

保存到es中; 关于es java客户端的选择,目前市面上有两类客户端:

  一类是TransportClient 为代表的ES原生客户端,不能执行原生dsl语句必须使用它的Java api方法。
  另外一种是以Rest Api为主的missing client,最典型的就是jest。 这种客户端可以直接使用dsl语句拼成的字符串,直接传给服务端,然后返回json字符串再解析。
两种方式各有优劣,但是最近elasticsearch官网,宣布计划在7.0以后的版本中废除TransportClient。以RestClient为主。

所以在官方的RestClient 基础上,进行了简单包装的Jest客户端,就成了首选,而且该客户端也与springboot完美集成。

数据发布接口

详细见代码

通过gmall-mock模块的类JsonMocker发送数据--->nginx路由--->三台虚拟机的gmall-logger的接收数据并转发给kafka(用的是SpringBoot)--->

 启动:gmall-publisher--springBoot的主类: com.atguigu.gmall.publisher.GmallPublisherApplication,给chart的接口,启动

 启动:gmall--dw-chart---com.demo.DemoApplication的主类; 接接口展示数据的动态变化

 启动:[[email protected] ~]$ redis-server myredis/redis.conf 

启动:gmall-realtime的com.atguigu.gmall.realtime.app.DauApp类,

启动:gmall-mock模块的类JsonMocker发送数据

http://127.0.0.1:8070/realtime-total?date=2019-04-30
[{"name":"新增日活","id":"dau","value":761},{"name":"新增设备","id":"new_mid","value":233}]

http://127.0.0.1:8070/realtime-hour?id=dau&&date=2019-05-04
{"yesterday":{},"today":{"20":26,"21":96}}

通过前端页面展示: http://localhost:8089/index

 

以上是关于实时--1.1 日志的主要内容,如果未能解决你的问题,请参考以下文章

Flink cdc 实时监听oracle归档日志及oracle数据延迟参数调优

性能有瓶颈?3 个实时可视化 Python 工具包来帮你!

argparse 代码片段只打印部分日志

常用python日期日志获取内容循环的代码片段

美团高性能终端实时日志系统建设实践

开源实时日志分析ELK平台部署